Changeset 5b4d121


Ignore:
Timestamp:
08/19/14 13:59:33 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
f0e8bd6
Parents:
957a72a
Message:

Adds a configuration parser to make it easy to change the parallel configuration.
Adds more configuration options (Tidies some verbose debugging output).
Implements tick packets for the hasher thread case.
Some other minor bug fixes

Files:
4 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace.h.in

    rf051c1b r5b4d121  
    32883288           * or less this is ignored.
    32893289           */
    3290           TRACE_OPTION_TICK_INTERVAL
     3290          TRACE_OPTION_TICK_INTERVAL,
     3291        TRACE_OPTION_GET_CONFIG,
     3292        TRACE_OPTION_SET_CONFIG
    32913293} trace_parallel_option_t;
    32923294
     
    33823384         * A unblockable error message will be printed.
    33833385         */
    3384         size_t packet_global_cache_size;
    3385         // Per thread
     3386        size_t packet_cache_size;
     3387        /**
     3388         * Per thread local cache size for the packet freelist
     3389         */
    33863390        size_t packet_thread_cache_size;
    3387         // Packet count limited
     3391        /**
     3392         * If true the total number of packets that can be created by a trace is limited
     3393         * to the packet_cache_size, otherwise once packet_cache_size is exceeded alloc
     3394         * and free will be used to create and free packets, this will be slower than
     3395         * using the freelist and could run a machine out of memory.
     3396         *
     3397         * However this does make it easier to ensure that deadlocks will not occur
     3398         * due to running out of packets
     3399         */
    33883400        bool fixed_packet_count;
    3389         // Bursts/Batches of packets this size are combined, used in single thread mode
     3401        /**
     3402         * When reading from a single threaded input source to reduce
     3403         * lock contention a 'burst' of packets is read per pkt thread
     3404         * this determines the bursts size.
     3405         */
    33903406        size_t burst_size;
    33913407        // Each perpkt thread has a queue leading into the reporter
    33923408        //size_t reporter_queue_size;
    3393         /** The tick interval - in milliseconds (0) */
     3409
     3410        /**
     3411         * The tick interval - in milliseconds
     3412         * When a live trace is used messages are sent at the tick
     3413         * interval to ensure that all perpkt threads receive data
     3414         * this allows results to be printed in cases flows are
     3415         * not being directed to a certian thread, while still
     3416         * maintaining order.
     3417         */
    33943418        size_t tick_interval;
    3395         // The tick interval for file based traces, in number of packets TODO implement this
     3419
     3420        /**
     3421         * Like the tick interval but used in the case of file format
     3422         * This specifies the number of packets before inserting a tick to
     3423         * every thread.
     3424         */
    33963425        size_t tick_count;
    33973426
    3398         // The number of per packet threads requested
     3427        /**
     3428         * The number of per packet threads requested, 0 means use default.
     3429         * Default typically be the number of processor threads detected less one or two.
     3430         */
    33993431        size_t perpkt_threads;
    34003432
     
    34093441        size_t hasher_queue_size;
    34103442
    3411         // Reporter threashold before results are sent
     3443        /**
     3444         * If true use a polling hasher queue, that means that we will spin/or yeild
     3445         * when rather than blocking on a lock. This applies to both the hasher thread
     3446         * and perpkts reading the queues.
     3447         */
     3448        bool hasher_polling;
     3449
     3450        /**
     3451         * If true the reporter thread will continuously poll waiting for results
     3452         * if false they are only checked when a message is received, this message
     3453         * is controlled by reporter_thold.
     3454         */
     3455        bool reporter_polling;
     3456
     3457        /**
     3458         * Perpkt thread result queue size before triggering the reporter step to read results
     3459         */
    34123460        size_t reporter_thold;
     3461
     3462        /**
     3463         * Prints a line to standard error for every state change
     3464         * for both the trace as a whole and for each thread.
     3465         */
     3466        bool debug_state;
    34133467};
     3468#include <stdio.h>
     3469DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
     3470DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
     3471
     3472#define READ_EOF 0
     3473#define READ_ERROR -1
     3474#define READ_MESSAGE -2
     3475// Used for inband tick message
     3476#define READ_TICK -3
    34143477
    34153478#define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration));
  • lib/trace.c

    rf051c1b r5b4d121  
    919919DLLEXPORT int trace_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    920920        assert(libtrace);
    921         assert(packet); 
     921        assert(packet);
    922922        /* Verify the packet is valid */
    923923        if (!libtrace->started) {
  • lib/trace_parallel.c

    rf051c1b r5b4d121  
    101101
    102102
    103 #define VERBOSE_DEBBUGING 1
    104 
    105 
    106103static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
    107104
     
    215212        }
    216213
    217 #if VERBOSE_DEBBUGING
    218         fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
    219                 prev_state, t->state);
    220 #endif
     214        if (trace->config.debug_state)
     215                fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
     216                        prev_state, t->state);
     217
    221218        if (need_lock)
    222219                pthread_mutex_unlock(&trace->libtrace_lock);
     
    240237        prev_state = trace->state;
    241238        trace->state = new_state;
    242 #if VERBOSE_DEBBUGING
    243         fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
    244                 trace->uridata, get_trace_state_name(prev_state),
    245                 get_trace_state_name(trace->state));
    246 #endif
     239
     240        if (trace->config.debug_state)
     241                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
     242                        trace->uridata, get_trace_state_name(prev_state),
     243                        get_trace_state_name(trace->state));
     244
    247245        if (need_lock)
    248246                pthread_mutex_unlock(&trace->libtrace_lock);
     
    342340                res->value = (void *)dup;
    343341                trace_destroy_packet(oldpkt);
    344                 fprintf(stderr, "Made a packet safe!!\n");
    345342        }
    346343}
     
    375372
    376373
     374
     375/**
     376 * Dispatches packets to their correct place and applies any translations
     377 * as needed
     378 * @param trace
     379 * @param t
     380 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
     381 * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal
     382 */
     383static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
     384                                   size_t nb_packets) {
     385        libtrace_message_t message;
     386        size_t i;
     387        for (i = 0; i < nb_packets; ++i) {
     388                if (packets[i]->error > 0) {
     389                        packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
     390                } else if (packets[i]->error == READ_TICK) {
     391                        message.code = MESSAGE_TICK;
     392                        message.additional.uint64 = trace_packet_get_order(packets[i]);
     393                        message.sender = t;
     394                        (*trace->per_pkt)(trace, NULL, &message, t);
     395                } else if (packets[i]->error != READ_MESSAGE) {
     396                        // An error this should be the last packet we read
     397                        size_t z;
     398                        // We could have an eof or error and a message such as pause
     399                        for (z = i ; z < nb_packets; ++z) {
     400                                fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
     401                                assert (packets[z]->error <= 0);
     402                        }
     403                        return -1;
     404                }
     405                // -2 is a message its not worth checking now just finish this lot and we'll check
     406                // when we loop next
     407        }
     408        return 0;
     409}
     410
    377411/**
    378412 * The is the entry point for our packet processing threads.
     
    417451                                        message.sender = t;
    418452                                        (*trace->per_pkt)(trace, NULL, &message, t);
    419                                         // If a hasher thread is running empty input queues so we don't loose data
     453                                        // If a hasher thread is running empty input queues so we don't lose data
    420454                                        if (trace_has_dedicated_hasher(trace)) {
    421455                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
     
    423457                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    424458                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
    425                                                         if (packets[0]->error > 0)
    426                                                                 packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t);
    427                                                         else if (packets[0]->error != -2) {
     459                                                        if (dispatch_packets(trace, t, packets, 1) == -1) {
    428460                                                                // EOF or error, either way we'll stop
    429461                                                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
     
    460492                }
    461493                // Loop through the packets we just read
    462                 for (i = 0; i < nb_packets; ++i) {
    463                        
    464                         if (packets[i]->error > 0) {
    465                                 packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
    466                         } else if (packets[i]->error != -2) {
    467                                 // An error this should be the last packet we read
    468                                 size_t z;
    469                                 // We could have an eof or error and a message such as pause
    470                                 for (z = i ; z < nb_packets; ++z) {
    471                                         fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
    472                                         assert (packets[z]->error < 1);
    473                                 }
    474                                 goto stop;
    475                         }
    476                         // -2 is a message its not worth checking now just finish this lot and we'll check
    477                         // when we loop next
    478                 }
     494                if (dispatch_packets(trace, t, packets, nb_packets) == -1)
     495                        break;
    479496        }
    480497
     
    523540 * core to process it.
    524541 */
    525 static void* hasher_start(void *data) {
     542static void* hasher_entry(void *data) {
    526543        libtrace_t *trace = (libtrace_t *)data;
    527544        libtrace_thread_t * t;
     
    569586                                        assert(trace->started == false);
    570587                                        assert(trace->state == STATE_FINSHED);
     588                                        break;
    571589                                default:
    572590                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
     
    585603                /* Blocking write to the correct queue - I'm the only writer */
    586604                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
     605                        uint64_t order = trace_packet_get_order(packet);
    587606                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
     607                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
     608                                // Write ticks to everyone else
     609                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
     610                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
     611                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
     612                                for (i = 0; i < trace->perpkt_thread_count; i++) {
     613                                        pkts[i]->error = READ_TICK;
     614                                        trace_packet_set_order(pkts[i], order);
     615                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
     616                                }
     617                        }
    588618                        pkt_skipped = 0;
    589619                } else {
     
    705735{
    706736        size_t i = 0;
     737        bool tick_hit = false;
    707738
    708739        nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
     
    712743        for (i = 0; i < nb_packets; ++i) {
    713744                packets[i]->error = trace_read_packet(libtrace, packets[i]);
    714                 // Doing this inside the lock ensures the first packet is always
    715                 // recorded first
    716745                if (packets[i]->error <= 0) {
    717746                        ++i;
    718747                        break;
    719748                }
    720         }
     749                /*
     750                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
     751                        tick_hit = true;
     752                }*/
     753        }
     754        // Doing this inside the lock ensures the first packet is always
     755        // recorded first
    721756        if (packets[0]->error > 0) {
    722757                store_first_packet(libtrace, packets[0], t);
    723758        }
    724759        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     760        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
     761        if (tick_hit) {
     762                libtrace_message_t tick;
     763                tick.additional.uint64 = trace_packet_get_order(packets[i]);
     764                tick.code = MESSAGE_TICK;
     765                trace_send_message_to_perpkts(libtrace, &tick);
     766        } */
    725767        return i;
    726768}
     
    739781                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
    740782        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
    741 
    742 
    743         if (packets[0] == NULL) {
    744                 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packets, 1, 1);
    745                 packets[0]->error = -2;
    746         }
    747783
    748784        if (packets[0]->error < 0)
     
    756792                        break;
    757793                }
    758                 // Message wating
    759                 if (packets[i] == NULL) {
    760                         libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
    761                         packets[i]->error = -2;
    762                         ++i;
     794                // These are typically urgent
     795                if (packets[i]->error < 0)
    763796                        break;
    764                 }
    765797        }
    766798       
    767799        return i;
    768         /*if (*packet) {
    769                 return (*packet)->error;
    770         } else {
    771                 // This is how we do a notify, we send a message before hand to note that the trace is over etc.
    772                 // And this will notify the perpkt thread to read that message, this is easiest
    773                 // since cases like pause can also be dealt with this way without actually
    774                 // having to be the end of the stream.
    775                 fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
    776                 return -2;
    777         }*/
    778800}
    779801
     
    11661188
    11671189        while (!trace_finished(trace)) {
    1168 
    1169                 //while ( != LIBTRACE_MQ_FAILED) { }
    1170                 libtrace_message_queue_get(&t->messages, &message);
    1171 
     1190                if (trace->config.reporter_polling) {
     1191                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
     1192                                message.code = MESSAGE_POST_REPORTER;
     1193                } else {
     1194                        libtrace_message_queue_get(&t->messages, &message);
     1195                }
    11721196                switch (message.code) {
    11731197                        // Check for results
     
    11981222                (*trace->reporter)(trace, &result, NULL);
    11991223        }
     1224        libtrace_vector_destroy(&results);
    12001225
    12011226        // GOODBYE
     
    13201345                        packet->trace = libtrace;
    13211346                        ret=libtrace->format->pread_packet(libtrace, t, packet);
    1322                         if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
     1347                        if (ret <= 0) {
    13231348                                return ret;
    13241349                        }
     
    14991524        if (libtrace->config.packet_thread_cache_size <= 0)
    15001525                libtrace->config.packet_thread_cache_size = 20;
    1501         if (libtrace->config.packet_global_cache_size <= 0)
    1502                 libtrace->config.packet_global_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
    1503 
    1504         if (libtrace->config.packet_global_cache_size <
     1526        if (libtrace->config.packet_cache_size <= 0)
     1527                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
     1528
     1529        if (libtrace->config.packet_cache_size <
    15051530                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
    15061531                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
     
    15231548                t->state = THREAD_RUNNING;
    15241549                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    1525                 ASSERT_RET(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace), == 0);
     1550                ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0);
    15261551                snprintf(name, sizeof(name), "hasher-thread");
    15271552                pthread_setname_np(t->tid, name);
     
    15341559                                                 (void (*)(void *))trace_destroy_packet,
    15351560                                                 libtrace->config.packet_thread_cache_size,
    1536                                                  libtrace->config.packet_global_cache_size * 4,
     1561                                                 libtrace->config.packet_cache_size * 4,
    15371562                                                 libtrace->config.fixed_packet_count);
    15381563        // Unused slidingwindow code
     
    15621587                t->perpkt_num = i;
    15631588                if (libtrace->hasher)
    1564                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, LIBTRACE_RINGBUFFER_POLLING);
     1589                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
     1590                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
    15651591                // Depending on the mode vector or deque might be chosen
    15661592                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    16541680        // Special case handle the hasher thread case
    16551681        if (trace_has_dedicated_hasher(libtrace)) {
    1656                 fprintf(stderr, "Hasher thread running we deal with this special!\n");
     1682                if (libtrace->config.debug_state)
     1683                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
    16571684                libtrace_message_t message = {0};
    16581685                message.code = MESSAGE_DO_PAUSE;
     
    16641691                }
    16651692                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1666         }
    1667 
    1668         fprintf(stderr, "Sending messages \n");
     1693                if (libtrace->config.debug_state)
     1694                        fprintf(stderr, " DONE\n");
     1695        }
     1696
     1697        if (libtrace->config.debug_state)
     1698                fprintf(stderr, "Asking perpkt threads to pause ...");
    16691699        // Stop threads, skip this one if it's a perpkt
    16701700        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     
    16761706                                // The hasher has stopped and other threads have messages waiting therefore
    16771707                                // If the queues are empty the other threads would have no data
    1678                                 // So send some NULL packets to simply ask the threads to check there message queues
     1708                                // So send some message packets to simply ask the threads to check
    16791709                                // We are the only writer since hasher has paused
    1680                                 libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
     1710                                libtrace_packet_t *pkt;
     1711                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
     1712                                pkt->error = READ_MESSAGE;
     1713                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
    16811714                        }
    16821715                } else {
     
    16851718        }
    16861719
     1720        if (t) {
     1721                // A perpkt is doing the pausing, interesting, fake an extra thread paused
     1722                // We rely on the user to *not* return before starting the trace again
     1723                thread_change_state(libtrace, t, THREAD_PAUSED, true);
     1724        }
     1725
     1726        // Wait for all threads to pause
     1727        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     1728        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
     1729                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
     1730        }
     1731        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     1732
     1733        if (libtrace->config.debug_state)
     1734                fprintf(stderr, " DONE\n");
     1735
    16871736        // Deal with the reporter
    16881737        if (trace_has_dedicated_reporter(libtrace)) {
    1689                 fprintf(stderr, "Reporter thread running we deal with this special!\n");
     1738                if (libtrace->config.debug_state)
     1739                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
    16901740                libtrace_message_t message = {0};
    16911741                message.code = MESSAGE_DO_PAUSE;
     
    16971747                }
    16981748                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1699         }
    1700 
    1701         // Formats must support native message handling if a message is ready
    1702         // Approach per Perry's suggestion is a non-blocking read
    1703         // followed by a blocking read. XXX STRIP THIS OUT
    1704 
    1705         if (t) {
    1706                 // A perpkt is doing the pausing, interesting, fake an extra thread paused
    1707                 // We rely on the user to *not* return before starting the trace again
    1708                 thread_change_state(libtrace, t, THREAD_PAUSED, true);
    1709         }
    1710 
    1711         fprintf(stderr, "Asking threads to pause\n");
    1712 
    1713         // Wait for all threads to pause
    1714         ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    1715         while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
    1716                 ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
    1717         }
    1718         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1719 
    1720         fprintf(stderr, "Threads have paused\n");
     1749                if (libtrace->config.debug_state)
     1750                        fprintf(stderr, " DONE\n");
     1751        }
    17211752
    17221753        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
     
    18661897
    18671898        /* Now the hasher */
    1868         // XXX signal it to stop if it hasn't already we should never be in this situation!!
    18691899        if (trace_has_dedicated_hasher(libtrace)) {
    1870                 fprintf(stderr, "Waiting to join with the hasher\n");
    18711900                pthread_join(libtrace->hasher_thread.tid, NULL);
    1872                 fprintf(stderr, "Joined with the hasher\n");
    18731901                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
    18741902        }
     
    18921920
    18931921        if (trace_has_dedicated_reporter(libtrace)) {
    1894                 fprintf(stderr, "Waiting to join with the reporter\n");
    18951922                pthread_join(libtrace->reporter_thread.tid, NULL);
    1896                 fprintf(stderr, "Joined with the reporter\n");
    18971923                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
    18981924        }
     
    19021928                libtrace_message_t msg = {0};
    19031929                msg.code = MESSAGE_DO_STOP;
    1904                 fprintf(stderr, "Waiting to join with the keepalive\n");
    19051930                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
    19061931                pthread_join(libtrace->keepalive_thread.tid, NULL);
    1907                 fprintf(stderr, "Joined with with the keepalive\n");
    19081932        }
    19091933       
     
    22262250                                libtrace->tracetime = 0;
    22272251                        return 0;
     2252                case TRACE_OPTION_SET_CONFIG:
     2253                        libtrace->config = *((struct user_configuration *) value);
     2254                case TRACE_OPTION_GET_CONFIG:
     2255                        *((struct user_configuration *) value) = libtrace->config;
    22282256        }
    22292257        return 0;
     2258}
     2259
     2260static bool config_bool_parse(char *value, size_t nvalue) {
     2261        if (strncmp(value, "true", nvalue) == 0)
     2262                return true;
     2263        else if (strncmp(value, "false", nvalue) == 0)
     2264                return false;
     2265        else
     2266                return strtoll(value, NULL, 10) != 0;
     2267}
     2268
     2269static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
     2270        assert(key);
     2271        assert(value);
     2272        assert(uc);
     2273        if (strncmp(key, "packet_cache_size", nkey) == 0
     2274                || strncmp(key, "pcs", nkey) == 0) {
     2275                uc->packet_cache_size = strtoll(value, NULL, 10);
     2276        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
     2277                           || strncmp(key, "ptcs", nkey) == 0) {
     2278                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
     2279        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
     2280                  || strncmp(key, "fpc", nkey) == 0) {
     2281                uc->fixed_packet_count = config_bool_parse(value, nvalue);
     2282        } else if (strncmp(key, "burst_size", nkey) == 0
     2283                   || strncmp(key, "bs", nkey) == 0) {
     2284                uc->burst_size = strtoll(value, NULL, 10);
     2285        } else if (strncmp(key, "tick_interval", nkey) == 0
     2286                   || strncmp(key, "ti", nkey) == 0) {
     2287                uc->tick_interval = strtoll(value, NULL, 10);
     2288        } else if (strncmp(key, "tick_count", nkey) == 0
     2289                   || strncmp(key, "tc", nkey) == 0) {
     2290                uc->tick_count = strtoll(value, NULL, 10);
     2291        } else if (strncmp(key, "perpkt_threads", nkey) == 0
     2292                   || strncmp(key, "pt", nkey) == 0) {
     2293                uc->perpkt_threads = strtoll(value, NULL, 10);
     2294        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
     2295                   || strncmp(key, "hqs", nkey) == 0) {
     2296                uc->hasher_queue_size = strtoll(value, NULL, 10);
     2297        } else if (strncmp(key, "hasher_polling", nkey) == 0
     2298                   || strncmp(key, "hp", nkey) == 0) {
     2299                uc->hasher_polling = config_bool_parse(value, nvalue);
     2300        } else if (strncmp(key, "reporter_polling", nkey) == 0
     2301                   || strncmp(key, "rp", nkey) == 0) {
     2302                uc->reporter_polling = config_bool_parse(value, nvalue);
     2303        } else if (strncmp(key, "reporter_thold", nkey) == 0
     2304                   || strncmp(key, "rt", nkey) == 0) {
     2305                uc->reporter_thold = strtoll(value, NULL, 10);
     2306        } else if (strncmp(key, "debug_state", nkey) == 0
     2307                   || strncmp(key, "ds", nkey) == 0) {
     2308                uc->debug_state = config_bool_parse(value, nvalue);
     2309        } else {
     2310                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
     2311        }
     2312}
     2313
     2314DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
     2315        char *pch;
     2316        char key[100];
     2317        char value[100];
     2318        assert(str);
     2319        assert(uc);
     2320        printf ("Splitting string \"%s\" into tokens:\n",str);
     2321        pch = strtok (str," ,.-");
     2322        while (pch != NULL)
     2323        {
     2324                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
     2325                        config_string(uc, key, sizeof(key), value, sizeof(value));
     2326                } else {
     2327                        fprintf(stderr, "Error parsing %s\n", pch);
     2328                }
     2329                pch = strtok (NULL," ,.-");
     2330        }
     2331}
     2332
     2333DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
     2334        char line[1024];
     2335        while (fgets(line, sizeof(line), file) != NULL)
     2336        {
     2337                parse_user_config(uc, line);
     2338        }
    22302339}
    22312340
  • tools/traceanon/traceanon_parallel.c

    rf051c1b r5b4d121  
    198198                        case MESSAGE_STARTING:
    199199                                enc_init(enc_type,key);
     200                        break;
     201                        case MESSAGE_TICK:
     202                                trace_publish_result(trace, t, mesg->additional.uint64, NULL, RESULT_TICK);
    200203                }
    201204        }
     
    207210static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
    208211        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
     212
    209213        if (result) {
    210                 libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
    211                 assert(libtrace_result_get_key(result) == packet_count++);
    212                 if (trace_write_packet(writer,packet)==-1) {
    213                         trace_perror_output(writer,"writer");
    214                         trace_interrupt();
    215                 }
    216                 trace_free_result_packet(trace, packet);
     214                if (result->type == RESULT_PACKET) {
     215                        libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
     216                        assert(libtrace_result_get_key(result) == packet_count++);
     217                        if (trace_write_packet(writer,packet)==-1) {
     218                                trace_perror_output(writer,"writer");
     219                                trace_interrupt();
     220                        }
     221                        trace_free_result_packet(trace, packet);
     222
     223                } else {
     224                        assert(result->type == RESULT_TICK);
     225                        // Ignore it
     226                }
    217227        }
    218228        return NULL;
     
    228238        char *compress_type_str=NULL;
    229239        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
    230 
     240        struct user_configuration uc;
     241        ZERO_USER_CONFIG(uc);
    231242
    232243        if (argc<2)
     
    244255                        { "compress-type",      1, 0, 'Z' },
    245256                        { "libtrace-help",      0, 0, 'H' },
     257                        { "config",             1, 0, 'u' },
     258                    { "config-file",            1, 0, 'U' },
    246259                        { NULL,                 0, 0, 0   },
    247260                };
    248261
    249                 int c=getopt_long(argc, argv, "Z:z:sc:f:dp:H",
     262                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:Hu:U:",
    250263                                long_options, &option_index);
    251264
     
    297310                                  exit(1);
    298311                                  break;
     312                        case 'u':
     313                                  parse_user_config(&uc, optarg);
     314                                  break;
     315                        case 'U':;
     316                                FILE * f = fopen(optarg, "r");
     317                                if (f != NULL) {
     318                                        parse_user_config_file(&uc, f);
     319                                } else {
     320                                        perror("Failed to open configuration file\n");
     321                                        usage(argv[0]);
     322                                }
     323                                break;
    299324                        default:
    300325                                fprintf(stderr,"unknown option: %c\n",c);
     
    391416         
    392417        int i = 1;
    393         trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
     418        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
    394419        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
    395         i = 2;
     420        i = 6;
    396421    trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
     422        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
     423        trace_set_hasher(trace, HASHER_CUSTOM, bad_hash, NULL);
    397424       
    398425        if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
Note: See TracChangeset for help on using the changeset viewer.