Ignore:
Timestamp:
06/18/14 16:12:46 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
85e87b5
Parents:
82facc5
Message:

Adds a configuration option for the tick messages.
Adds the trace_information structure which contains information about traces.
Updates trace_rt_stats to use both of these.

Replaced libtrace_t->joined internally with a state

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r82facc5 rb13b939  
    492492        // Doing this inside the lock ensures the first packet is always
    493493        // recorded first
    494         store_first_packet(libtrace, *packet, t);
     494        if ((*packet)->error > 0)
     495                store_first_packet(libtrace, *packet, t);
    495496
    496497        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    912913        libtrace_message_t message = {0};
    913914        libtrace_t *trace = (libtrace_t *)data;
    914         int delay_usec = 1000000; // ! second hard coded !!
    915915        uint64_t next_release;
    916916        fprintf(stderr, "keepalive thread is starting\n");
    917         // TODO mark this thread as running against the libtrace object
     917
    918918        gettimeofday(&prev, NULL);
    919919        message.code = MESSAGE_TICK;
    920920        while (trace->state != STATE_FINSHED) {
    921921                fd_set rfds;
    922                 next_release = tv_to_usec(&prev) + delay_usec;
     922                next_release = tv_to_usec(&prev) + (trace->tick_interval * 1000);
    923923                gettimeofday(&next, NULL);
    924924                if (next_release > tv_to_usec(&next)) {
     
    11651165
    11661166        // If we are using a hasher start it
    1167         if (libtrace->hasher && libtrace->hasher_thread.type == THREAD_HASHER) {
     1167        if (libtrace->hasher || libtrace->hasher_thread.type == THREAD_HASHER) {
    11681168                libtrace_thread_t *t = &libtrace->hasher_thread;
    11691169                t->trace = libtrace;
     
    12331233                threads_started = trace_start_perpkt_threads(libtrace);
    12341234
    1235         libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
    1236         libtrace->keepalive_thread.state = THREAD_RUNNING;
    1237         libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
    1238         assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
     1235        if (libtrace->tick_interval > 0) {
     1236                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
     1237                libtrace->keepalive_thread.state = THREAD_RUNNING;
     1238                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
     1239                assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
     1240        }
    12391241
    12401242        // Revert back - Allow signals again
     
    15101512        }
    15111513        libtrace->state = STATE_FINSHED;
    1512         // Wait for the ticker thread
    15131514       
    1514         if (libtrace->keepalive_thread.state != THREAD_FINISHED) {
     1515        // Wait for the tick (keepalive) thread if its been started
     1516        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE && libtrace->keepalive_thread.state == THREAD_RUNNING) {
    15151517                libtrace_message_t msg = {0};
    15161518                msg.code = MESSAGE_DO_STOP;
     
    15181520                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
    15191521                pthread_join(libtrace->keepalive_thread.tid, NULL);
    1520         }
    1521         fprintf(stderr, "Joined with with the keepalive\n");
    1522        
    1523 
     1522                fprintf(stderr, "Joined with with the keepalive\n");
     1523        }
    15241524        // Lets mark this as done for now
    1525         libtrace->joined = true;
     1525        libtrace->state = STATE_JOINED;
    15261526}
    15271527
     
    17721772                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
    17731773                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
    1774                                 libtrace->joined))) {
     1774                                libtrace->state == STATE_JOINED))) {
    17751775                        /* Get the minimum queue and then do stuff */
    17761776                        libtrace_result_t r;
     
    18581858        UNUSED int ret = -1;
    18591859        switch (option) {
     1860                case TRACE_OPTION_TICK_INTERVAL:
     1861                        libtrace->tick_interval = *((int *) value);
     1862                        return 1;
    18601863                case TRACE_OPTION_SET_HASHER:
    18611864                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
     
    19291932        }
    19301933}
     1934
     1935DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
     1936        if (libtrace->format)
     1937                return &libtrace->format->info;
     1938        else
     1939                return NULL;
     1940}
Note: See TracChangeset for help on using the changeset viewer.