Changeset f625817


Ignore:
Timestamp:
09/11/15 15:00:27 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
322c516
Parents:
8c7490fe
Message:

Reworked callback API and removed old per_msg and reporter functions

Updated tracertstats to use the new callback API.

Extended the callback approach to the reporter thread as well as the per
packet threads.

Added libtrace_callback_set_t structure, which is used to register the
user callback functions.

Added callback functionality for MESSAGE_RESULT (needed now that reporter
threads also do callbacks) and MESSAGE_USER (for user-defined messages). The
MESSAGE_USER callback is essentially the same as the old per_msg function
style.

Updated combiners to use send_message to pass results to the reporter thread.
send_message itself is now no longer static, so that combiners can use it.

Disabled building of tracestats_parallel as it was using the older version
of the callback API. Will update in a future commit.

Files:
10 edited

Legend:

Unmodified
Added
Removed
  • lib/combiner_ordered.c

    r3dd5acc rf625817  
    6565                                libtrace_generic_t gt = {.res = &r};
    6666                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
    67                                 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     67                                send_message(trace, &trace->reporter_thread,
     68                                                MESSAGE_RESULT, gt,
     69                                                &trace->reporter_thread);
    6870                                return 0;
    6971                        }
     
    8890                                libtrace_generic_t gt = {.res = &r};
    8991                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
    90                                 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     92                                send_message(trace, &trace->reporter_thread,
     93                                                MESSAGE_RESULT, gt,
     94                                                &trace->reporter_thread);
    9195                                return 0;
    9296                        }
     
    152156
    153157                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
    154                 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     158                send_message(trace, &trace->reporter_thread,
     159                                MESSAGE_RESULT, gt,
     160                                NULL);
    155161
    156162                // Now update the one we just removed
  • lib/combiner_sorted.c

    r3dd5acc rf625817  
    6767                        continue;
    6868                }
    69                 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     69                send_message(trace, &trace->reporter_thread, MESSAGE_RESULT,
     70                                gt, NULL);
    7071        }
    7172        libtrace_vector_empty(&queues[0]);
  • lib/combiner_unordered.c

    r3dd5acc rf625817  
    4949                                c->last_count_tick = r.key;
    5050                        }
    51                         trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     51                        send_message(trace, &trace->reporter_thread,
     52                                MESSAGE_RESULT, gt, NULL);
    5253                }
    5354        }
  • lib/libtrace.h.in

    rd420777 rf625817  
    252252/** Opaque structure holding information about libtrace thread */
    253253typedef struct libtrace_thread_t libtrace_thread_t;
     254
     255/** Opaque structure holding callback functions for libtrace threads */
     256typedef struct callback_set libtrace_callback_set_t;
    254257
    255258/** If the packet has allocated its own memory the buffer_control should be
     
    14801483DLLEXPORT void trace_destroy_output(libtrace_out_t *trace);
    14811484
     1485/** Create a callback set that can be used to define callbacks for parallel
     1486  * libtrace threads.
     1487  *
     1488  * @return A pointer to a freshly allocated callback set.
     1489  */
     1490DLLEXPORT libtrace_callback_set_t *trace_create_callback_set();
     1491
     1492/** Destroys a callback set, freeing up an resources it was using.
     1493 *
     1494 * @param cbset         The callback set to be destroyed.
     1495 */
     1496DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset);
     1497
     1498
    14821499/** Check (and clear) the current error state of an input trace
    14831500 * @param trace         The input trace to check the error state on
  • lib/libtrace_int.h

    r2fa43fa rf625817  
    284284};
    285285#define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration));
     286
     287struct callback_set {
     288
     289        fn_cb_starting message_starting;
     290        fn_cb_dataless message_stopping;
     291        fn_cb_dataless message_resuming;
     292        fn_cb_dataless message_pausing;
     293        fn_cb_packet message_packet;
     294        fn_cb_result message_result;
     295        fn_cb_first_packet message_first_packet;
     296        fn_cb_tick message_tick_count;
     297        fn_cb_tick message_tick_interval;
     298        fn_cb_usermessage message_user;
     299};
    286300
    287301/** A libtrace input trace
     
    333347        /** The actual freelist */
    334348        libtrace_ocache_t packet_freelist;
    335         /** User defined per_msg function called when a message is ready */
    336         fn_cb_msg per_msg;
    337         /** User defined reporter function entry point */
    338         fn_reporter reporter;
    339349        /** The hasher function */
    340350        enum hasher_types hasher_type;
     
    361371        struct user_configuration config;
    362372        libtrace_combine_t combiner;
    363         struct {
    364                 fn_cb_starting message_starting;
    365                 fn_cb_dataless message_stopping;
    366                 fn_cb_dataless message_resuming;
    367                 fn_cb_dataless message_pausing;
    368                 fn_cb_packet message_packet;
    369                 fn_cb_first_packet message_first_packet;
    370                 fn_cb_tick message_tick_count;
    371                 fn_cb_tick message_tick_interval;
    372         } callbacks;
     373       
     374        /* Set of callbacks to be executed by per packet threads in response
     375         * to various messages. */
     376        struct callback_set *perpkt_cbs;
     377        /* Set of callbacks to be executed by the reporter thread in response
     378         * to various messages. */
     379        struct callback_set *reporter_cbs;
    373380};
    374381
     
    380387libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
    381388
     389
     390void send_message(libtrace_t *trace, libtrace_thread_t *target,
     391                const enum libtrace_messages type,
     392                libtrace_generic_t data, libtrace_thread_t *sender);
    382393
    383394/** A libtrace output trace
  • lib/libtrace_parallel.h

    r3dd5acc rf625817  
    460460 * @param libtrace The input trace to start
    461461 * @param global_blob Global data related to this trace accessible using trace_get_global()
    462  * @param per_msg A user supplied function called when a message is ready
    463  * @param reporter A user supplied function called when a result is ready.
     462 * @param per_packet_cbs A set of user supplied functions to be called in response to events being observed by the per_pkt threads.
     463 * @param reporter_cbs A set of user supplied functions to be called in response to events / results being seen by the reporter thread.
    464464 * Optional if NULL the reporter thread will not be started.
    465465 * @return 0 on success, otherwise -1 to indicate an error has occurred
     
    467467 * This can also be used to restart an existing parallel trace,
    468468 * that has previously been paused using trace_ppause().
    469  * In this case global_blob,per_msg and reporter will only be updated
    470  * if they are non-null. Otherwise their previous values will be maintained.
     469 * In this case global_blob, per_packet_cbs and reporter_cbs will only be
     470 * updated if they are non-null. Otherwise their previous values will be
     471 * maintained.
    471472 *
    472473 */
    473474DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    474                            fn_cb_msg per_msg, fn_reporter reporter);
     475                           libtrace_callback_set_t *per_packet_cbs,
     476                           libtrace_callback_set_t *reporter_cbs);
    475477
    476478/**
     
    539541                                           libtrace_packet_t *packet);
    540542
     543/**
     544 * Callback for handling a result message. Should only be required by the
     545 * reporter thread.
     546 *
     547 * @param libtrace The parallel trace
     548 * @param sender The thread that generated this result
     549 * @param global The global storage
     550 * @param tls The thread local storage
     551 * @param result The result associated with the message
     552 *
     553 */
     554typedef void (*fn_cb_result)(libtrace_t *libtrace, libtrace_thread_t *sender,
     555                void *global, void *tls, libtrace_result_t *result);
     556
     557
     558/**
     559 * Callback for handling any user-defined message types. This will handle
     560 * any messages with a type >= MESSAGE_USER.
     561 *
     562 * @param libtrace The parallel trace
     563 * @param t The thread
     564 * @param global The global storage
     565 * @param tls The thread local storage
     566 * @param mesg The code identifying the message type
     567 * @param data The data associated with the message
     568 *
     569 */
     570typedef void (*fn_cb_usermessage) (libtrace_t *libtrace, libtrace_thread_t *t,
     571                void *global, void *tls, int mesg, libtrace_generic_t data);
     572
    541573/** Registers a built-in message with a handler.
    542574 * Note we do not include the sending thread as an argument to the reporter.
     
    548580 */
    549581
    550 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler);
    551 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler);
    552 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler);
    553 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler);
    554 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler);
    555 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler);
    556 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler);
    557 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler);
     582DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
     583                fn_cb_starting handler);
     584
     585DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
     586                fn_cb_dataless handler);
     587
     588DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
     589                fn_cb_dataless handler);
     590
     591DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
     592                fn_cb_dataless handler);
     593
     594DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
     595                fn_cb_packet handler);
     596
     597DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
     598                fn_cb_first_packet handler);
     599
     600DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
     601                fn_cb_result handler);
     602
     603DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
     604                fn_cb_tick handler);
     605
     606DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
     607                fn_cb_tick handler);
     608
     609DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
     610                fn_cb_usermessage handler);
    558611
    559612/** Pauses a trace previously started with trace_pstart()
  • lib/trace.c

    r0a368ae rf625817  
    269269        libtrace->perpkt_queue_full = false;
    270270        libtrace->global_blob = NULL;
    271         libtrace->per_msg = NULL;
    272         libtrace->reporter = NULL;
    273271        libtrace->hasher = NULL;
    274272        libtrace_zero_ocache(&libtrace->packet_freelist);
     
    288286        ZERO_USER_CONFIG(libtrace->config);
    289287        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
    290         memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
     288        libtrace->perpkt_cbs = NULL;
     289        libtrace->reporter_cbs = NULL;
    291290
    292291        /* Parse the URI to determine what sort of trace we are dealing with */
     
    393392        libtrace->perpkt_queue_full = false;
    394393        libtrace->global_blob = NULL;
    395         libtrace->per_msg = NULL;
    396         libtrace->reporter = NULL;
    397394        libtrace->hasher = NULL;
    398395        libtrace_zero_ocache(&libtrace->packet_freelist);
     
    409406        ZERO_USER_CONFIG(libtrace->config);
    410407        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
    411         memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
     408        libtrace->perpkt_cbs = NULL;
     409        libtrace->reporter_cbs = NULL;
    412410       
    413411        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    721719                // This has all of our packets
    722720                libtrace_ocache_destroy(&libtrace->packet_freelist);
    723                 if (libtrace->combiner.destroy && libtrace->reporter)
     721                if (libtrace->combiner.destroy && libtrace->reporter_cbs)
    724722                        libtrace->combiner.destroy(libtrace, &libtrace->combiner);
    725723                free(libtrace->perpkt_threads);
    726724                libtrace->perpkt_threads = NULL;
    727725                libtrace->perpkt_thread_count = 0;
    728         }
     726
     727        }
     728
     729        if (libtrace->perpkt_cbs)
     730                trace_destroy_callback_set(libtrace->perpkt_cbs);
     731        if (libtrace->reporter_cbs)
     732                trace_destroy_callback_set(libtrace->reporter_cbs);
     733
    729734       
    730735        if (libtrace->event.packet) {
  • lib/trace_parallel.c

    r9346e4a rf625817  
    8080
    8181#include "libtrace.h"
    82 #include "libtrace_int.h"
     82#include "libtrace_parallel.h"
    8383
    8484#ifdef HAVE_PCAP_BPF_H
     
    173173
    174174/* This should optimise away the switch to nothing in the explict cases */
    175 static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type,
    176                                 libtrace_generic_t data, libtrace_thread_t *sender) {
     175inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
     176                const enum libtrace_messages type,
     177                libtrace_generic_t data, libtrace_thread_t *sender) {
     178
    177179        fn_cb_dataless fn = NULL;
    178         switch (type) {
     180        enum libtrace_messages switchtype;
     181        libtrace_callback_set_t *cbs = NULL;
     182
     183        if (thread == &trace->reporter_thread) {
     184                cbs = trace->reporter_cbs;
     185        } else {
     186                cbs = trace->perpkt_cbs;
     187        }
     188
     189        if (cbs == NULL)
     190                return;
     191
     192        if (type >= MESSAGE_USER)
     193                switchtype = MESSAGE_USER;
     194        else
     195                switchtype = (enum libtrace_messages) type;
     196
     197        switch (switchtype) {
    179198        case MESSAGE_STARTING:
    180                 if (trace->callbacks.message_starting)
    181                         thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob);
    182                 else if (trace->per_msg)
    183                         (*trace->per_msg)(trace, thread, type, data, sender);
     199                if (cbs->message_starting)
     200                        thread->user_data = (*cbs->message_starting)(trace,
     201                                        thread, trace->global_blob);
    184202                return;
    185203        case MESSAGE_FIRST_PACKET:
    186                 if (trace->callbacks.message_first_packet)
    187                         (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender);
    188                 else if (trace->per_msg)
    189                         (*trace->per_msg)(trace, thread, type, data, sender);
     204                if (cbs->message_first_packet)
     205                                (*cbs->message_first_packet)(trace, thread,
     206                                trace->global_blob, thread->user_data,
     207                                data.pkt, sender);
    190208                return;
    191209        case MESSAGE_TICK_COUNT:
    192                 if (trace->callbacks.message_tick_count)
    193                         (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64);
    194                 else if (trace->per_msg)
    195                         (*trace->per_msg)(trace, thread, type, data, sender);
     210                if (cbs->message_tick_count)
     211                        (*cbs->message_tick_count)(trace, thread,
     212                                        trace->global_blob, thread->user_data,
     213                                        data.uint64);
    196214                return;
    197215        case MESSAGE_TICK_INTERVAL:
    198                 if (trace->callbacks.message_tick_interval)
    199                         (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data,  data.uint64);
    200                 else if (trace->per_msg)
    201                         (*trace->per_msg)(trace, thread, type, data, sender);
     216                if (cbs->message_tick_interval)
     217                        (*cbs->message_tick_interval)(trace, thread,
     218                                        trace->global_blob, thread->user_data,
     219                                        data.uint64);
    202220                return;
    203221        case MESSAGE_STOPPING:
    204                 fn = trace->callbacks.message_stopping;
     222                fn = cbs->message_stopping;
    205223                break;
    206224        case MESSAGE_RESUMING:
    207                 fn = trace->callbacks.message_resuming;
     225                fn = cbs->message_resuming;
    208226                break;
    209227        case MESSAGE_PAUSING:
    210                 fn = trace->callbacks.message_pausing;
     228                fn = cbs->message_pausing;
    211229                break;
     230        case MESSAGE_USER:
     231                if (cbs->message_user)
     232                        (*cbs->message_user)(trace, thread, trace->global_blob,
     233                                        thread->user_data, type, data);
     234                return;
     235        case MESSAGE_RESULT:
     236                if (cbs->message_result)
     237                        (*cbs->message_result)(trace, thread,
     238                                        trace->global_blob, thread->user_data,
     239                                        data.res);
    212240
    213241        /* These should be unused */
     
    215243        case MESSAGE_DO_STOP:
    216244        case MESSAGE_POST_REPORTER:
    217         case MESSAGE_RESULT:
    218245        case MESSAGE_PACKET:
    219246                return;
    220         case MESSAGE_USER:
    221                 break;
    222         }
     247        }
     248
    223249        if (fn)
    224250                (*fn)(trace, thread, trace->global_blob, thread->user_data);
    225         else if (trace->per_msg)
    226                 (*trace->per_msg)(trace, thread, type, data, sender);
     251}
     252
     253DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
     254        libtrace_callback_set_t *cbset;
     255
     256        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
     257        memset(cbset, 0, sizeof(libtrace_callback_set_t));
     258        return cbset;
     259}
     260
     261DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
     262        free(cbset);
    227263}
    228264
     
    239275{
    240276        assert(libtrace->state != STATE_NEW);
    241         return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
     277        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
    242278}
    243279
     
    441477                }
    442478                t->accepted_packets++;
    443                 libtrace_generic_t data = {.pkt = *packet};
    444                 if (trace->callbacks.message_packet)
    445                         *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
    446                 else if (trace->per_msg)
    447                         *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t);
     479                if (trace->perpkt_cbs->message_packet)
     480                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
    448481                trace_fin_packet(*packet);
    449482        } else {
     
    624657                                        goto eof;
    625658                        }
    626                         (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
    627                         (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
     659                        send_message(trace, t, message.code, message.data,
     660                                        message.sender);
    628661                        /* Continue and the empty messages out before packets */
    629662                        continue;
     
    10671100        }
    10681101
    1069         (*trace->reporter)(trace, MESSAGE_STARTING, (libtrace_generic_t) {0}, t);
    1070         (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
     1102        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
     1103        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
    10711104
    10721105        while (!trace_has_finished(trace)) {
     
    10851118                                assert(trace->combiner.pause);
    10861119                                trace->combiner.pause(trace, &trace->combiner);
    1087                                 (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
     1120                                send_message(trace, t, MESSAGE_PAUSING,
     1121                                                (libtrace_generic_t) {0}, t);
    10881122                                trace_thread_pause(trace, t);
    1089                                 (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
     1123                                send_message(trace, t, MESSAGE_RESUMING,
     1124                                                (libtrace_generic_t) {0}, t);
    10901125                                break;
    10911126                default:
    1092                         (*trace->reporter)(trace, message.code, message.data, message.sender);
     1127                        send_message(trace, t, message.code, message.data,
     1128                                        message.sender);
    10931129                }
    10941130        }
     
    10981134
    10991135        // GOODBYE
    1100         (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
    1101         (*trace->reporter)(trace, MESSAGE_STOPPING, (libtrace_generic_t) {0}, t);
     1136        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
     1137        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
    11021138
    11031139        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
     
    13151351 */
    13161352static int trace_prestart(libtrace_t * libtrace, void *global_blob,
    1317                           fn_cb_msg per_msg, fn_reporter reporter) {
     1353                          libtrace_callback_set_t *per_packet_cbs,
     1354                          libtrace_callback_set_t *reporter_cbs) {
    13181355        int i, err = 0;
    13191356        if (libtrace->state != STATE_PAUSED) {
     
    13581395
    13591396        /* Update functions if requested */
    1360         if (per_msg)
    1361                 libtrace->per_msg = per_msg;
    1362         assert(libtrace->per_msg);
    1363         if (reporter)
    1364                 libtrace->reporter = reporter;
    13651397        if(global_blob)
    13661398                libtrace->global_blob = global_blob;
     1399
     1400        if (per_packet_cbs) {
     1401                if (libtrace->perpkt_cbs)
     1402                        trace_destroy_callback_set(libtrace->perpkt_cbs);
     1403                libtrace->perpkt_cbs = trace_create_callback_set();
     1404                memcpy(libtrace->perpkt_cbs, per_packet_cbs,
     1405                                sizeof(libtrace_callback_set_t));
     1406        }
     1407
     1408        if (reporter_cbs) {
     1409                if (libtrace->reporter_cbs)
     1410                        trace_destroy_callback_set(libtrace->reporter_cbs);
     1411
     1412                libtrace->reporter_cbs = trace_create_callback_set();
     1413                memcpy(libtrace->reporter_cbs, reporter_cbs,
     1414                                sizeof(libtrace_callback_set_t));
     1415        }
    13671416
    13681417        if (trace_is_parallel(libtrace)) {
     
    15861635
    15871636DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    1588                            fn_cb_msg per_msg, fn_reporter reporter) {
     1637                           libtrace_callback_set_t *per_packet_cbs,
     1638                           libtrace_callback_set_t *reporter_cbs) {
    15891639        int i;
    15901640        int ret = -1;
     
    15991649
    16001650        if (libtrace->state == STATE_PAUSED) {
    1601                 ret = trace_prestart(libtrace, global_blob, per_msg, reporter);
     1651                ret = trace_prestart(libtrace, global_blob, per_packet_cbs,
     1652                                reporter_cbs);
    16021653                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    16031654                return ret;
     
    16141665        /* Store the user defined things against the trace */
    16151666        libtrace->global_blob = global_blob;
    1616         libtrace->per_msg = per_msg;
    1617         libtrace->reporter = reporter;
     1667
     1668        /* Save a copy of the callbacks in case the user tries to change them
     1669         * on us later */
     1670        if (!per_packet_cbs) {
     1671                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
     1672                                "requires a non-NULL set of per packet "
     1673                                "callbacks.");
     1674                goto cleanup_none;
     1675        }
     1676
     1677        if (per_packet_cbs->message_packet == NULL) {
     1678                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
     1679                                "packet callbacks must include a handler "
     1680                                "for a packet. Please set this using "
     1681                                "trace_set_packet_cb().");
     1682                goto cleanup_none;
     1683        }
     1684
     1685        libtrace->perpkt_cbs = trace_create_callback_set();
     1686        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
     1687       
     1688        if (reporter_cbs) {
     1689                libtrace->reporter_cbs = trace_create_callback_set();
     1690                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
     1691        }
     1692
     1693       
     1694
     1695
    16181696        /* And zero other fields */
    16191697        for (i = 0; i < THREAD_STATE_MAX; ++i) {
     
    16941772
    16951773        /* Start the reporter thread */
    1696         if (reporter) {
     1774        if (reporter_cbs) {
    16971775                if (libtrace->combiner.initialise)
    16981776                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
     
    17931871}
    17941872
    1795 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) {
    1796         libtrace->callbacks.message_starting = handler;
    1797         return 0;
    1798 }
    1799 
    1800 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) {
    1801         libtrace->callbacks.message_pausing = handler;
    1802         return 0;
    1803 }
    1804 
    1805 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) {
    1806         libtrace->callbacks.message_resuming = handler;
    1807         return 0;
    1808 }
    1809 
    1810 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) {
    1811         libtrace->callbacks.message_stopping = handler;
    1812         return 0;
    1813 }
    1814 
    1815 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) {
    1816         libtrace->callbacks.message_packet = handler;
    1817         return 0;
    1818 }
    1819 
    1820 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) {
    1821         libtrace->callbacks.message_first_packet = handler;
    1822         return 0;
    1823 }
    1824 
    1825 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) {
    1826         libtrace->callbacks.message_tick_count = handler;
    1827         return 0;
    1828 }
    1829 
    1830 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) {
    1831         libtrace->callbacks.message_tick_interval = handler;
     1873DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
     1874                fn_cb_starting handler) {
     1875        cbset->message_starting = handler;
     1876        return 0;
     1877}
     1878
     1879DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
     1880                fn_cb_dataless handler) {
     1881        cbset->message_pausing = handler;
     1882        return 0;
     1883}
     1884
     1885DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
     1886                fn_cb_dataless handler) {
     1887        cbset->message_resuming = handler;
     1888        return 0;
     1889}
     1890
     1891DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
     1892                fn_cb_dataless handler) {
     1893        cbset->message_stopping = handler;
     1894        return 0;
     1895}
     1896
     1897DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
     1898                fn_cb_packet handler) {
     1899        cbset->message_packet = handler;
     1900        return 0;
     1901}
     1902
     1903DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
     1904                fn_cb_first_packet handler) {
     1905        cbset->message_first_packet = handler;
     1906        return 0;
     1907}
     1908
     1909DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
     1910                fn_cb_tick handler) {
     1911        cbset->message_tick_count = handler;
     1912        return 0;
     1913}
     1914
     1915DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
     1916                fn_cb_tick handler) {
     1917        cbset->message_tick_interval = handler;
     1918        return 0;
     1919}
     1920
     1921DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
     1922                fn_cb_result handler) {
     1923        cbset->message_result = handler;
     1924        return 0;
     1925}
     1926
     1927DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
     1928                fn_cb_usermessage handler) {
     1929        cbset->message_user = handler;
    18321930        return 0;
    18331931}
  • tools/tracertstats/tracertstats.c

    r8c7490fe rf625817  
    130130
    131131static uint64_t glob_last_ts = 0;
    132 static void process_result(libtrace_t *trace, int mesg,
    133                            libtrace_generic_t data,
    134                            libtrace_thread_t *sender UNUSED) {
     132static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
     133                void *global UNUSED, void *tls UNUSED,
     134                libtrace_result_t *result) {
    135135        uint64_t ts = 0;
    136136        static bool stopped = false;
     
    142142                return;
    143143
    144         switch (mesg) {
    145                 case MESSAGE_RESULT:
    146                 ts = data.res->key;
    147                 res = data.res->value.ptr;
    148                 if (glob_last_ts == 0)
    149                         glob_last_ts = ts;
    150                 while ((glob_last_ts >> 32) < (ts >> 32)) {
    151                         report_results(glob_last_ts >> 32, count, bytes);
    152                         count = 0;
    153                         bytes = 0;
    154                         for (j = 0; j < filter_count; j++)
    155                                 filters[j].count = filters[j].bytes = 0;
    156                         glob_last_ts = ts;
    157                 }
    158                 count += res->total.count;
    159                 packets_seen += res->total.count;
    160                 bytes += res->total.bytes;
    161                 for (j = 0; j < filter_count; j++) {
    162                         filters[j].count += res->filters[j].count;
    163                         filters[j].bytes += res->filters[j].bytes;
    164                 }
    165                 free(res);
    166         }
     144        ts = result->key;
     145        res = result->value.ptr;
     146        if (glob_last_ts == 0)
     147                glob_last_ts = ts;
     148        while ((glob_last_ts >> 32) < (ts >> 32)) {
     149                report_results(glob_last_ts >> 32, count, bytes);
     150                count = 0;
     151                bytes = 0;
     152                for (j = 0; j < filter_count; j++)
     153                        filters[j].count = filters[j].bytes = 0;
     154                glob_last_ts = ts;
     155        }
     156        count += res->total.count;
     157        packets_seen += res->total.count;
     158        bytes += res->total.bytes;
     159        for (j = 0; j < filter_count; j++) {
     160                filters[j].count += res->filters[j].count;
     161                filters[j].bytes += res->filters[j].bytes;
     162        }
     163        free(res);
    167164
    168165        /* Be careful to only call pstop once from within this thread! */
     
    246243{
    247244        libtrace_t *trace = NULL;
    248         if (!merge_inputs)
     245        libtrace_callback_set_t *pktcbs, *repcbs;
     246
     247        if (!merge_inputs)
    249248                create_output(uri);
    250249
     
    268267        }
    269268
    270         trace_cb_starting(trace, cb_starting);
    271         trace_cb_stopping(trace, cb_stopping);
    272         trace_cb_packet(trace, cb_packet);
    273         trace_cb_tick_count(trace, cb_tick);
    274         trace_cb_tick_interval(trace, cb_tick);
    275 
    276         if (trace_pstart(trace, NULL, NULL, process_result)==-1) {
     269        pktcbs = trace_create_callback_set();
     270        trace_set_starting_cb(pktcbs, cb_starting);
     271        trace_set_stopping_cb(pktcbs, cb_stopping);
     272        trace_set_packet_cb(pktcbs, cb_packet);
     273        trace_set_tick_count_cb(pktcbs, cb_tick);
     274        trace_set_tick_interval_cb(pktcbs, cb_tick);
     275
     276        repcbs = trace_create_callback_set();
     277        trace_set_result_cb(repcbs, cb_result);
     278
     279        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
    277280                trace_perror(trace,"Failed to start trace");
    278281                trace_destroy(trace);
     282                trace_destroy_callback_set(pktcbs);
     283                trace_destroy_callback_set(repcbs);
    279284                if (!merge_inputs)
    280285                        output_destroy(output);
     
    292297
    293298        trace_destroy(trace);
     299        trace_destroy_callback_set(pktcbs);
     300        trace_destroy_callback_set(repcbs);
    294301
    295302        if (!merge_inputs)
  • tools/tracestats/Makefile.am

    r29bbef0 rf625817  
    1 bin_PROGRAMS = tracestats tracestats_parallel
     1bin_PROGRAMS = tracestats
    22bin_SCRIPTS = tracesummary
    33
     
    77include ../Makefile.tools
    88tracestats_SOURCES = tracestats.c
    9 tracestats_parallel_SOURCES = tracestats_parallel.c
Note: See TracChangeset for help on using the changeset viewer.