Changeset d994324


Ignore:
Timestamp:
09/17/14 13:45:29 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
9e429e8
Parents:
2498008
Message:

Remove anything to do with the combiner from set configuration and removes any options/storage related to these such as the next expected packet.

Instead this is done using trace_set_combiner now, and the for the built-in combiners.h header. This is a lot more flexible and allows the users to specify there own combiner, and any number of options for it.

Files:
16 edited

Legend:

Unmodified
Added
Removed
  • lib/combiner_ordered.c

    r2498008 rd994324  
    44#include <assert.h>
    55#include <stdlib.h>
     6
     7/* TODO hook up configuration option for sequentual packets again */
    68
    79static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
     
    2729}
    2830
    29 static void inline read_internal(libtrace_t *trace, libtrace_queue_t *queues, bool final){
     31inline static void read_internal(libtrace_t *trace, libtrace_queue_t *queues, const bool final){
    3032        int i;
    31         int flags = trace->reporter_flags; // Hint these aren't changing
    3233        int live_count = 0;
    3334        bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
     
    5556
    5657        /* Now remove the smallest and loop - special case if all threads have joined we always flush what's left */
    57         while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count &&
    58                         ((flags & REDUCE_SEQUENTIAL && min_key == trace->expected_key) ||
    59                         final))) {
     58        while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count && final)) {
     59                // || (live_count && ((flags & REDUCE_SEQUENTIAL && min_key == trace->expected_key)))
    6060                /* Get the minimum queue and then do stuff */
    6161                libtrace_result_t r;
     
    6565
    6666                // We expect the key we read +1 now , todo put expected in our storage area
    67                 trace->expected_key = key[min_queue] + 1;
     67                //trace->expected_key = key[min_queue] + 1;
    6868
    6969                // Now update the one we just removed
     
    120120}
    121121
    122 /** Used below in trace_make_results_packets_safe*/
    123 static void do_copy_result_packet(void *data)
    124 {
    125         libtrace_result_t *res = (libtrace_result_t *)data;
    126         if (res->type == RESULT_PACKET) {
    127                 // Duplicate the packet in standard malloc'd memory and free the
    128                 // original, This is a 1:1 exchange so is ocache count remains unchanged.
    129                 libtrace_packet_t *oldpkt, *dup;
    130                 oldpkt = (libtrace_packet_t *) res->value;
    131                 dup = trace_copy_packet(oldpkt);
    132                 res->value = (void *)dup;
    133                 trace_destroy_packet(oldpkt);
    134         }
    135 }
    136122
    137123static void pause(libtrace_t *trace, libtrace_combine_t *c) {
     
    139125        int i;
    140126        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
    141                 libtrace_deque_apply_function(&queues[i], &do_copy_result_packet);
     127                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
    142128        }
    143129}
    144130
    145 const libtrace_combine_t combiner_ordered = {
     131DLLEXPORT const libtrace_combine_t combiner_ordered = {
    146132    init_combiner,      /* initialise */
    147133        destroy,                /* destroy */
     
    151137    pause,                      /* pause */
    152138    NULL,                       /* queues */
    153     0                           /* opts */
     139    {0}                         /* opts */
    154140};
  • lib/combiner_sorted.c

    r2498008 rd994324  
    3636}
    3737
    38 
    39 /** Used below in trace_make_results_packets_safe*/
    40 static void do_copy_result_packet(void *data)
    41 {
    42         libtrace_result_t *res = (libtrace_result_t *)data;
    43         if (res->type == RESULT_PACKET) {
    44                 // Duplicate the packet in standard malloc'd memory and free the
    45                 // original, This is a 1:1 exchange so is ocache count remains unchanged.
    46                 libtrace_packet_t *oldpkt, *dup;
    47                 oldpkt = (libtrace_packet_t *) res->value;
    48                 dup = trace_copy_packet(oldpkt);
    49                 res->value = (void *)dup;
    50                 trace_destroy_packet(oldpkt);
    51         }
    52 }
    53 
    5438static void pause(libtrace_t *trace, libtrace_combine_t *c) {
    5539        libtrace_vector_t *queues = c->queues;
    5640        int i;
    5741        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
    58                 libtrace_vector_apply_function(&queues[i], &do_copy_result_packet);
     42                libtrace_vector_apply_function(&queues[i], (vector_data_fn) libtrace_make_result_safe);
    5943        }
    6044}
     
    9276}
    9377
    94 const libtrace_combine_t combiner_sorted = {
     78DLLEXPORT const libtrace_combine_t combiner_sorted = {
    9579    init_combiner,      /* initialise */
    9680        destroy,                /* destroy */
     
    10084    pause,                      /* pause */
    10185    NULL,                       /* queues */
    102     0                           /* opts */
     86    {0}                         /* opts */
    10387};
  • lib/combiner_unordered.c

    r2498008 rd994324  
    5151}
    5252
    53 const libtrace_combine_t combiner_unordered = {
     53DLLEXPORT const libtrace_combine_t combiner_unordered = {
    5454    init_combiner,      /* initialise */
    5555        destroy,                /* destroy */
     
    5959    read,                       /* pause */
    6060    NULL,                       /* queues */
    61     0                           /* opts */
     61    {0}                         /* opts */
    6262};
  • lib/libtrace.h.in

    r2498008 rd994324  
    240240/** Opaque structure holding information about a bpf filter */
    241241typedef struct libtrace_filter_t libtrace_filter_t;
    242 
    243 /** Structure holding information about a result */
    244 typedef struct libtrace_result_t {
    245         uint64_t key;
    246         void * value;
    247         int type;
    248 } libtrace_result_t;
    249 #define RESULT_NORMAL 0
    250 #define RESULT_PACKET 1
    251 #define RESULT_TICK   2
    252242
    253243typedef struct libtrace_thread_t libtrace_thread_t;
     
    31793169/*@}*/
    31803170
    3181 union libtrace_64byte_things {
     3171/**
     3172 * A collection of types for convenience used in place of a
     3173 * simple void* to allow a any type of data to be stored.
     3174 *
     3175 * This is expected to be 8 bytes in length.
     3176 */
     3177typedef union {
     3178        /* Pointers */
    31823179        void *ptr;
     3180        libtrace_packet_t *pkt;
     3181
     3182        /* C99 Integer types */
     3183        /* NOTE: Standard doesn't require 64-bit
     3184     * but x32 and x64 gcc does */
    31833185        int64_t sint64;
    31843186        uint64_t uint64;
     3187
    31853188        uint32_t uint32s[2];
    31863189        int32_t sint32s[2];
    31873190        uint32_t uint32;
    31883191        int32_t sint32;
     3192
     3193        uint16_t uint16s[4];
     3194        int16_t sint16s[4];
     3195        uint16_t uint16;
     3196        int16_t sint16;
     3197
     3198        uint8_t uint8s[8];
     3199        int8_t sint8s[8];
     3200        uint8_t uint8;
     3201        int8_t sint8;
     3202
     3203        size_t size;
     3204
     3205        /* C basic types - we cannot be certian of the size */
    31893206        int sint;
    31903207        unsigned int uint;
    3191         char schars[8];
    3192         char uchars[8];
    3193 };
     3208
     3209        signed char schars[8];
     3210        unsigned char uchars[8];
     3211        signed char schar;
     3212        unsigned char uchar;
     3213
     3214        /* Real numbers */
     3215        float rfloat;
     3216        double rdouble;
     3217} libtrace_generic_types_t;
    31943218
    31953219typedef struct libtrace_message_t {
    31963220        int code;
    3197         union libtrace_64byte_things additional;
     3221        libtrace_generic_types_t additional;
    31983222        libtrace_thread_t *sender;
    31993223} libtrace_message_t;
     3224
     3225/** Structure holding information about a result */
     3226typedef struct libtrace_result_t {
     3227        uint64_t key;
     3228        libtrace_generic_types_t value;
     3229        int type;
     3230} libtrace_result_t;
     3231#define RESULT_NORMAL 0
     3232#define RESULT_PACKET 1
     3233#define RESULT_TICK   2
     3234
    32003235
    32013236typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
     
    32113246DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
    32123247DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
    3213 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value);
    3214 DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result);
    3215 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value);
     3248DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value);
     3249DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result);
     3250DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value);
    32163251DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
    32173252
     
    32233258
    32243259
    3225 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type);
     3260DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type);
    32263261typedef struct libtrace_vector libtrace_vector_t;
    32273262
     
    32423277DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
    32433278
     3279DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt);
     3280DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res);
     3281
    32443282typedef enum {
    32453283        /**
     
    32553293       
    32563294        /**
    3257          * Libtrace should expect sequential keys from the output to count
    3258          * up starting numbered from 1, 2, 3 ...
    3259          * such as is the case with numbered packets.
    3260          *
    3261          * ALSO consider - TRACE_OPTIONS_ORDERED_RESULTS suitable for live formats
     3295         * Delays packets so they are played back in trace-time rather than as fast
     3296         * as possible.
    32623297         */
    3263          TRACE_OPTION_SEQUENTIAL,
    3264          
    3265          /**
    3266           * Libtrace ordered results, results in each queue are ordered by key
    3267           * however my not be sequential, a typically case is packet timestamps
    3268           * the reporter will receive packets in order - note threasholds
    3269           * will be used such that a empty queue wont break things
    3270           */
    3271          TRACE_OPTION_ORDERED,
    3272          
    3273          
    3274          /**
    3275           * When accepting ordered results if a threashold is meet before an
    3276           * older result is available from another queue drop that packet
    3277           */
    3278           TRACE_DROP_OUT_OF_ORDER,
    3279 
    3280           /**
    3281            * Delays packets so they are played back in trace-time rather than as fast
    3282            * as possible.
    3283            */
    3284           TRACE_OPTION_TRACETIME,
    3285 
    3286           /**
    3287            * Specifies the interval between tick packets in milliseconds, if 0
    3288            * or less this is ignored.
    3289            */
    3290           TRACE_OPTION_TICK_INTERVAL,
     3298        TRACE_OPTION_TRACETIME,
     3299
     3300        /**
     3301         * Specifies the interval between tick packets in milliseconds, if 0
     3302         * or less this is ignored.
     3303         */
     3304        TRACE_OPTION_TICK_INTERVAL,
    32913305        TRACE_OPTION_GET_CONFIG,
    32923306        TRACE_OPTION_SET_CONFIG
     
    35463560
    35473561        /**
    3548          * XXX - todo make a union of useful types
    3549          * Configuration options what this does is upto the combiner
    3550          * choosen.
     3562         * Configuration options, what this does is upto the combiner
     3563         * chosen.
    35513564         */
    3552         int configuration;
     3565        libtrace_generic_types_t configuration;
    35533566};
    35543567
     3568DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config);
    35553569
    35563570#define READ_EOF 0
  • lib/libtrace_int.h

    r2498008 rd994324  
    188188        THREAD_STATE_MAX
    189189};
    190 
    191 // Reduce expects sequential data
    192 #define REDUCE_SEQUENTIAL 0x1
    193 // Reduce is working on ordered data
    194 #define REDUCE_ORDERED 0x2
    195 // Reduce should sort the data
    196 #define REDUCE_SORT 0x4
    197 // Drop out of order valid with
    198 #define REDUCE_DROP_OOO 0x8
    199 // Reduce reads all queues with same key
    200 #define REDUCE_STEPPING 0x10
    201190
    202191/**
     
    311300        /** The actual freelist */
    312301        libtrace_ocache_t packet_freelist;
    313         /** The reporter flags */
    314         int reporter_flags;
    315         /** Used to track the next expected key */
    316         uint64_t expected_key;
    317302        /** User defined per_pkt function called when a pkt is ready */
    318303        fn_per_pkt per_pkt;
  • lib/trace.c

    r2498008 rd994324  
    265265        libtrace->state = STATE_NEW;
    266266        libtrace->perpkt_queue_full = false;
    267         libtrace->reporter_flags = 0;
    268267        libtrace->global_blob = NULL;
    269268        libtrace->per_pkt = NULL;
    270269        libtrace->reporter = NULL;
    271270        libtrace->hasher = NULL;
    272         libtrace->expected_key = 0;
    273271        libtrace_zero_ocache(&libtrace->packet_freelist);
    274272        libtrace_zero_thread(&libtrace->hasher_thread);
     
    386384        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
    387385        libtrace->perpkt_queue_full = false;
    388         libtrace->reporter_flags = 0;
    389386        libtrace->global_blob = NULL;
    390387        libtrace->per_pkt = NULL;
    391388        libtrace->reporter = NULL;
    392389        libtrace->hasher = NULL;
    393         libtrace->expected_key = 0;
    394390        libtrace_zero_ocache(&libtrace->packet_freelist);
    395391        libtrace_zero_thread(&libtrace->hasher_thread);
     
    741737        dest->buf_control=TRACE_CTRL_PACKET;
    742738        dest->order = packet->order;
     739        dest->hash = packet->hash;
     740        dest->error = packet->error;
    743741        /* Reset the cache - better to recalculate than try to convert
    744742         * the values over to the new packet */
  • lib/trace_parallel.c

    r2498008 rd994324  
    339339}
    340340
    341 /** Used below in trace_make_results_packets_safe */
    342 static void do_copy_result_packet(void *data)
    343 {
    344         libtrace_result_t *res = (libtrace_result_t *)data;
     341/** Makes a packet safe, a packet may become invaild after a
     342 * pause (or stop/destroy) of a trace. This copies a packet
     343 * in such a way that it will be able to survive a pause.
     344 *
     345 * However this will not allow the packet to be used after
     346 * the format is destroyed. Or while the trace is still paused.
     347 */
     348DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
     349        // Duplicate the packet in standard malloc'd memory and free the
     350        // original, This is a 1:1 exchange so is ocache count remains unchanged.
     351        if (pkt->buf_control != TRACE_CTRL_PACKET) {
     352                libtrace_packet_t *dup;
     353                dup = trace_copy_packet(pkt);
     354                /* Release the external buffer */
     355                trace_fin_packet(pkt);
     356                /* Copy the duplicated packet over the existing */
     357                memcpy(pkt, dup, sizeof(libtrace_packet_t));
     358        }
     359}
     360
     361/**
     362 * Makes a libtrace_result_t safe, used when pausing a trace.
     363 * This will call libtrace_make_packet_safe if the result is
     364 * a packet.
     365 */
     366DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
    345367        if (res->type == RESULT_PACKET) {
    346                 // Duplicate the packet in standard malloc'd memory and free the
    347                 // original, This is a 1:1 exchange so is ocache count remains unchanged.
    348                 libtrace_packet_t *oldpkt, *dup;
    349                 oldpkt = (libtrace_packet_t *) res->value;
    350                 dup = trace_copy_packet(oldpkt);
    351                 res->value = (void *)dup;
    352                 trace_destroy_packet(oldpkt);
     368                libtrace_make_packet_safe(res->value.pkt);
    353369        }
    354370}
     
    19982014        return result->key;
    19992015}
    2000 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
     2016DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value) {
    20012017        result->value = value;
    20022018}
    2003 DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
     2019DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result) {
    20042020        return result->value;
    20052021}
    2006 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
     2022DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value) {
    20072023        result->key = key;
    20082024        result->value = value;
     
    20522068 * Should only be called by a perpkt thread, i.e. from a perpkt handler
    20532069 */
    2054 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
     2070DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type) {
    20552071        libtrace_result_t res;
    20562072        res.type = type;
     
    20602076        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
    20612077        return;
     2078}
     2079
     2080/**
     2081 * Sets a combiner function against the trace.
     2082 */
     2083DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config){
     2084        if (combiner) {
     2085                trace->combiner = *combiner;
     2086                trace->combiner.configuration = config;
     2087        } else {
     2088                // No combiner, so don't try use it
     2089                memset(&trace->combiner, 0, sizeof(trace->combiner));
     2090        }
    20622091}
    20632092
     
    20942123                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
    20952124                        libtrace->config.perpkt_threads = *((int *) value);
    2096                         return 1;
    2097                 case TRACE_DROP_OUT_OF_ORDER:
    2098                         if (*((int *) value))
    2099                                 libtrace->reporter_flags |= REDUCE_DROP_OOO;
    2100                         else
    2101                                 libtrace->reporter_flags &= ~REDUCE_DROP_OOO;
    2102                         return 1;
    2103                 case TRACE_OPTION_SEQUENTIAL:
    2104                         libtrace->combiner = combiner_ordered;
    2105                         if (*((int *) value))
    2106                                 libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
    2107                         else
    2108                                 libtrace->reporter_flags &= ~REDUCE_SEQUENTIAL;
    2109                         return 1;
    2110                 case TRACE_OPTION_ORDERED:
    2111                         libtrace->combiner = combiner_ordered;
    2112                         if (*((int *) value))
    2113                                 libtrace->reporter_flags |= REDUCE_ORDERED;
    2114                         else
    2115                                 libtrace->reporter_flags &= ~REDUCE_ORDERED;
    21162125                        return 1;
    21172126                case TRACE_OPTION_TRACETIME:
  • test/test-format-parallel-hasher.c

    r2498008 rd994324  
    9191}
    9292
    93 
    9493struct TLS {
    9594        bool seen_start_message;
     
    106105        if (result) {
    107106                assert(libtrace_result_get_key(result) == 0);
    108                 printf("%d,", (int) libtrace_result_get_value(result));
     107                printf("%d,", libtrace_result_get_value(result).sint);
    109108                totalthreads++;
    110                 totalpkts += (int) libtrace_result_get_value(result);
    111                 assert(libtrace_result_get_value(result) == 25 ||
    112                         libtrace_result_get_value(result) == expected - 25);
     109                totalpkts += libtrace_result_get_value(result).sint;
     110                assert(libtrace_result_get_value(result).sint == 25 ||
     111                        libtrace_result_get_value(result).sint == expected - 25);
    113112        } else {
    114113                switch(mesg->code) {
     
    165164
    166165                        // All threads publish to verify the thread count
    167                         trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     166                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
    168167                        trace_post_reporter(trace);
    169168                        free(tls);
  • test/test-format-parallel-reporter.c

    rf051c1b rd994324  
    5151#include "libtrace.h"
    5252#include "data-struct/vector.h"
     53#include "combiners.h"
    5354
    5455void iferr(libtrace_t *trace,const char *msg)
     
    9394int globalcount = 0;
    9495
    95 static void* reporter(libtrace_t *libtrace, libtrace_result_t *res, libtrace_message_t *mesg) {
     96static void reporter(libtrace_t *libtrace, libtrace_result_t *res, libtrace_message_t *mesg) {
    9697        static uint64_t last = -1;
    9798        static int pktcount = 0;
    9899        if (res) {
    99                 libtrace_packet_t *packet =  (libtrace_packet_t *) libtrace_result_get_value(res);
     100                libtrace_packet_t *packet = libtrace_result_get_value(res).pkt;
    100101                assert(libtrace_result_get_key(res) == trace_packet_get_order(packet));
    101102                if(last == (uint64_t)-1) {
     
    116117                }
    117118        }
    118         return NULL;
    119119}
    120120
     
    131131                }
    132132                x = c;
    133                 trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET);
     133                trace_publish_result(trace, t, trace_packet_get_order(pkt), (libtrace_generic_types_t){.pkt=pkt}, RESULT_PACKET);
    134134                return NULL;
    135135        }
     
    160160        if (strcmp(argv[1],"rtclient")==0) expected=101;
    161161
    162         int i = 1;
    163         trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
     162        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
    164163
    165164        trace_pstart(trace, NULL, per_packet, reporter);
  • test/test-format-parallel-singlethreaded-hasher.c

    r2498008 rd994324  
    106106        if (result) {
    107107                assert(libtrace_result_get_key(result) == 0);
    108                 printf("%d,", (int) libtrace_result_get_value(result));
     108                printf("%d,", libtrace_result_get_value(result).sint);
    109109                totalthreads++;
    110                 totalpkts += (int) libtrace_result_get_value(result);
     110                totalpkts += libtrace_result_get_value(result).sint;
    111111        } else {
    112112                switch(mesg->code) {
     
    163163
    164164                        // All threads publish to verify the thread count
    165                         trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     165                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
    166166                        trace_post_reporter(trace);
    167167                        free(tls);
  • test/test-format-parallel-singlethreaded.c

    r2498008 rd994324  
    105105        if (result) {
    106106                assert(libtrace_result_get_key(result) == 0);
    107                 printf("%d,", (int) libtrace_result_get_value(result));
     107                printf("%d,", libtrace_result_get_value(result).sint);
    108108                totalthreads++;
    109                 totalpkts += (int) libtrace_result_get_value(result);
     109                totalpkts += libtrace_result_get_value(result).sint;
    110110        } else {
    111111                switch(mesg->code) {
     
    162162
    163163                        // All threads publish to verify the thread count
    164                         trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     164                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
    165165                        trace_post_reporter(trace);
    166166                        free(tls);
  • test/test-format-parallel-stressthreads.c

    r2498008 rd994324  
    105105        if (result) {
    106106                assert(libtrace_result_get_key(result) == 0);
    107                 printf("%d,", (int) libtrace_result_get_value(result));
     107                printf("%d,", libtrace_result_get_value(result).sint);
    108108                totalthreads++;
    109                 totalpkts += (int) libtrace_result_get_value(result);
     109                totalpkts += libtrace_result_get_value(result).sint;
    110110        } else {
    111111                switch(mesg->code) {
     
    162162
    163163                        // All threads publish to verify the thread count
    164                         trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     164                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint=tls->count}, RESULT_NORMAL);
    165165                        trace_post_reporter(trace);
    166166                        free(tls);
  • test/test-format-parallel.c

    r2498008 rd994324  
    104104        if (result) {
    105105                assert(libtrace_result_get_key(result) == 0);
    106                 printf("%d,", (int) libtrace_result_get_value(result));
     106                printf("%d,", libtrace_result_get_value(result).sint);
    107107                totalthreads++;
    108                 totalpkts += (int) libtrace_result_get_value(result);
     108                totalpkts += libtrace_result_get_value(result).sint;
    109109        } else {
    110110                switch(mesg->code) {
     
    172172
    173173                        // All threads publish to verify the thread count
    174                         trace_publish_result(trace, t, (uint64_t) 0, (void *) count, RESULT_NORMAL);
     174                        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_types_t){.sint = count}, RESULT_NORMAL);
    175175                        trace_post_reporter(trace);
    176176                        break;
  • tools/traceanon/traceanon_parallel.c

    r2498008 rd994324  
    11#define _GNU_SOURCE
    22#include "libtrace.h"
     3#include "data-struct/vector.h"
     4#include "data-struct/message_queue.h"
     5#include "combiners.h"
    36#include <stdio.h>
    47#include <unistd.h>
     
    1114#include <assert.h>
    1215#include "ipenc.h"
    13 #include <data-struct/vector.h>
    14 #include <data-struct/message_queue.h>
    1516#include <signal.h>
    1617
     
    155156static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t)
    156157{
    157        
    158158        if (pkt) {
    159159                struct libtrace_ip *ipptr;
     
    190190                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
    191191                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
    192                 trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET);
     192
     193                trace_publish_result(trace, t, trace_packet_get_order(pkt), (libtrace_generic_types_t){.pkt=pkt}, RESULT_PACKET);
    193194                //return ;
    194195        }
     
    200201                        break;
    201202                        case MESSAGE_TICK:
    202                                 trace_publish_result(trace, t, mesg->additional.uint64, NULL, RESULT_TICK);
     203                                trace_publish_result(trace, t, mesg->additional.uint64, (libtrace_generic_types_t){.pkt=NULL}, RESULT_TICK);
    203204                }
    204205        }
     
    213214        if (result) {
    214215                if (result->type == RESULT_PACKET) {
    215                         libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
     216                        libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result).pkt;
    216217                        assert(libtrace_result_get_key(result) == packet_count++);
    217218                        if (trace_write_packet(writer,packet)==-1) {
     
    422423         
    423424        int i = 1;
    424         trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
     425        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
    425426        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
     427
    426428        //trace_set_hasher(trace, HASHER_CUSTOM, rand_hash, NULL);
    427429       
  • tools/tracertstats/tracertstats_parallel.c

    r2498008 rd994324  
    5959#include "data-struct/vector.h"
    6060#include "data-struct/message_queue.h"
     61#include "combiners.h"
    6162
    6263#ifndef UINT32_MAX
     
    144145                result_t *res;
    145146                ts = libtrace_result_get_key(result);
    146                 res = libtrace_result_get_value(result);
     147                res = libtrace_result_get_value(result).ptr;
    147148                if (last_ts == 0)
    148149                        last_ts = ts;
     
    177178        static __thread uint64_t last_ts = 0, ts = 0;
    178179        static __thread result_t * results = NULL;
    179        
    180180        // Unsure when we would hit this case but the old code had it, I
    181181        // guess we should keep it
     
    189189                        // Publish and make a new one new
    190190                        //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    191                         trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     191                        trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
    192192                        trace_post_reporter(trace);
    193193                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     
    220220                                // Should we always post this?
    221221                                if (results->total.count) {
    222                                         trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     222                                        trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);
    223223                                        trace_post_reporter(trace);
    224224                                        results = NULL;
     
    240240                                        if (next_update_time <= mesg->additional.uint64) {
    241241                                                //fprintf(stderr, "Got a tick and publishing early!!\n");
    242                                                 trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     242                                                trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = NULL}, RESULT_NORMAL);
    243243                                                trace_post_reporter(trace);
    244244                                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     
    288288        }*/
    289289        int i = 1;
    290         trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
     290        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
    291291        /* trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &i); */
    292292        //trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
  • tools/tracestats/tracestats_parallel.c

    r2498008 rd994324  
    5757#include "data-struct/vector.h"
    5858#include "data-struct/message_queue.h"
     59#include "combiners.h"
    5960#include <pthread.h>
    6061
     
    130131                switch (mesg->code) {
    131132                        case MESSAGE_STOPPING:
    132                                 trace_publish_result(trace, t, 0, results, RESULT_NORMAL); // Only ever using a single key 0
     133                                trace_publish_result(trace, t, 0, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0
    133134                                fprintf(stderr, "Thread published resuslts WOWW\n");
    134135                                break;
     
    158159                /* Get the results from each core and sum 'em up */
    159160                assert(libtrace_result_get_key(result) == 0);
    160                 statistics_t * res = libtrace_result_get_value(result);
     161                statistics_t * res = libtrace_result_get_value(result).ptr;
    161162                count += res[0].count;
    162163                bytes += res[0].bytes;
     
    223224        //option = 10000;
    224225    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    225         option = 2;
    226226        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
    227227        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
    228         trace_parallel_config(trace, TRACE_OPTION_ORDERED, &uc);
     228        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
    229229
    230230        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
Note: See TracChangeset for help on using the changeset viewer.