Changeset 2498008 for lib


Ignore:
Timestamp:
09/16/14 02:35:10 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d994324
Parents:
50b1bee
Message:

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

Location:
lib
Files:
4 added
8 edited

Legend:

Unmodified
Added
Removed
  • lib/Makefile.am

    ra49a9eb r2498008  
    5959                data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
    6060                data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \
    61                 hash_toeplitz.c
     61                hash_toeplitz.c combiner_ordered.c combiner_sorted.c combiner_unordered.c
    6262
    6363if DAG2_4
  • lib/data-struct/vector.c

    ra49a9eb r2498008  
    5959        }
    6060        v->size--;
    61         // Of coarse this is mega slow
     61        // Of course this is mega slow
    6262        for (i = 0; i < v->size * v->element_size; i++)
    6363                v->elements[i] = v->elements[i+v->element_size];
     
    132132        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    133133}
     134
     135DLLEXPORT void libtrace_vector_qsort(libtrace_vector_t *v, int (*compar)(const void *, const void*)) {
     136        ASSERT_RET(pthread_mutex_lock(&v->lock), == 0);
     137        qsort(v->elements, v->element_size, v->element_size, compar);
     138        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
     139}
  • lib/data-struct/vector.h

    r8c42377 r2498008  
    3030DLLEXPORT void libtrace_vector_apply_function(libtrace_vector_t *v, vector_data_fn fn);
    3131
     32// Sort the vector using qsort
     33DLLEXPORT void libtrace_vector_qsort(libtrace_vector_t *v, int (*compar)(const void *, const void*));
    3234#endif
  • lib/format_linux.c

    rf9a70ca r2498008  
    17501750                || FORMAT(trace->format_data)->stats_valid == 0) {
    17511751                if (FORMAT(trace->format_data)->per_thread) {
    1752                         size_t i;
     1752                        int i;
    17531753                        FORMAT(trace->format_data)->stats.tp_drops = 0;
    17541754                        FORMAT(trace->format_data)->stats.tp_packets = 0;
     
    17981798                || (FORMAT(trace->format_data)->stats_valid==0)) {
    17991799                if (FORMAT(trace->format_data)->per_thread) {
    1800                         size_t i;
     1800                        int i;
    18011801                        FORMAT(trace->format_data)->stats.tp_drops = 0;
    18021802                        FORMAT(trace->format_data)->stats.tp_packets = 0;
  • lib/libtrace.h.in

    r5b4d121 r2498008  
    126126#else
    127127#define ASSERT_RET(run, cond) assert(run cond)
     128//#define ASSERT_RET(run, cond) run
    128129#endif
    129130   
     
    31993200
    32003201typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
    3201 typedef void* (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
     3202typedef void (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
    32023203typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
    32033204
     
    32243225DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type);
    32253226typedef struct libtrace_vector libtrace_vector_t;
    3226 DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results);
    32273227
    32283228DLLEXPORT int trace_post_reporter(libtrace_t *libtrace);
     
    34693469DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
    34703470DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
     3471DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t);
     3472
     3473/**
     3474 * The methods we use to combine multiple outputs into a single output
     3475 * This is not considered a stable API however is public.
     3476 * Where possible use built in combiners
     3477 *
     3478 * NOTE this structure is duplicated per trace and as such can
     3479 * have functions rewritten, and in fact should if possible.
     3480 */
     3481typedef struct libtrace_combine libtrace_combine_t;
     3482struct libtrace_combine {
     3483
     3484        /**
     3485         * Called at the start of the trace to allow datastructures
     3486         * to be initilised and allow functions to be swapped if approriate.
     3487         *
     3488         * Also factors such as whether the trace is live or not can
     3489         * be used to determine the functions used.
     3490         * @return 0 if successful, -1 if an error occurs
     3491         */
     3492        int (*initialise)(libtrace_t *,libtrace_combine_t *);
     3493
     3494        /**
     3495         * Called when the trace ends, clean up any memory here
     3496         * from libtrace_t * init.
     3497         */
     3498        void (*destroy)(libtrace_t *, libtrace_combine_t *);
     3499
     3500        /**
     3501         * Publish a result against it's a threads queue.
     3502         * If null publish directly, expected to be used
     3503         * as a single threaded optimisation and can be
     3504         * set to NULL by init if this case is detected.
     3505         */
     3506        void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *);
     3507
     3508        /**
     3509         * Read as many results as possible from the trace.
     3510         * Directy calls the users code to handle results from here.
     3511         *
     3512         * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE
     3513         * If publish is NULL, this probably should be NULL also otherwise
     3514         * it will not be called.
     3515         */
     3516        void (*read)(libtrace_t *, libtrace_combine_t *);
     3517
     3518        /**
     3519         * Called when the trace is finished to flush the final
     3520         * results to the reporter thread.
     3521         *
     3522         * There may be no results, in which case this should
     3523         * just return.
     3524         *
     3525         * Libtrace state:
     3526         * Called from reporter thread
     3527         * No perpkt threads will be running, i.e. publish will not be
     3528         * called again.
     3529         *
     3530         * If publish is NULL, this probably should be NULL also otherwise
     3531         * it will not be called.
     3532         */
     3533        void (*read_final)(libtrace_t *, libtrace_combine_t *);
     3534
     3535        /**
     3536         * Pause must make sure any results of the type packet are safe.
     3537         * That means trace_copy_packet() and destroy the original.
     3538         * This also should be NULL if publish is NULL.
     3539         */
     3540        void (*pause)(libtrace_t *, libtrace_combine_t *);
     3541
     3542        /**
     3543         * Data storage for all the combiner threads
     3544         */
     3545        void *queues;
     3546
     3547        /**
     3548         * XXX - todo make a union of useful types
     3549         * Configuration options what this does is upto the combiner
     3550         * choosen.
     3551         */
     3552        int configuration;
     3553};
     3554
    34713555
    34723556#define READ_EOF 0
  • lib/libtrace_int.h

    rf051c1b r2498008  
    214214        libtrace_message_queue_t messages; // Message handling
    215215        libtrace_ringbuffer_t rbuffer; // Input
    216         libtrace_vector_t vector; // Output
    217         libtrace_queue_t deque; // Real Output type makes more sense
    218216        libtrace_t * trace;
    219217        void* ret;
     
    345343        uint64_t received_packets;
    346344        struct user_configuration config;
     345        libtrace_combine_t combiner;
    347346};
    348347
  • lib/trace.c

    r5b4d121 r2498008  
    665665                // This has all of our packets
    666666                libtrace_ocache_destroy(&libtrace->packet_freelist);
    667                
    668                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    669                         assert (libtrace_vector_get_size(&libtrace->perpkt_threads[i].vector) == 0);
    670                         libtrace_vector_destroy(&libtrace->perpkt_threads[i].vector);
    671                 }
     667                if (libtrace->combiner.destroy)
     668                        libtrace->combiner.destroy(libtrace, &libtrace->combiner);
    672669                free(libtrace->perpkt_threads);
    673670                libtrace->perpkt_threads = NULL;
  • lib/trace_parallel.c

    rf0e8bd6 r2498008  
    9595#include "rt_protocol.h"
    9696#include "hash_toeplitz.h"
     97#include "combiners.h"
    9798
    9899#include <pthread.h>
     
    185186        assert(libtrace->state != STATE_NEW);
    186187        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
     188}
     189
     190/**
     191 * When running the number of perpkt threads in use.
     192 * TODO what if the trace is not running yet, or has finished??
     193 *
     194 * @brief libtrace_perpkt_thread_nb
     195 * @param t The trace
     196 * @return
     197 */
     198DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
     199        return t->perpkt_thread_count;
    187200}
    188201
     
    283296        t->type = THREAD_EMPTY;
    284297        libtrace_zero_ringbuffer(&t->rbuffer);
    285         libtrace_zero_vector(&t->vector);
    286         libtrace_zero_deque(&t->deque);
    287298        t->recorded_first = false;
    288299        t->perpkt_num = -1;
     
    328339}
    329340
    330 /** Used below in trace_make_results_packets_safe*/
     341/** Used below in trace_make_results_packets_safe */
    331342static void do_copy_result_packet(void *data)
    332343{
     
    344355
    345356/**
    346  * Make a safe replacement copy of any result packets that are owned
    347  * by the format in the result queue. Used when pausing traces.
    348  */
    349 static void trace_make_results_packets_safe(libtrace_t *trace) {
    350         libtrace_thread_t *t = get_thread_descriptor(trace);
    351         if (trace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
    352                 libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
    353         else
    354                 libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
    355 }
    356 
    357 /**
    358357 * Holds threads in a paused state, until released by broadcasting
    359358 * the condition mutex.
    360359 */
    361360static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
    362         if (t->type == THREAD_PERPKT)
    363                 trace_make_results_packets_safe(trace);
    364361        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    365362        thread_change_state(trace, t, THREAD_PAUSED, false);
     
    11741171        libtrace_t *trace = (libtrace_t *)data;
    11751172        libtrace_thread_t *t = &trace->reporter_thread;
    1176         size_t res_size;
    11771173        libtrace_vector_t results;
    11781174        libtrace_vector_init(&results, sizeof(libtrace_result_t));
    11791175        fprintf(stderr, "Reporter thread starting\n");
    1180         libtrace_result_t result;
    1181         size_t i;
    11821176
    11831177        message.code = MESSAGE_STARTING;
     
    11971191                        // Check for results
    11981192                        case MESSAGE_POST_REPORTER:
    1199                                 res_size = trace_get_results(trace, &results);
    1200                                 for (i = 0; i < res_size; i++) {
    1201                                         ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
    1202                                         (*trace->reporter)(trace, &result, NULL);
    1203                                 }
     1193                                trace->combiner.read(trace, &trace->combiner);
    12041194                                break;
    12051195                        case MESSAGE_DO_PAUSE:
     1196                                assert(trace->combiner.pause);
     1197                                trace->combiner.pause(trace, &trace->combiner);
    12061198                                message.code = MESSAGE_PAUSING;
    12071199                                message.sender = t;
     
    12171209
    12181210        // Flush out whats left now all our threads have finished
    1219         res_size = trace_get_results(trace, &results);
    1220         for (i = 0; i < res_size; i++) {
    1221                 ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
    1222                 (*trace->reporter)(trace, &result, NULL);
    1223         }
    1224         libtrace_vector_destroy(&results);
     1211        trace->combiner.read_final(trace, &trace->combiner);
    12251212
    12261213        // GOODBYE
     
    15911578                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
    15921579                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
    1593                 // Depending on the mode vector or deque might be chosen
    1594                 libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
    1595                 libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
    15961580                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    15971581                t->recorded_first = false;
     
    16111595        if (threads_started == 0)
    16121596                threads_started = trace_start_perpkt_threads(libtrace);
     1597
     1598        // No combiner set, use a default to reduce the chance of this breaking
     1599        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
     1600                libtrace->combiner = combiner_unordered;
     1601
     1602        if (libtrace->combiner.initialise)
     1603                libtrace->combiner.initialise(libtrace, &libtrace->combiner);
    16131604
    16141605        libtrace->reporter_thread.type = THREAD_REPORTER;
     
    20582049
    20592050/**
    2060  * Publish to the reduce queue, return
     2051 * Publishes a result to the reduce queue
    20612052 * Should only be called by a perpkt thread, i.e. from a perpkt handler
    20622053 */
    20632054DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
    20642055        libtrace_result_t res;
    2065         UNUSED static __thread int count = 0;
    20662056        res.type = type;
    2067 
    2068         libtrace_result_set_key_value(&res, key, value);
    2069 
    2070         /*
    2071         if (count == 1)
    2072                 printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
    2073         count = (count+1) %1000;
    2074         libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    2075         */
    2076         /*if (count == 1)
    2077                 printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
    2078         count = (count+1)%1000;*/
    2079 
    2080         if (libtrace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    2081                 if (libtrace_deque_get_size(&t->deque) >= libtrace->config.reporter_thold) {
    2082                         trace_post_reporter(libtrace);
    2083                 }
    2084                 //while (libtrace_deque_get_size(&t->deque) >= 1000)
    2085                 //      sched_yield();
    2086                 libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
    2087         } else {
    2088                 //while (libtrace_vector_get_size(&t->vector) >= 1000)
    2089                 //      sched_yield();
    2090 
    2091                 if (libtrace_vector_get_size(&t->vector) >= libtrace->config.reporter_thold) {
    2092                         trace_post_reporter(libtrace);
    2093                 }
    2094                 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    2095         }
    2096 }
    2097 
    2098 static int compareres(const void* p1, const void* p2)
    2099 {
    2100         if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
    2101                 return -1;
    2102         if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
    2103                 return 0;
    2104         else
    2105                 return 1;
    2106 }
    2107 
    2108 DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
    2109         int i;
    2110         int flags = libtrace->reporter_flags; // Hint these aren't a changing
    2111 
    2112         libtrace_vector_empty(results);
    2113 
    2114         /* Here we assume queues are in order ascending order and they want
    2115          * the smallest result first. If they are not in order the results
    2116          * may not be in order.
    2117          */
    2118         if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    2119                 int live_count = 0;
    2120                 bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
    2121                 uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
    2122                 uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
    2123                 int min_queue = -1;
    2124 
    2125                 /* Loop through check all are alive (have data) and find the smallest */
    2126                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2127                         libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    2128                         if (libtrace_deque_get_size(v) != 0) {
    2129                                 libtrace_result_t r;
    2130                                 libtrace_deque_peek_front(v, (void *) &r);
    2131                                 live_count++;
    2132                                 live[i] = 1;
    2133                                 key[i] = libtrace_result_get_key(&r);
    2134                                 if (i==0 || min_key > key[i]) {
    2135                                         min_key = key[i];
    2136                                         min_queue = i;
    2137                                 }
    2138                         } else {
    2139                                 live[i] = 0;
    2140                         }
    2141                 }
    2142 
    2143                 /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
    2144                 while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
    2145                                 ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
    2146                                 trace_finished(libtrace)))) {
    2147                         /* Get the minimum queue and then do stuff */
    2148                         libtrace_result_t r;
    2149 
    2150                         assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
    2151                         libtrace_vector_push_back(results, &r);
    2152 
    2153                         // We expect the key we read +1 now
    2154                         libtrace->expected_key = key[min_queue] + 1;
    2155 
    2156                         // Now update the one we just removed
    2157                         if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
    2158                         {
    2159                                 libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
    2160                                 key[min_queue] = libtrace_result_get_key(&r);
    2161                                 if (key[min_queue] <= min_key) {
    2162                                         // We are still the smallest, might be out of order though :(
    2163                                         min_key = key[min_queue];
    2164                                 } else {
    2165                                         min_key = key[min_queue]; // Update our minimum
    2166                                         // Check all find the smallest again - all are alive
    2167                                         for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2168                                                 if (live[i] && min_key > key[i]) {
    2169                                                         min_key = key[i];
    2170                                                         min_queue = i;
    2171                                                 }
    2172                                         }
    2173                                 }
    2174                         } else {
    2175                                 live[min_queue] = 0;
    2176                                 live_count--;
    2177                                 min_key = UINT64_MAX; // Update our minimum
    2178                                 // Check all find the smallest again - all are alive
    2179                                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2180                                         // Still not 100% TODO (what if order is wrong or not increasing)
    2181                                         if (live[i] && min_key >= key[i]) {
    2182                                                 min_key = key[i];
    2183                                                 min_queue = i;
    2184                                         }
    2185                                 }
    2186                         }
    2187                 }
    2188         } else { // Queues are not in order - return all results in the queue
    2189                 for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    2190                         libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
    2191                 }
    2192                 if (flags & REDUCE_SORT) {
    2193                         qsort(results->elements, results->size, results->element_size, &compareres);
    2194                 }
    2195         }
    2196         return libtrace_vector_get_size(results);
     2057        res.key = key;
     2058        res.value = value;
     2059        assert(libtrace->combiner.publish);
     2060        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
     2061        return;
    21972062}
    21982063
     
    22372102                        return 1;
    22382103                case TRACE_OPTION_SEQUENTIAL:
     2104                        libtrace->combiner = combiner_ordered;
    22392105                        if (*((int *) value))
    22402106                                libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
     
    22432109                        return 1;
    22442110                case TRACE_OPTION_ORDERED:
     2111                        libtrace->combiner = combiner_ordered;
    22452112                        if (*((int *) value))
    22462113                                libtrace->reporter_flags |= REDUCE_ORDERED;
Note: See TracChangeset for help on using the changeset viewer.