Changeset 526d9d0


Ignore:
Timestamp:
02/24/15 17:18:21 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
116f970
Parents:
10c47a0
Message:

Move the accepted packet count to dispatching packets for threads.
Accounting for the doubled count when using a single threaded format.

Rework statistics logic slightly to remove duplicated code.

Location:
lib
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dag25.c

    r10c47a0 r526d9d0  
    14471447        tmp = FORMAT_DATA_HEAD;
    14481448
    1449         /* We don't filter packets on the card itself */
    1450         stat->filtered_valid = 1;
    1451         stat->filtered = 0;
    1452 
    1453         /* Dropped, filtered the  */
     1449        /* Dropped packets */
    14541450        stat->dropped_valid = 1;
    14551451        stat->dropped = 0;
     
    15121508        libtrace_list_node_t *node;
    15131509
    1514         if (reader) {
    1515                 if (t->type == THREAD_PERPKT) {
    1516 
    1517                         node = libtrace_list_get_index(FORMAT_DATA->per_stream,
    1518                                                         t->perpkt_num);
    1519                         if (node == NULL) {
    1520                                 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
    1521                                               "Too few streams supplied for the"
    1522                                               " number of threads lanuched");
    1523                                 return -1;
    1524                         }
    1525                         stream_data = node->data;
    1526 
    1527                         /* Pass the per thread data to the thread */
    1528                         t->format_data = stream_data;
    1529 
    1530                         /* Attach and start the DAG stream */
    1531                         printf("t%u: starting and attaching stream #%u\n",
    1532                                t->perpkt_num, stream_data->dagstream);
    1533                         if (dag_start_input_stream(libtrace, stream_data) < 0)
    1534                                 return -1;
    1535                 } else {
    1536                         /* TODO: Figure out why t->type != THREAD_PERPKT in
    1537                          * order to figure out what this line does */
    1538                         t->format_data = FORMAT_DATA_FIRST;
    1539                 }
     1510        if (reader && t->type == THREAD_PERPKT) {
     1511                fprintf(stderr, "t%u: registered reader thread", t->perpkt_num);
     1512                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
     1513                                                t->perpkt_num);
     1514                if (node == NULL) {
     1515                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1516                                      "Too few streams supplied for the"
     1517                                      " number of threads lanuched");
     1518                        return -1;
     1519                }
     1520                stream_data = node->data;
     1521
     1522                /* Pass the per thread data to the thread */
     1523                t->format_data = stream_data;
     1524
     1525                /* Attach and start the DAG stream */
     1526                printf("t%u: starting and attaching stream #%u\n",
     1527                       t->perpkt_num, stream_data->dagstream);
     1528                if (dag_start_input_stream(libtrace, stream_data) < 0)
     1529                        return -1;
    15401530        }
    15411531
  • lib/format_linux_common.c

    re4f27d1 r526d9d0  
    596596
    597597        /* filtered count == dev received - socket received */
    598         if (FORMAT_DATA->filter == NULL) {
    599                 stat->filtered_valid = 1;
    600                 stat->filtered = 0;
    601         } else if (FORMAT_DATA->stats_valid && dev_stats.if_name[0]) {
    602                 stat->filtered_valid = 1;
    603                 stat->filtered = DEV_DIFF(rx_packets) -
    604                                  FORMAT_DATA->stats.tp_packets;
    605                 if (stat->filtered > UINT64_MAX - 100000) {
    606                         stat->filtered = 0;
     598        if (FORMAT_DATA->filter != NULL &&
     599            FORMAT_DATA->stats_valid &&
     600            dev_stats.if_name[0]) {
     601                uint64_t filtered = DEV_DIFF(rx_packets) -
     602                                    FORMAT_DATA->stats.tp_packets;
     603                /* Check the value is sane, due to timing it could be below 0 */
     604                if (filtered < UINT64_MAX - 100000) {
     605                        stat->filtered += filtered;
    607606                }
    608607        }
  • lib/libtrace.h.in

    r5ab626a r526d9d0  
    249249typedef struct libtrace_filter_t libtrace_filter_t;
    250250
     251/** Opaque structure holding information about libtrace thread */
    251252typedef struct libtrace_thread_t libtrace_thread_t;
    252253
  • lib/libtrace_int.h

    r10c47a0 r526d9d0  
    836836        uint64_t (*get_dropped_packets)(libtrace_t *trace);
    837837
    838         /** Returns statistics about a trace. Flags are all set to 0 when
    839          * invoked.
     838        /** Returns statistics about a trace.
    840839         *
    841840         * @param trace The libtrace object
    842          * @param stat A zeroed structure ready to be filled.
     841         * @param stat [in,out] A statistics structure ready to be filled
     842         *
     843         * The filtered and accepted statistics will be set to the values
     844         * stored in the library. All other statistics are not set.
     845         *
     846         * @note If filtering of packets is performed by a trace and the number
     847         * of filtered packets is unknown this should be marked as invalid by
     848         * the format.
    843849         */
    844850        void (*get_statistics)(libtrace_t *trace, libtrace_stat_t *stat);
     
    955961        void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t);
    956962
    957         /**
    958          * Return statistics for a single thread.
     963        /** Returns statistics for a single thread.
     964         *
     965         * @param trace The libtrace object
     966         * @param t The thread to return statistics for
     967         * @param stat [in,out] A statistics structure ready to be filled
     968         *
     969         * The filtered and accepted statistics will be set to the values
     970         * stored in the library. All other statistics are not set.
     971         *
     972         * @note If filtering of packets is performed by a trace and the number
     973         * of filtered packets is unknown this should be marked as invalid by
     974         * the format.
    959975         */
    960976        void (*get_thread_statistics)(libtrace_t *libtrace,
  • lib/trace.c

    r694823f r526d9d0  
    20272027        assert(trace);
    20282028        int i = 0;
    2029         uint64_t ret = trace->accepted_packets;
     2029        uint64_t ret = 0;
     2030        /* We always add to a thread's accepted count before dispatching the
     2031         * packet to the user. However if the underlying trace is single
     2032         * threaded it will also be increasing the global count. So if we
     2033         * find perpkt ignore the global count.
     2034         */
    20302035        for (i = 0; i < trace->perpkt_thread_count; i++) {
    20312036                ret += trace->perpkt_threads[i].accepted_packets;
    20322037        }
    2033         return ret;
     2038        return ret ? ret : trace->accepted_packets;
    20342039}
    20352040
     
    20612066        LIBTRACE_STAT_FIELDS;
    20622067#undef X
    2063         if (trace->format->get_statistics) {
    2064                 trace->format->get_statistics(trace, stat);
    2065                 ret = trace_get_accepted_packets(trace);
    2066                 if (ret != UINT64_MAX) {
    2067                         stat->accepted_valid = 1;
    2068                         stat->accepted = ret;
    2069                 }
    2070                 /* Now add on any library filtered packets */
    2071                 if (stat->filtered_valid) {
    2072                         stat->filtered += trace->filtered_packets;
    2073                         for (i = 0; i < trace->perpkt_thread_count; i++) {
    2074                                 stat->filtered += trace->perpkt_threads[i].filtered_packets;
    2075                         }
    2076                 }
    2077                 return stat;
    2078         }
     2068        /* Both accepted and filtered are stored against in the library */
    20792069        ret = trace_get_accepted_packets(trace);
    20802070        if (ret != UINT64_MAX) {
     
    20822072                stat->accepted = ret;
    20832073        }
    2084         ret = trace_get_received_packets(trace);
    2085         if (ret != UINT64_MAX) {
    2086                 stat->received_valid = 1;
    2087                 stat->received = ret;
    2088         }
    2089         /* Fallback to the old way */
    2090         ret = trace_get_dropped_packets(trace);
    2091         if (ret != UINT64_MAX) {
    2092                 stat->dropped_valid = 1;
    2093                 stat->dropped = ret;
    2094         }
    2095         ret = trace_get_filtered_packets(trace);
    2096         if (ret != UINT64_MAX) {
    2097                 stat->filtered_valid = 1;
    2098                 stat->filtered = ret;
     2074
     2075        stat->filtered_valid = 1;
     2076        stat->filtered = trace->filtered_packets;
     2077        for (i = 0; i < trace->perpkt_thread_count; i++) {
     2078                stat->filtered += trace->perpkt_threads[i].filtered_packets;
     2079        }
     2080
     2081        if (trace->format->get_statistics) {
     2082                trace->format->get_statistics(trace, stat);
     2083        } else {
     2084                /* Fallback to the old way */
     2085                ret = trace_get_received_packets(trace);
     2086                if (ret != UINT64_MAX) {
     2087                        stat->received_valid = 1;
     2088                        stat->received = ret;
     2089                }
     2090                ret = trace_get_dropped_packets(trace);
     2091                if (ret != UINT64_MAX) {
     2092                        stat->dropped_valid = 1;
     2093                        stat->dropped = ret;
     2094                }
    20992095        }
    21002096        return stat;
     
    21122108        LIBTRACE_STAT_FIELDS;
    21132109#undef X
    2114         if (trace->format->get_thread_statistics) {
     2110        stat->accepted_valid = 1;
     2111        stat->accepted = t->accepted_packets;
     2112        stat->filtered_valid = 1;
     2113        stat->filtered = t->filtered_packets;
     2114        if (!trace_has_dedicated_hasher(trace) && trace->format->get_thread_statistics) {
    21152115                trace->format->get_thread_statistics(trace, t, stat);
    2116         }
    2117         if (t->accepted_packets != UINT64_MAX) {
    2118                 stat->accepted_valid = 1;
    2119                 stat->accepted = t->accepted_packets;
    2120         }
    2121         /* Now add on any library filtered packets */
    2122         if (stat->filtered_valid) {
    2123                 stat->filtered += t->filtered_packets;
    21242116        }
    21252117        return;
  • lib/trace_parallel.c

    r10c47a0 r526d9d0  
    173173 * @return true if the trace has dedicated hasher thread otherwise false.
    174174 */
    175 static inline bool trace_has_dedicated_hasher(libtrace_t * libtrace)
     175inline bool trace_has_dedicated_hasher(libtrace_t * libtrace)
    176176{
    177177        return libtrace->hasher_thread.type == THREAD_HASHER;
     
    412412                                return READ_MESSAGE;
    413413                }
     414                t->accepted_packets++;
    414415                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
    415416                trace_fin_packet(*packet);
     
    732733                assert(packet);
    733734
    734                 if (libtrace_halt) // Signal to die has been sent - TODO
     735                if (libtrace_halt) {
     736                        packet->error = 0;
    735737                        break;
     738                }
    736739
    737740                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
     
    750753                                        break;
    751754                                case MESSAGE_DO_STOP:
    752                                         // Stop called after pause
    753755                                        assert(trace->started == false);
    754756                                        assert(trace->state == STATE_FINSHED);
     757                                        /* Mark the current packet as EOF */
     758                                        packet->error = 0;
    755759                                        break;
    756760                                default:
     
    761765                }
    762766
    763                 if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
     767                if ((packet->error = trace_read_packet(trace, packet)) <1) {
    764768                        break; /* We are EOF or error'd either way we stop  */
    765769                }
     
    14431447                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    14441448                        }
    1445                         t->accepted_packets += ret;
    14461449                } while(ret == 0);
    14471450                return ret;
     
    16811684         */
    16821685        if (trace_has_dedicated_hasher(libtrace)) {
     1686                libtrace->hasher_thread.type = THREAD_EMPTY;
    16831687                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
    16841688                                   THREAD_HASHER, hasher_entry, -1,
Note: See TracChangeset for help on using the changeset viewer.