Changeset 76291d1


Ignore:
Timestamp:
03/31/15 16:02:53 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
4007dbb
Parents:
58bfabf
Message:

Add a quick mockup of shanes interface using functions for each message

I.e. set a function for MESSAGE_PACKET etc. which only receives packets.

Ported tracestats_parallel to use this interface.

Note it is not yet complete, however should give a feel of how it will work.
And provide a good work around to the thread issues on older machines.

Files:
5 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace_int.h

    rc723e9e r76291d1  
    358358        struct user_configuration config;
    359359        libtrace_combine_t combiner;
     360        struct {
     361                fn_handler message_starting;
     362                fn_handler message_stopping;
     363                fn_handler message_resuming;
     364                fn_handler message_pausing;
     365                fn_handler message_packet;
     366        } callbacks;
    360367};
    361368
  • lib/libtrace_parallel.h

    rd3849c7 r76291d1  
    465465DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    466466                           fn_per_pkt per_pkt, fn_reporter reporter);
     467
     468
     469/**
     470 * @param libtrace The parallel trace
     471 * @param t The thread
     472 * @param data The data associated with the message
     473 * @param global The global storage
     474 * @param tls The thread local storage
     475 */
     476typedef void* (*fn_handler)(libtrace_t *libtrace,
     477                               libtrace_thread_t *t,
     478                               libtrace_generic_t data,
     479                               void *global,
     480                               void *tls);
     481
     482/** Registers a built-in message with a handler.
     483 * Note we do not include the sending thread as an argument to the reporter.
     484 * If set to NULL, the message will be sent to default perpkt handler.
     485 *
     486 * @param libtrace The input trace to start
     487 * @param message The message to intercept
     488 * @param handler the handler to be called when the message is received
     489 * @return 0 if successful otherwise -1.
     490 */
     491DLLEXPORT int trace_set_handler(libtrace_t *libtrace, enum libtrace_messages message, fn_handler handler);
    467492
    468493/** Pauses a trace previously started with trace_pstart()
  • lib/trace.c

    r8370482 r76291d1  
    287287        ZERO_USER_CONFIG(libtrace->config);
    288288        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
     289        memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
    289290
    290291        /* Parse the URI to determine what sort of trace we are dealing with */
     
    405406        ZERO_USER_CONFIG(libtrace->config);
    406407        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
     408        memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
    407409       
    408410        for(tmp=formats_list;tmp;tmp=tmp->next) {
  • lib/trace_parallel.c

    rd3849c7 r76291d1  
    377377                t->accepted_packets++;
    378378                libtrace_generic_t data = {.pkt = *packet};
    379                 *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t);
     379                if (trace->callbacks.message_packet)
     380                        *packet = (*trace->callbacks.message_packet)(trace, t, data, trace->global_blob, t->user_data);
     381                else
     382                        *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t);
    380383                trace_fin_packet(*packet);
    381384        } else {
     
    535538
    536539        /* Let the per_packet function know we have started */
    537         (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
    538         (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
     540        if (trace->callbacks.message_starting)
     541                (*trace->callbacks.message_starting)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
     542        else
     543                (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
     544
     545        if (trace->callbacks.message_resuming)
     546                (*trace->callbacks.message_resuming)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
     547        else
     548                (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
    539549
    540550        for (;;) {
     
    619629
    620630        // Let the per_packet function know we have stopped
    621         (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
    622         (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);
     631        if (trace->callbacks.message_pausing)
     632                (*trace->callbacks.message_pausing)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
     633        else
     634                (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
     635        if (trace->callbacks.message_stopping)
     636                (*trace->callbacks.message_stopping)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
     637        else
     638                (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);
     639
    623640
    624641        // Free any remaining packets
     
    17191736        return ret;
    17201737}
     1738
     1739DLLEXPORT int trace_set_handler(libtrace_t *libtrace, enum libtrace_messages message, fn_handler handler) {
     1740        switch (message) {
     1741        case MESSAGE_STARTING:
     1742                libtrace->callbacks.message_starting = handler;
     1743                return 0;
     1744        case MESSAGE_STOPPING:
     1745                libtrace->callbacks.message_stopping = handler;
     1746                return 0;
     1747        case MESSAGE_RESUMING:
     1748                libtrace->callbacks.message_resuming = handler;
     1749                return 0;
     1750        case MESSAGE_PAUSING:
     1751                libtrace->callbacks.message_pausing = handler;
     1752                return 0;
     1753        case MESSAGE_PACKET:
     1754                libtrace->callbacks.message_packet = handler;
     1755                return 0;
     1756        default:
     1757                return -1;
     1758        }
     1759        return -1;
     1760}
     1761
    17211762
    17221763/*
  • tools/tracestats/tracestats_parallel.c

    rdfbdda7a r76291d1  
    8787
    8888
    89 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    90                         int mesg, libtrace_generic_t data,
     89static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     90                        int mesg UNUSED, libtrace_generic_t data UNUSED,
    9191                        libtrace_thread_t *sender UNUSED)
    9292{
    93         /* Using first entry as total and those after for filter counts */
    94         static __thread statistics_t * results = NULL;
    95         int i, wlen;
    96         libtrace_stat_t *stats;
    97         libtrace_generic_t gen;
    98 
    99         switch (mesg) {
    100         case MESSAGE_PACKET:
    101                 /* Apply filters to every packet note the result */
    102                 wlen = trace_get_wire_length(data.pkt);
    103                 for(i=0;i<filter_count;++i) {
    104                         if (filters[i].filter == NULL)
    105                                 continue;
    106                         if(trace_apply_filter(filters[i].filter,data.pkt) > 0) {
    107                                 results[i+1].count++;
    108                                 results[i+1].bytes+=wlen;
    109                         }
    110                         if (trace_is_err(trace)) {
    111                                 trace_perror(trace, "trace_apply_filter");
    112                                 fprintf(stderr, "Removing filter from filterlist\n");
    113                                 /* This is a race, but will be atomic */
    114                                 filters[i].filter = NULL;
    115                         }
    116                 }
    117                 results[0].count++;
    118                 results[0].bytes +=wlen;
    119                 return data.pkt;
    120         case MESSAGE_STARTING:
    121                 /* Allocate space to hold a total count and one for each filter */
    122                 results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    123                 break;
    124         case MESSAGE_STOPPING:
    125                 /* We only output one result per thread with the key 0 when the
    126                  * trace is over. */
    127                 gen.ptr = results;
    128                 trace_publish_result(trace, t, 0, gen, RESULT_USER);
    129                 break;
    130         default:
    131                 break;
    132         }
    13393        return NULL;
    13494}
     
    184144}
    185145
     146
     147static void* fn_starting(libtrace_t *trace UNUSED, libtrace_thread_t *t,
     148                     libtrace_generic_t data UNUSED, void *global UNUSED, void*tls UNUSED) {
     149        /* Allocate space to hold a total count and one for each filter */
     150        statistics_t *results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
     151        trace_set_tls(t, results);
     152        return NULL;
     153}
     154
     155
     156static void* fn_stopping(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     157                     libtrace_generic_t data UNUSED, void *global UNUSED, void*tls) {
     158        statistics_t *results = tls;
     159        libtrace_generic_t gen;
     160        /* We only output one result per thread with the key 0 when the
     161         * trace is over. */
     162        gen.ptr = results;
     163        trace_publish_result(trace, t, 0, gen, RESULT_USER);
     164        return NULL;
     165}
     166
     167static void* fn_packet(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     168                   libtrace_generic_t data, void *global UNUSED, void*tls) {
     169        statistics_t *results = tls;
     170        int i, wlen;
     171
     172        /* Apply filters to every packet note the result */
     173        wlen = trace_get_wire_length(data.pkt);
     174        for(i=0;i<filter_count;++i) {
     175                if (filters[i].filter == NULL)
     176                        continue;
     177                if(trace_apply_filter(filters[i].filter,data.pkt) > 0) {
     178                        results[i+1].count++;
     179                        results[i+1].bytes+=wlen;
     180                }
     181                if (trace_is_err(trace)) {
     182                        trace_perror(trace, "trace_apply_filter");
     183                        fprintf(stderr, "Removing filter from filterlist\n");
     184                        /* This is a race, but will be atomic */
     185                        filters[i].filter = NULL;
     186                }
     187        }
     188        results[0].count++;
     189        results[0].bytes +=wlen;
     190        return data.pkt;
     191}
     192
    186193/* Process a trace, counting packets that match filter(s) */
    187194static void run_trace(char *uri, char *config, char *config_file)
     
    212219                }
    213220        }
     221
     222        trace_set_handler(trace, MESSAGE_PACKET, fn_packet);
     223        trace_set_handler(trace, MESSAGE_STARTING, fn_starting);
     224        trace_set_handler(trace, MESSAGE_STOPPING, fn_stopping);
    214225
    215226        /* Start the trace as a parallel trace */
Note: See TracChangeset for help on using the changeset viewer.