Changeset 2498008


Ignore:
Timestamp:
09/16/14 02:35:10 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d994324
Parents:
50b1bee
Message:

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

Files:
4 added
16 edited

Legend:

Unmodified
Added
Removed
  • lib/Makefile.am

    ra49a9eb r2498008  
    5959                data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
    6060                data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \
    61                 hash_toeplitz.c
     61                hash_toeplitz.c combiner_ordered.c combiner_sorted.c combiner_unordered.c
    6262
    6363if DAG2_4
  • lib/data-struct/vector.c

    ra49a9eb r2498008  
    5959        }
    6060        v->size--;
    61         // Of coarse this is mega slow
     61        // Of course this is mega slow
    6262        for (i = 0; i < v->size * v->element_size; i++)
    6363                v->elements[i] = v->elements[i+v->element_size];
     
    132132        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    133133}
     134
     135DLLEXPORT void libtrace_vector_qsort(libtrace_vector_t *v, int (*compar)(const void *, const void*)) {
     136        ASSERT_RET(pthread_mutex_lock(&v->lock), == 0);
     137        qsort(v->elements, v->element_size, v->element_size, compar);
     138        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
     139}
  • lib/data-struct/vector.h

    r8c42377 r2498008  
    3030DLLEXPORT void libtrace_vector_apply_function(libtrace_vector_t *v, vector_data_fn fn);
    3131
     32// Sort the vector using qsort
     33DLLEXPORT void libtrace_vector_qsort(libtrace_vector_t *v, int (*compar)(const void *, const void*));
    3234#endif
  • lib/format_linux.c

    rf9a70ca r2498008  
    17501750                || FORMAT(trace->format_data)->stats_valid == 0) {
    17511751                if (FORMAT(trace->format_data)->per_thread) {
    1752                         size_t i;
     1752                        int i;
    17531753                        FORMAT(trace->format_data)->stats.tp_drops = 0;
    17541754                        FORMAT(trace->format_data)->stats.tp_packets = 0;
     
    17981798                || (FORMAT(trace->format_data)->stats_valid==0)) {
    17991799                if (FORMAT(trace->format_data)->per_thread) {
    1800                         size_t i;
     1800                        int i;
    18011801                        FORMAT(trace->format_data)->stats.tp_drops = 0;
    18021802                        FORMAT(trace->format_data)->stats.tp_packets = 0;
  • lib/libtrace.h.in

    r5b4d121 r2498008  
    126126#else
    127127#define ASSERT_RET(run, cond) assert(run cond)
     128//#define ASSERT_RET(run, cond) run
    128129#endif
    129130   
     
    31993200
    32003201typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
    3201 typedef void* (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
     3202typedef void (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
    32023203typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
    32033204
     
    32243225DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type);
    32253226typedef struct libtrace_vector libtrace_vector_t;
    3226 DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results);
    32273227
    32283228DLLEXPORT int trace_post_reporter(libtrace_t *libtrace);
     
    34693469DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
    34703470DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
     3471DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t);
     3472
     3473/**
     3474 * The methods we use to combine multiple outputs into a single output
     3475 * This is not considered a stable API however is public.
     3476 * Where possible use built in combiners
     3477 *
     3478 * NOTE this structure is duplicated per trace and as such can
     3479 * have functions rewritten, and in fact should if possible.
     3480 */
     3481typedef struct libtrace_combine libtrace_combine_t;
     3482struct libtrace_combine {
     3483
     3484        /**
     3485         * Called at the start of the trace to allow datastructures
     3486         * to be initilised and allow functions to be swapped if approriate.
     3487         *
     3488         * Also factors such as whether the trace is live or not can
     3489         * be used to determine the functions used.
     3490         * @return 0 if successful, -1 if an error occurs
     3491         */
     3492        int (*initialise)(libtrace_t *,libtrace_combine_t *);
     3493
     3494        /**
     3495         * Called when the trace ends, clean up any memory here
     3496         * from libtrace_t * init.
     3497         */
     3498        void (*destroy)(libtrace_t *, libtrace_combine_t *);
     3499
     3500        /**
     3501         * Publish a result against it's a threads queue.
     3502         * If null publish directly, expected to be used
     3503         * as a single threaded optimisation and can be
     3504         * set to NULL by init if this case is detected.
     3505         */
     3506        void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *);
     3507
     3508        /**
     3509         * Read as many results as possible from the trace.
     3510         * Directy calls the users code to handle results from here.
     3511         *
     3512         * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE
     3513         * If publish is NULL, this probably should be NULL also otherwise
     3514         * it will not be called.
     3515         */
     3516        void (*read)(libtrace_t *, libtrace_combine_t *);
     3517
     3518        /**
     3519         * Called when the trace is finished to flush the final
     3520         * results to the reporter thread.
     3521         *
     3522         * There may be no results, in which case this should
     3523         * just return.
     3524         *
     3525         * Libtrace state:
     3526         * Called from reporter thread
     3527         * No perpkt threads will be running, i.e. publish will not be
     3528         * called again.
     3529         *
     3530         * If publish is NULL, this probably should be NULL also otherwise
     3531         * it will not be called.
     3532         */
     3533        void (*read_final)(libtrace_t *, libtrace_combine_t *);
     3534
     3535        /**
     3536         * Pause must make sure any results of the type packet are safe.
     3537         * That means trace_copy_packet() and destroy the original.
     3538         * This also should be NULL if publish is NULL.
     3539         */
     3540        void (*pause)(libtrace_t *, libtrace_combine_t *);
     3541
     3542        /**
     3543         * Data storage for all the combiner threads
     3544         */
     3545        void *queues;
     3546
     3547        /**
     3548         * XXX - todo make a union of useful types
     3549         * Configuration options what this does is upto the combiner
     3550         * choosen.
     3551         */
     3552        int configuration;
     3553};
     3554
    34713555
    34723556#define READ_EOF 0
  • lib/libtrace_int.h

    rf051c1b r2498008  
    214214        libtrace_message_queue_t messages; // Message handling
    215215        libtrace_ringbuffer_t rbuffer; // Input
    216         libtrace_vector_t vector; // Output
    217         libtrace_queue_t deque; // Real Output type makes more sense
    218216        libtrace_t * trace;
    219217        void* ret;
     
    345343        uint64_t received_packets;
    346344        struct user_configuration config;
     345        libtrace_combine_t combiner;
    347346};
    348347
  • lib/trace.c

    r5b4d121 r2498008  
    665665                // This has all of our packets
    666666                libtrace_ocache_destroy(&libtrace->packet_freelist);
    667                
    668                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    669                         assert (libtrace_vector_get_size(&libtrace->perpkt_threads[i].vector) == 0);
    670                         libtrace_vector_destroy(&libtrace->perpkt_threads[i].vector);
    671                 }
     667                if (libtrace->combiner.destroy)
     668                        libtrace->combiner.destroy(libtrace, &libtrace->combiner);
    672669                free(libtrace->perpkt_threads);
    673670                libtrace->perpkt_threads = NULL;
  • lib/trace_parallel.c

    rf0e8bd6 r2498008  
    9595#include "rt_protocol.h"
    9696#include "hash_toeplitz.h"
     97#include "combiners.h"
    9798
    9899#include <pthread.h>
     
    185186        assert(libtrace->state != STATE_NEW);
    186187        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
     188}
     189
     190/**
     191 * When running the number of perpkt threads in use.
     192 * TODO what if the trace is not running yet, or has finished??
     193 *
     194 * @brief libtrace_perpkt_thread_nb
     195 * @param t The trace
     196 * @return
     197 */
     198DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
     199        return t->perpkt_thread_count;
    187200}
    188201
     
    283296        t->type = THREAD_EMPTY;
    284297        libtrace_zero_ringbuffer(&t->rbuffer);
    285         libtrace_zero_vector(&t->vector);
    286         libtrace_zero_deque(&t->deque);
    287298        t->recorded_first = false;
    288299        t->perpkt_num = -1;
     
    328339}
    329340
    330 /** Used below in trace_make_results_packets_safe*/
     341/** Used below in trace_make_results_packets_safe */
    331342static void do_copy_result_packet(void *data)
    332343{
     
    344355
    345356/**
    346  * Make a safe replacement copy of any result packets that are owned
    347  * by the format in the result queue. Used when pausing traces.
    348  */
    349 static void trace_make_results_packets_safe(libtrace_t *trace) {
    350         libtrace_thread_t *t = get_thread_descriptor(trace);
    351         if (trace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
    352                 libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
    353         else
    354                 libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
    355 }
    356 
    357 /**
    358357 * Holds threads in a paused state, until released by broadcasting
    359358 * the condition mutex.
    360359 */
    361360static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
    362         if (t->type == THREAD_PERPKT)
    363                 trace_make_results_packets_safe(trace);
    364361        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    365362        thread_change_state(trace, t, THREAD_PAUSED, false);
     
    11741171        libtrace_t *trace = (libtrace_t *)data;
    11751172        libtrace_thread_t *t = &trace->reporter_thread;
    1176         size_t res_size;
    11771173        libtrace_vector_t results;
    11781174        libtrace_vector_init(&results, sizeof(libtrace_result_t));
    11791175        fprintf(stderr, "Reporter thread starting\n");
    1180         libtrace_result_t result;
    1181         size_t i;
    11821176
    11831177        message.code = MESSAGE_STARTING;
     
    11971191                        // Check for results
    11981192                        case MESSAGE_POST_REPORTER:
    1199                                 res_size = trace_get_results(trace, &results);
    1200                                 for (i = 0; i < res_size; i++) {
    1201                                         ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
    1202                                         (*trace->reporter)(trace, &result, NULL);
    1203                                 }
     1193                                trace->combiner.read(trace, &trace->combiner);
    12041194                                break;
    12051195                        case MESSAGE_DO_PAUSE:
     1196                                assert(trace->combiner.pause);
     1197                                trace->combiner.pause(trace, &trace->combiner);
    12061198                                message.code = MESSAGE_PAUSING;
    12071199                                message.sender = t;
     
    12171209
    12181210        // Flush out whats left now all our threads have finished
    1219         res_size = trace_get_results(trace, &results);
    1220         for (i = 0; i < res_size; i++) {
    1221                 ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
    1222                 (*trace->reporter)(trace, &result, NULL);
    1223         }
    1224         libtrace_vector_destroy(&results);
     1211        trace->combiner.read_final(trace, &trace->combiner);
    12251212
    12261213        // GOODBYE
     
    15911578                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
    15921579                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
    1593                 // Depending on the mode vector or deque might be chosen
    1594                 libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
    1595                 libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
    15961580                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    15971581                t->recorded_first = false;
     
    16111595        if (threads_started == 0)
    16121596                threads_started = trace_start_perpkt_threads(libtrace);
     1597
     1598        // No combiner set, use a default to reduce the chance of this breaking
     1599        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
     1600                libtrace->combiner = combiner_unordered;
     1601
     1602        if (libtrace->combiner.initialise)
     1603                libtrace->combiner.initialise(libtrace, &libtrace->combiner);
    16131604
    16141605        libtrace->reporter_thread.type = THREAD_REPORTER;
     
    20582049
    20592050/**
    2060  * Publish to the reduce queue, return
     2051 * Publishes a result to the reduce queue
    20612052 * Should only be called by a perpkt thread, i.e. from a perpkt handler
    20622053 */
    20632054DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
    20642055        libtrace_result_t res;
    2065         UNUSED static __thread int count = 0;
    20662056        res.type = type;
    2067 
    2068         libtrace_result_set_key_value(&res, key, value);
    2069 
    2070         /*
    2071         if (count == 1)
    2072                 printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
    2073         count = (count+1) %1000;
    2074         libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    2075         */
    2076         /*if (count == 1)
    2077                 printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
    2078         count = (count+1)%1000;*/
    2079 
    2080         if (libtrace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    2081                 if (libtrace_deque_get_size(&t->deque) >= libtrace->config.reporter_thold) {
    2082                         trace_post_reporter(libtrace);
    2083                 }
    2084                 //while (libtrace_deque_get_size(&t->deque) >= 1000)
    2085                 //      sched_yield();
    2086                 libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
    2087         } else {
    2088                 //while (libtrace_vector_get_size(&t->vector) >= 1000)
    2089                 //      sched_yield();
    2090 
    2091                 if (libtrace_vector_get_size(&t->vector) >= libtrace->config.reporter_thold) {
    2092                         trace_post_reporter(libtrace);
    2093                 }
    2094                 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    2095         }
    2096 }
    2097 
    2098 static int compareres(const void* p1, const void* p2)
    2099 {
    2100         if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
    2101                 return -1;
    2102         if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
    2103                 return 0;
    2104         else
    2105                 return 1;
    2106 }
    2107 
    2108 DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
    2109         int i;
    2110         int flags = libtrace->reporter_flags; // Hint these aren't a changing
    2111 
    2112         libtrace_vector_empty(results);
    2113 
    2114         /* Here we assume queues are in order ascending order and they want
    2115          * the smallest result first. If they are not in order the results
    2116          * may not be in order.
    2117          */
    2118         if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    2119                 int live_count = 0;
    2120                 bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
    2121                 uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
    2122                 uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
    2123                 int min_queue = -1;
    2124 
    2125                 /* Loop through check all are alive (have data) and find the smallest */
    2126                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2127                         libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    2128                         if (libtrace_deque_get_size(v) != 0) {
    2129                                 libtrace_result_t r;
    2130                                 libtrace_deque_peek_front(v, (void *) &r);
    2131                                 live_count++;
    2132                                 live[i] = 1;
    2133                                 key[i] = libtrace_result_get_key(&r);
    2134                                 if (i==0 || min_key > key[i]) {
    2135                                         min_key = key[i];
    2136                                         min_queue = i;
    2137                                 }
    2138                         } else {
    2139                                 live[i] = 0;
    2140                         }
    2141                 }
    2142 
    2143                 /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
    2144                 while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
    2145                                 ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
    2146                                 trace_finished(libtrace)))) {
    2147                         /* Get the minimum queue and then do stuff */
    2148                         libtrace_result_t r;
    2149 
    2150                         assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
    2151                         libtrace_vector_push_back(results, &r);
    2152 
    2153                         // We expect the key we read +1 now
    2154                         libtrace->expected_key = key[min_queue] + 1;
    2155 
    2156                         // Now update the one we just removed
    2157                         if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
    2158                         {
    2159                                 libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
    2160                                 key[min_queue] = libtrace_result_get_key(&r);
    2161                                 if (key[min_queue] <= min_key) {
    2162                                         // We are still the smallest, might be out of order though :(
    2163                                         min_key = key[min_queue];
    2164                                 } else {
    2165                                         min_key = key[min_queue]; // Update our minimum
    2166                                         // Check all find the smallest again - all are alive
    2167                                         for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2168                                                 if (live[i] && min_key > key[i]) {
    2169                                                         min_key = key[i];
    2170                                                         min_queue = i;
    2171                                                 }
    2172                                         }
    2173                                 }
    2174                         } else {
    2175                                 live[min_queue] = 0;
    2176                                 live_count--;
    2177                                 min_key = UINT64_MAX; // Update our minimum
    2178                                 // Check all find the smallest again - all are alive
    2179                                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    2180                                         // Still not 100% TODO (what if order is wrong or not increasing)
    2181                                         if (live[i] && min_key >= key[i]) {
    2182                                                 min_key = key[i];
    2183                                                 min_queue = i;
    2184                                         }
    2185                                 }
    2186                         }
    2187                 }
    2188         } else { // Queues are not in order - return all results in the queue
    2189                 for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    2190                         libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
    2191                 }
    2192                 if (flags & REDUCE_SORT) {
    2193                         qsort(results->elements, results->size, results->element_size, &compareres);
    2194                 }
    2195         }
    2196         return libtrace_vector_get_size(results);
     2057        res.key = key;
     2058        res.value = value;
     2059        assert(libtrace->combiner.publish);
     2060        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
     2061        return;
    21972062}
    21982063
     
    22372102                        return 1;
    22382103                case TRACE_OPTION_SEQUENTIAL:
     2104                        libtrace->combiner = combiner_ordered;
    22392105                        if (*((int *) value))
    22402106                                libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
     
    22432109                        return 1;
    22442110                case TRACE_OPTION_ORDERED:
     2111                        libtrace->combiner = combiner_ordered;
    22452112                        if (*((int *) value))
    22462113                                libtrace->reporter_flags |= REDUCE_ORDERED;
  • test/test-format-parallel-hasher.c

    rf051c1b r2498008  
    9999        int count;
    100100};
    101 int x;
    102 
     101
     102static int totalpkts = 0;
     103static int expected;
     104static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
     105        static int totalthreads = 0;
     106        if (result) {
     107                assert(libtrace_result_get_key(result) == 0);
     108                printf("%d,", (int) libtrace_result_get_value(result));
     109                totalthreads++;
     110                totalpkts += (int) libtrace_result_get_value(result);
     111                assert(libtrace_result_get_value(result) == 25 ||
     112                        libtrace_result_get_value(result) == expected - 25);
     113        } else {
     114                switch(mesg->code) {
     115                        case MESSAGE_STARTING:
     116                                // Should have two threads here
     117                                assert(libtrace_get_perpkt_count(trace) == 2);
     118                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
     119                                break;
     120                        case MESSAGE_STOPPING:
     121                                printf(")\n");
     122                                assert(totalthreads == libtrace_get_perpkt_count(trace));
     123                                break;
     124                }
     125        }
     126}
     127
     128static int x;
    103129static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    104130                                                libtrace_message_t *mesg,
     
    177203 * Test that the hasher function works
    178204 */
    179 int test_hasher(const char *tracename, int expected) {
     205int test_hasher(const char *tracename) {
    180206        libtrace_t *trace;
    181207        int error = 0;
    182         int count = 0;
    183208        int i;
    184209        int hashercount = 0;
     
    195220
    196221        // Start it
    197         trace_pstart(trace, NULL, per_packet, NULL);
     222        trace_pstart(trace, NULL, per_packet, report_result);
    198223        iferr(trace,tracename);
    199224        /* Make sure traces survive a pause and restart */
     
    205230        /* Wait for all threads to stop */
    206231        trace_join(trace);
    207         libtrace_vector_t results;
    208 
    209         /* Now lets check the results */
    210         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    211         trace_get_results(trace, &results);
    212         // Should have two results/threads here
    213         assert(libtrace_vector_get_size(&results) == 2);
    214         for (i = 0; i < libtrace_vector_get_size(&results); i++) {
    215                 int ret;
    216                 libtrace_result_t result;
    217                 ret = libtrace_vector_get(&results, i, (void *) &result);
    218                 assert(ret == 1);
    219                 assert(libtrace_result_get_key(&result) == 0);
    220                 count += (int) libtrace_result_get_value(&result);
    221                 printf("%d,", (int) libtrace_result_get_value(&result));
    222                
    223         }
    224         printf(")\n");
    225         for (i = 0; i < libtrace_vector_get_size(&results); i++) {
    226                 int ret;
    227                 libtrace_result_t result;
    228                 ret = libtrace_vector_get(&results, i, (void *) &result);
    229                 assert(libtrace_result_get_value(&result) == 25 ||
    230                         libtrace_result_get_value(&result) == expected - 25);
    231         }
    232         libtrace_vector_destroy(&results);
    233 
     232
     233        /* Now check we have all received all the packets */
    234234        if (error == 0) {
    235                 if (count == expected) {
     235                if (totalpkts == expected) {
    236236                        printf("success: %d packets read\n",expected);
    237237                } else {
    238                         printf("failure: %d packets expected, %d seen\n",expected,count);
     238                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    239239                        error = 1;
    240240                }
     
    250250int main(int argc, char *argv[]) {
    251251        int error = 0;
    252         int expected = 100;
    253252        const char *tracename;
     253        expected = 100;
    254254
    255255        if (argc<2) {
     
    262262        if (strcmp(argv[1],"rtclient")==0) expected=101;
    263263
    264         error = test_hasher(tracename, expected);
     264        error = test_hasher(tracename);
    265265
    266266    return error;
  • test/test-format-parallel-singlethreaded-hasher.c

    rf051c1b r2498008  
    9999        int count;
    100100};
    101 int x;
    102 
     101
     102static int totalpkts = 0;
     103static int expected;
     104static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
     105        static int totalthreads = 0;
     106        if (result) {
     107                assert(libtrace_result_get_key(result) == 0);
     108                printf("%d,", (int) libtrace_result_get_value(result));
     109                totalthreads++;
     110                totalpkts += (int) libtrace_result_get_value(result);
     111        } else {
     112                switch(mesg->code) {
     113                        case MESSAGE_STARTING:
     114                                // Should have a single thread here
     115                                assert(libtrace_get_perpkt_count(trace) == 1);
     116                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
     117                                break;
     118                        case MESSAGE_STOPPING:
     119                                printf(")\n");
     120                                assert(totalthreads == libtrace_get_perpkt_count(trace));
     121                                break;
     122                }
     123        }
     124}
     125
     126static int x;
    103127static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    104128                                                libtrace_message_t *mesg,
     
    177201 * It might not be called but this ensures consistency
    178202 */
    179 int test_hasher_singlethreaded(const char *tracename, int expected) {
     203int test_hasher_singlethreaded(const char *tracename) {
    180204        libtrace_t *trace;
    181205        int error = 0;
    182         int count = 0;
    183206        int i;
    184207        int hashercount = 0;
     
    195218
    196219        // Start it
    197         trace_pstart(trace, NULL, per_packet, NULL);
     220        trace_pstart(trace, NULL, per_packet, report_result);
    198221        iferr(trace,tracename);
    199222
     
    206229        /* Wait for all threads to stop */
    207230        trace_join(trace);
    208         libtrace_vector_t results;
    209 
    210         /* Now lets check the results */
    211         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    212         trace_get_results(trace, &results);
    213         // Should have one result/thread here
    214         assert(libtrace_vector_get_size(&results) == 1);
    215         for (i = 0; i < libtrace_vector_get_size(&results); i++) {
    216                 int ret;
    217                 libtrace_result_t result;
    218                 ret = libtrace_vector_get(&results, i, (void *) &result);
    219                 assert(ret == 1);
    220                 assert(libtrace_result_get_key(&result) == 0);
    221                 count += (int) libtrace_result_get_value(&result);
    222                 printf("%d,", (int) libtrace_result_get_value(&result));
    223         }
    224         printf(")\n");
    225         libtrace_vector_destroy(&results);
     231
     232        /* Now check we have all received all the packets */
    226233        if (error == 0) {
    227                 if (count == expected) {
     234                if (totalpkts == expected) {
    228235                        printf("success: %d packets read\n",expected);
    229236                } else {
    230                         printf("failure: %d packets expected, %d seen\n",expected,count);
     237                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    231238                        error = 1;
    232239                }
     
    241248int main(int argc, char *argv[]) {
    242249        int error = 0;
    243         int expected = 100;
    244250        const char *tracename;
     251        expected = 100;
    245252
    246253        if (argc<2) {
     
    253260        if (strcmp(argv[1],"rtclient")==0) expected=101;
    254261
    255         error = test_hasher_singlethreaded(tracename, expected);
     262        error = test_hasher_singlethreaded(tracename);
    256263    return error;
    257264}
  • test/test-format-parallel-singlethreaded.c

    rf051c1b r2498008  
    9999        int count;
    100100};
    101 int x;
    102 
     101
     102static int totalpkts = 0;
     103static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
     104        static int totalthreads = 0;
     105        if (result) {
     106                assert(libtrace_result_get_key(result) == 0);
     107                printf("%d,", (int) libtrace_result_get_value(result));
     108                totalthreads++;
     109                totalpkts += (int) libtrace_result_get_value(result);
     110        } else {
     111                switch(mesg->code) {
     112                        case MESSAGE_STARTING:
     113                                // Should have a single thread here
     114                                assert(libtrace_get_perpkt_count(trace) == 1);
     115                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
     116                                break;
     117                        case MESSAGE_STOPPING:
     118                                printf(")\n");
     119                                assert(totalthreads == libtrace_get_perpkt_count(trace));
     120                                break;
     121                }
     122        }
     123}
     124
     125static int x;
    103126static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    104127                                                libtrace_message_t *mesg,
     
    167190        libtrace_t *trace;
    168191        int error = 0;
    169         int count = 0;
    170192        int i;
    171193        printf("Testing single threaded\n");
     
    180202
    181203        // Start it
    182         trace_pstart(trace, NULL, per_packet, NULL);
     204        trace_pstart(trace, NULL, per_packet, report_result);
    183205        iferr(trace,tracename);
    184206
     
    191213        /* Wait for all threads to stop */
    192214        trace_join(trace);
    193         libtrace_vector_t results;
    194 
    195         /* Now lets check the results */
    196         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    197         trace_get_results(trace, &results);
    198         // Should have one result/thread here
    199         assert(libtrace_vector_get_size(&results) == 1);
    200         for (i = 0; i < libtrace_vector_get_size(&results); i++) {
    201                 int ret;
    202                 libtrace_result_t result;
    203                 ret = libtrace_vector_get(&results, i, (void *) &result);
    204                 assert(ret == 1);
    205                 assert(libtrace_result_get_key(&result) == 0);
    206                 count += (int) libtrace_result_get_value(&result);
    207                 printf("%d,", (int) libtrace_result_get_value(&result));
    208         }
    209         printf(")\n");
    210         libtrace_vector_destroy(&results);
    211 
     215
     216        /* Now check we have all received all the packets */
    212217        if (error == 0) {
    213                 if (count == expected) {
     218                if (totalpkts == expected) {
    214219                        printf("success: %d packets read\n",expected);
    215220                } else {
    216                         printf("failure: %d packets expected, %d seen\n",expected,count);
     221                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    217222                        error = 1;
    218223                }
  • test/test-format-parallel-stressthreads.c

    rf051c1b r2498008  
    9999        int count;
    100100};
    101 int x;
    102 
     101
     102static int totalpkts = 0;
     103static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
     104        static int totalthreads = 0;
     105        if (result) {
     106                assert(libtrace_result_get_key(result) == 0);
     107                printf("%d,", (int) libtrace_result_get_value(result));
     108                totalthreads++;
     109                totalpkts += (int) libtrace_result_get_value(result);
     110        } else {
     111                switch(mesg->code) {
     112                        case MESSAGE_STARTING:
     113                                // Should have a single thread here
     114                                assert(libtrace_get_perpkt_count(trace) == 100);
     115                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
     116                                break;
     117                        case MESSAGE_STOPPING:
     118                                printf(")\n");
     119                                assert(totalthreads == libtrace_get_perpkt_count(trace));
     120                                break;
     121                }
     122        }
     123}
     124
     125static int x;
    103126static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    104127                                                libtrace_message_t *mesg,
     
    167190        libtrace_t *trace;
    168191        int error = 0;
    169         int count = 0;
    170192        int i;
    171193        printf("Testing single threaded\n");
     
    180202
    181203        // Start it
    182         trace_pstart(trace, NULL, per_packet, NULL);
     204        trace_pstart(trace, NULL, per_packet, report_result);
    183205        iferr(trace,tracename);
    184206
     
    191213        /* Wait for all threads to stop */
    192214        trace_join(trace);
    193         libtrace_vector_t results;
    194 
    195         /* Now lets check the results */
    196         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    197         trace_get_results(trace, &results);
    198         // Should have 100 results/threads here
    199         assert(libtrace_vector_get_size(&results) == 100);
    200         for (i = 0; i < libtrace_vector_get_size(&results); i++) {
    201                 int ret;
    202                 libtrace_result_t result;
    203                 ret = libtrace_vector_get(&results, i, (void *) &result);
    204                 assert(ret == 1);
    205                 assert(libtrace_result_get_key(&result) == 0);
    206                 count += (int) libtrace_result_get_value(&result);
    207                 printf("%d,", (int) libtrace_result_get_value(&result));
    208         }
    209         printf(")\n");
    210         libtrace_vector_destroy(&results);
    211 
     215
     216        /* Now check we have all received all the packets */
    212217        if (error == 0) {
    213                 if (count == expected) {
     218                if (totalpkts == expected) {
    214219                        printf("success: %d packets read\n",expected);
    215220                } else {
    216                         printf("failure: %d packets expected, %d seen\n",expected,count);
     221                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    217222                        error = 1;
    218223                }
  • test/test-format-parallel.c

    rf051c1b r2498008  
    9191}
    9292
    93 
    9493struct TLS {
    9594        bool seen_start_message;
     
    9998        int count;
    10099};
     100
     101static int totalpkts = 0;
     102static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
     103        static int totalthreads = 0;
     104        if (result) {
     105                assert(libtrace_result_get_key(result) == 0);
     106                printf("%d,", (int) libtrace_result_get_value(result));
     107                totalthreads++;
     108                totalpkts += (int) libtrace_result_get_value(result);
     109        } else {
     110                switch(mesg->code) {
     111                        case MESSAGE_STARTING:
     112                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
     113                                break;
     114                        case MESSAGE_STOPPING:
     115                                printf(")\n");
     116                                assert(totalthreads == libtrace_get_perpkt_count(trace));
     117                                break;
     118                }
     119        }
     120}
     121
    101122int x;
    102 
    103 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
     123static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    104124                                                libtrace_message_t *mesg,
    105125                                                libtrace_thread_t *t) {
     
    111131        static __thread bool seen_paused_message = false;
    112132        static __thread bool seen_pausing_message = false;
    113         static __thread count = 0;
     133        static __thread int count = 0;
    114134        tls = trace_get_tls(t);
    115135
     
    176196int main(int argc, char *argv[]) {
    177197        int error = 0;
    178         int count = 0;
    179198        int expected = 100;
    180         int i;
    181199        const char *tracename;
    182200        libtrace_t *trace;
     
    194212        if (strcmp(argv[1],"rtclient")==0) expected=101;
    195213
    196         trace_pstart(trace, NULL, per_packet, NULL);
     214        trace_pstart(trace, NULL, per_packet, report_result);
    197215        iferr(trace,tracename);
    198216
     
    205223        /* Wait for all threads to stop */
    206224        trace_join(trace);
    207         libtrace_vector_t results;
    208 
    209         /* Now lets check the results */
    210         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    211         trace_get_results(trace, &results);
    212         printf("\tLooks like %d threads were used!\n\tcounts(", libtrace_vector_get_size(&results));
    213         for (i = 0; i < libtrace_vector_get_size(&results); i++) {
    214                 int ret;
    215                 libtrace_result_t result;
    216                 ret = libtrace_vector_get(&results, i, (void *) &result);
    217                 assert(ret == 1);
    218                 assert(libtrace_result_get_key(&result) == 0);
    219                 count += (int) libtrace_result_get_value(&result);
    220                 printf("%d,", (int) libtrace_result_get_value(&result));
    221         }
    222         printf(")\n");
    223         libtrace_vector_destroy(&results);
    224 
     225
     226        /* Now check we have all received all the packets */
    225227        if (error == 0) {
    226                 if (count == expected) {
     228                if (totalpkts == expected) {
    227229                        printf("success: %d packets read\n",expected);
    228230                } else {
    229                         printf("failure: %d packets expected, %d seen\n",expected,count);
     231                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    230232                        error = 1;
    231233                }
     
    233235                iferr(trace,tracename);
    234236        }
     237
    235238    trace_destroy(trace);
    236239    return error;
  • tools/traceanon/traceanon_parallel.c

    r50b1bee r2498008  
    208208struct libtrace_out_t *writer = 0;
    209209
    210 static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
     210static void write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
    211211        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
    212212
     
    226226                }
    227227        }
    228         return NULL;
    229228}
    230229
  • tools/tracertstats/tracertstats_parallel.c

    rf051c1b r2498008  
    136136} result_t;
    137137
    138 
    139 static int reduce(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
    140 {       
    141         int i,j;
    142         //uint64_t count=0, bytes=0;
     138static uint64_t last_ts = 0;
     139static void process_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg UNUSED)  {
    143140        static uint64_t ts = 0;
    144         libtrace_vector_t results;
    145         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    146         trace_get_results(trace, &results);
    147         //uint64_t packets;
    148        
    149         /* Get the results from each core and sum 'em up */
    150         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    151                 libtrace_result_t result;
    152                
    153                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    154                 ts = libtrace_result_get_key(&result);
    155                 if (*last_ts == 0)
    156                         *last_ts = ts;
    157                
    158                 result_t * res = libtrace_result_get_value(&result);
    159                 static result_t *  last_res = NULL;
    160                 // Memory manager might falsely trigger this
    161                 assert(res != last_res);
    162                 last_res = res;
    163                 //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
    164                 while (*last_ts < ts) {
    165                         report_results((double) *last_ts * (double) packet_interval, count, bytes);
     141
     142        if (result) {
     143                int j;
     144                result_t *res;
     145                ts = libtrace_result_get_key(result);
     146                res = libtrace_result_get_value(result);
     147                if (last_ts == 0)
     148                        last_ts = ts;
     149                while (last_ts < ts) {
     150                        report_results((double) last_ts * (double) packet_interval, count, bytes);
    166151                        count = 0;
    167152                        bytes = 0;
    168153                        for (j = 0; j < filter_count; j++)
    169154                                filters[j].count = filters[j].bytes = 0;
    170                         (*last_ts)++;
    171                 }
    172                
     155                        last_ts++;
     156                }
    173157                count += res->total.count;
    174158                bytes += res->total.bytes;
     
    179163                free(res);
    180164        }
    181         // Done with these results - Free internally and externally
    182         libtrace_vector_destroy(&results);
    183        
    184         return 0;
    185165}
    186166
     
    276256}
    277257
    278 static uint64_t bad_hash(const libtrace_packet_t * pkt, void *data) {
     258static uint64_t bad_hash(const libtrace_packet_t * pkt UNUSED, void *data UNUSED) {
    279259        return 0;
    280260}
     
    284264{
    285265        int j;
    286         uint64_t last_ts = 0;
    287266
    288267        if (!merge_inputs)
     
    318297        }
    319298
    320         if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
     299        if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
    321300                trace_perror(trace,"Failed to start trace");
    322301                trace_destroy(trace);
     
    327306
    328307
    329         // reduce
    330         while (!trace_finished(trace)) {
    331                 // Read messages
    332                 libtrace_message_t message;
    333                
    334                 // We just release and do work currently, maybe if something
    335                 // interesting comes through we'd deal with that
    336                 libtrace_thread_get_message(trace, &message);
    337                
    338                 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
    339                 reduce(trace, NULL, &last_ts);
    340         }
    341 
    342308        // Wait for all threads to stop
    343309        trace_join(trace);
    344310       
    345         reduce(trace, NULL, &last_ts);
    346311        // Flush the last one out
    347312        report_results((double) last_ts * (double) packet_interval, count, bytes);
  • tools/tracestats/tracestats_parallel.c

    rf051c1b r2498008  
    150150}
    151151
    152 static int reduce(libtrace_t* trace, void* global_blob)
    153 {
    154         int i,j;
    155         uint64_t count=0, bytes=0;
    156         libtrace_vector_t results;
    157         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    158         trace_get_results(trace, &results);
     152static void report_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg) {
     153        static uint64_t count=0, bytes=0;
    159154        uint64_t packets;
    160        
    161         /* Get the results from each core and sum 'em up */
    162         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    163                 libtrace_result_t result;
    164                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    165                 assert(libtrace_result_get_key(&result) == 0);
    166                 statistics_t * res = libtrace_result_get_value(&result);
     155        int i;
     156        if (result) {
     157                int j;
     158                /* Get the results from each core and sum 'em up */
     159                assert(libtrace_result_get_key(result) == 0);
     160                statistics_t * res = libtrace_result_get_value(result);
    167161                count += res[0].count;
    168162                bytes += res[0].bytes;
     
    172166                }
    173167                free(res);
    174         }
    175         // Done with these results - Free internally and externally
    176         libtrace_vector_destroy(&results);
    177 
    178     printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
    179         for(i=0;i<filter_count;++i) {
    180                 printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
    181                 filters[i].bytes=0;
    182                 filters[i].count=0;
    183         }
    184         packets=trace_get_received_packets(trace);
    185         if (packets!=UINT64_MAX)
    186                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    187                                 "Input packets", packets);
    188         packets=trace_get_filtered_packets(trace);
    189         if (packets!=UINT64_MAX)
    190                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    191                                 "Filtered packets", packets);
    192         packets=trace_get_dropped_packets(trace);
    193         if (packets!=UINT64_MAX)
    194                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    195                                 "Dropped packets",packets);
    196         packets=trace_get_accepted_packets(trace);
    197         if (packets!=UINT64_MAX)
    198                 fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
    199                                 "Accepted Packets",packets);
    200         printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
    201         totcount+=count;
    202         totbytes+=bytes;
    203        
    204         return 0;
     168        } else switch (mesg->code) {
     169                case MESSAGE_STOPPING:
     170                        printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
     171                        for(i=0;i<filter_count;++i) {
     172                                printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
     173                                filters[i].bytes=0;
     174                                filters[i].count=0;
     175                        }
     176                        packets=trace_get_received_packets(trace);
     177                        if (packets!=UINT64_MAX)
     178                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     179                                                "Input packets", packets);
     180                        packets=trace_get_filtered_packets(trace);
     181                        if (packets!=UINT64_MAX)
     182                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     183                                                "Filtered packets", packets);
     184                        packets=trace_get_dropped_packets(trace);
     185                        if (packets!=UINT64_MAX)
     186                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     187                                                "Dropped packets",packets);
     188                        packets=trace_get_accepted_packets(trace);
     189                        if (packets!=UINT64_MAX)
     190                                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
     191                                                "Accepted Packets",packets);
     192                        printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
     193                        totcount+=count;
     194                        totbytes+=bytes;
     195        }
    205196}
    206197
     
    212203        return 0;
    213204}
     205
     206struct user_configuration uc;
     207
    214208
    215209/* Process a trace, counting packets that match filter(s) */
     
    230224    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    231225        option = 2;
    232         trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
     226        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
     227        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
     228        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &uc);
     229
    233230        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
    234231
     
    242239
    243240
    244         if (trace_pstart(trace, (void *)&blob, &per_packet, NULL)==-1) {
     241        if (trace_pstart(trace, (void *)&blob, &per_packet, report_result)==-1) {
    245242                trace_perror(trace,"Failed to start trace");
    246243                return;
     
    249246        // Wait for all threads to stop
    250247        trace_join(trace);
    251         reduce(trace, NULL);
    252248
    253249        //map_pair_iterator_t * results = NULL;
     
    273269        int i;
    274270        struct sigaction sigact;
    275 
     271        ZERO_USER_CONFIG(uc);
    276272        while(1) {
    277273                int option_index;
     
    279275                        { "filter",        1, 0, 'f' },
    280276                        { "libtrace-help", 0, 0, 'H' },
     277                        { "config",             1, 0, 'u' },
     278                        { "config-file",                1, 0, 'U' },
    281279                        { NULL,            0, 0, 0   },
    282280                };
    283281
    284                 int c=getopt_long(argc, argv, "f:H",
     282                int c=getopt_long(argc, argv, "f:Hu:U:",
    285283                                long_options, &option_index);
    286284
     
    301299                                exit(1);
    302300                                break;
     301                        case 'u':
     302                                  parse_user_config(&uc, optarg);
     303                                  break;
     304                        case 'U':;
     305                                FILE * f = fopen(optarg, "r");
     306                                if (f != NULL) {
     307                                        parse_user_config_file(&uc, f);
     308                                } else {
     309                                        perror("Failed to open configuration file\n");
     310                                        usage(argv[0]);
     311                                }
     312                                break;
    303313                        default:
    304314                                fprintf(stderr,"Unknown option: %c\n",c);
Note: See TracChangeset for help on using the changeset viewer.