Changeset be3f75b


Ignore:
Timestamp:
07/30/14 19:17:14 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
f9a70ca
Parents:
a49a9eb
Message:

Fixs statistic counters for parallel traces

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace_int.h

    ra49a9eb rbe3f75b  
    204204 */
    205205struct libtrace_thread_t {
     206        int accepted_packets; // The number of packets accepted only used if pread
     207        // is retreving packets
     208        // Set to true once the first packet has been stored
     209        bool recorded_first;
     210        // For thread safety reason we actually must store this here
     211        int64_t tracetime_offset_usec;
     212        void* user_data; // TLS for the user to use
     213        void* format_data; // TLS for the format to use
     214        libtrace_message_queue_t messages; // Message handling
     215        libtrace_ringbuffer_t rbuffer; // Input
     216        libtrace_vector_t vector; // Output
     217        libtrace_queue_t deque; // Real Output type makes more sense
    206218        libtrace_t * trace;
    207219        void* ret;
    208220        enum thread_types type;
    209221        enum thread_states state;
    210         void* user_data; // TLS for the user to use
    211         void* format_data; // TLS for the format to use
    212222        pthread_t tid;
    213223        int perpkt_num; // A number from 0-X that represents this perpkt threads number
    214224                                // in the table, intended to quickly identify this thread
    215225                                // -1 represents NA (such as the case this is not a perpkt thread)
    216         libtrace_ringbuffer_t rbuffer; // Input
    217         libtrace_vector_t vector; // Output
    218         libtrace_queue_t deque; // Real Output type makes more sense
    219         libtrace_message_queue_t messages; // Message handling
    220         // Temp storage for time sensitive results
    221         uint64_t tmp_key;
    222         void *tmp_data;
    223         pthread_spinlock_t tmp_spinlock;
    224         // Set to true once the first packet has been stored
    225         bool recorded_first;
    226         // For thread safety reason we actually must store this here
    227         int64_t tracetime_offset_usec;
    228226};
    229227
     
    347345        struct first_packets first_packets;
    348346        int tracetime;
     347
     348        /*
     349         * Caches statistic counters in the case that our trace is
     350         * paused or stopped before this counter is taken
     351         */
     352        uint64_t dropped_packets;
     353        uint64_t received_packets;
    349354};
    350355
  • lib/trace.c

    ra49a9eb rbe3f75b  
    286286        libtrace->first_packets.count = 0;
    287287        libtrace->first_packets.packets = NULL;
     288        libtrace->dropped_packets = UINT64_MAX;
     289        libtrace->accepted_packets = UINT64_MAX;
    288290
    289291        /* Parse the URI to determine what sort of trace we are dealing with */
     
    19401942{
    19411943        assert(trace);
     1944        uint64_t ret;
     1945
    19421946        if (trace->format->get_received_packets) {
    1943                 return trace->format->get_received_packets(trace);
    1944         }
    1945         return (uint64_t)-1;
     1947                if ((ret = trace->format->get_received_packets(trace)) != UINT64_MAX)
     1948                        return ret;
     1949        }
     1950        // Read this cached value taken before the trace was closed
     1951        return trace->received_packets;
    19461952}
    19471953
     
    19671973{
    19681974        assert(trace);
     1975        uint64_t ret;
     1976
    19691977        if (trace->format->get_dropped_packets) {
    1970                 return trace->format->get_dropped_packets(trace);
    1971         }
    1972         return (uint64_t)-1;
     1978                if ((ret = trace->format->get_dropped_packets(trace)) != UINT64_MAX)
     1979                        return ret;
     1980        }
     1981        // Read this cached value taken before the trace was closed
     1982        return trace->dropped_packets;
    19731983}
    19741984
     
    19761986{
    19771987        assert(trace);
    1978         return trace->accepted_packets;
     1988        int i = 0;
     1989        uint64_t ret = trace->accepted_packets;
     1990        for (i = 0; i < trace->perpkt_thread_count; i++) {
     1991                ret += trace->perpkt_threads[i].accepted_packets;
     1992        }
     1993        return ret;
    19791994}
    19801995
  • lib/trace_parallel.c

    ra49a9eb rbe3f75b  
    279279        t->recorded_first = false;
    280280        t->perpkt_num = -1;
     281        t->accepted_packets = 0;
    281282}
    282283
     
    12481249                                                libtrace->snaplen);
    12491250                        }
    1250                         trace_packet_set_order(packet, libtrace->accepted_packets);
    1251                         ++libtrace->accepted_packets;
     1251                       
     1252                        ++t->accepted_packets;
     1253                        // TODO look into this better
     1254                        trace_packet_set_order(packet, trace_get_erf_timestamp(packet));
     1255                        //trace_packet_set_order(packet, libtrace->accepted_packets);
     1256                        //++libtrace->accepted_packets;
    12521257                        return ret;
    12531258                } while(1);
     
    14701475                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
    14711476                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    1472                 t->tmp_key = 0;
    1473                 t->tmp_data = NULL;
    14741477                t->recorded_first = false;
    1475                 ASSERT_RET(pthread_spin_init(&t->tmp_spinlock, 0), == 0);
    14761478                t->tracetime_offset_usec = 0;;
    14771479        }
     
    16021604
    16031605        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
     1606                uint64_t tmp_stats;
     1607                libtrace->dropped_packets = trace_get_dropped_packets(libtrace);
     1608                libtrace->received_packets = trace_get_received_packets(libtrace);
     1609                if (libtrace->format->get_filtered_packets) {
     1610                        if ((tmp_stats = libtrace->format->get_filtered_packets(libtrace)) != UINT64_MAX) {
     1611                                libtrace->filtered_packets += tmp_stats;
     1612                        }
     1613                }
    16041614                libtrace->started = false;
    16051615                if (libtrace->format->ppause_input)
Note: See TracChangeset for help on using the changeset viewer.