Changeset 526d9d0
- Timestamp:
- 02/24/15 17:18:21 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 116f970
- Parents:
- 10c47a0
- Location:
- lib
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_dag25.c
r10c47a0 r526d9d0 1447 1447 tmp = FORMAT_DATA_HEAD; 1448 1448 1449 /* We don't filter packets on the card itself */ 1450 stat->filtered_valid = 1; 1451 stat->filtered = 0; 1452 1453 /* Dropped, filtered the */ 1449 /* Dropped packets */ 1454 1450 stat->dropped_valid = 1; 1455 1451 stat->dropped = 0; … … 1512 1508 libtrace_list_node_t *node; 1513 1509 1514 if (reader) { 1515 if (t->type == THREAD_PERPKT) { 1516 1517 node = libtrace_list_get_index(FORMAT_DATA->per_stream, 1518 t->perpkt_num); 1519 if (node == NULL) { 1520 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, 1521 "Too few streams supplied for the" 1522 " number of threads lanuched"); 1523 return -1; 1524 } 1525 stream_data = node->data; 1526 1527 /* Pass the per thread data to the thread */ 1528 t->format_data = stream_data; 1529 1530 /* Attach and start the DAG stream */ 1531 printf("t%u: starting and attaching stream #%u\n", 1532 t->perpkt_num, stream_data->dagstream); 1533 if (dag_start_input_stream(libtrace, stream_data) < 0) 1534 return -1; 1535 } else { 1536 /* TODO: Figure out why t->type != THREAD_PERPKT in 1537 * order to figure out what this line does */ 1538 t->format_data = FORMAT_DATA_FIRST; 1539 } 1510 if (reader && t->type == THREAD_PERPKT) { 1511 fprintf(stderr, "t%u: registered reader thread", t->perpkt_num); 1512 node = libtrace_list_get_index(FORMAT_DATA->per_stream, 1513 t->perpkt_num); 1514 if (node == NULL) { 1515 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, 1516 "Too few streams supplied for the" 1517 " number of threads lanuched"); 1518 return -1; 1519 } 1520 stream_data = node->data; 1521 1522 /* Pass the per thread data to the thread */ 1523 t->format_data = stream_data; 1524 1525 /* Attach and start the DAG stream */ 1526 printf("t%u: starting and attaching stream #%u\n", 1527 t->perpkt_num, stream_data->dagstream); 1528 if (dag_start_input_stream(libtrace, stream_data) < 0) 1529 return -1; 1540 1530 } 1541 1531 -
lib/format_linux_common.c
re4f27d1 r526d9d0 596 596 597 597 /* filtered count == dev received - socket received */ 598 if (FORMAT_DATA->filter == NULL) { 599 stat->filtered_valid = 1; 600 stat->filtered = 0; 601 } else if (FORMAT_DATA->stats_valid && dev_stats.if_name[0]) { 602 stat->filtered_valid = 1; 603 stat->filtered = DEV_DIFF(rx_packets) - 604 FORMAT_DATA->stats.tp_packets; 605 if (stat->filtered > UINT64_MAX - 100000) { 606 stat->filtered = 0; 598 if (FORMAT_DATA->filter != NULL && 599 FORMAT_DATA->stats_valid && 600 dev_stats.if_name[0]) { 601 uint64_t filtered = DEV_DIFF(rx_packets) - 602 FORMAT_DATA->stats.tp_packets; 603 /* Check the value is sane, due to timing it could be below 0 */ 604 if (filtered < UINT64_MAX - 100000) { 605 stat->filtered += filtered; 607 606 } 608 607 } -
lib/libtrace.h.in
r5ab626a r526d9d0 249 249 typedef struct libtrace_filter_t libtrace_filter_t; 250 250 251 /** Opaque structure holding information about libtrace thread */ 251 252 typedef struct libtrace_thread_t libtrace_thread_t; 252 253 -
lib/libtrace_int.h
r10c47a0 r526d9d0 836 836 uint64_t (*get_dropped_packets)(libtrace_t *trace); 837 837 838 /** Returns statistics about a trace. Flags are all set to 0 when 839 * invoked. 838 /** Returns statistics about a trace. 840 839 * 841 840 * @param trace The libtrace object 842 * @param stat A zeroed structure ready to be filled. 841 * @param stat [in,out] A statistics structure ready to be filled 842 * 843 * The filtered and accepted statistics will be set to the values 844 * stored in the library. All other statistics are not set. 845 * 846 * @note If filtering of packets is performed by a trace and the number 847 * of filtered packets is unknown this should be marked as invalid by 848 * the format. 843 849 */ 844 850 void (*get_statistics)(libtrace_t *trace, libtrace_stat_t *stat); … … 955 961 void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t); 956 962 957 /** 958 * Return statistics for a single thread. 963 /** Returns statistics for a single thread. 964 * 965 * @param trace The libtrace object 966 * @param t The thread to return statistics for 967 * @param stat [in,out] A statistics structure ready to be filled 968 * 969 * The filtered and accepted statistics will be set to the values 970 * stored in the library. All other statistics are not set. 971 * 972 * @note If filtering of packets is performed by a trace and the number 973 * of filtered packets is unknown this should be marked as invalid by 974 * the format. 959 975 */ 960 976 void (*get_thread_statistics)(libtrace_t *libtrace, -
lib/trace.c
r694823f r526d9d0 2027 2027 assert(trace); 2028 2028 int i = 0; 2029 uint64_t ret = trace->accepted_packets; 2029 uint64_t ret = 0; 2030 /* We always add to a thread's accepted count before dispatching the 2031 * packet to the user. However if the underlying trace is single 2032 * threaded it will also be increasing the global count. So if we 2033 * find perpkt ignore the global count. 2034 */ 2030 2035 for (i = 0; i < trace->perpkt_thread_count; i++) { 2031 2036 ret += trace->perpkt_threads[i].accepted_packets; 2032 2037 } 2033 return ret ;2038 return ret ? ret : trace->accepted_packets; 2034 2039 } 2035 2040 … … 2061 2066 LIBTRACE_STAT_FIELDS; 2062 2067 #undef X 2063 if (trace->format->get_statistics) { 2064 trace->format->get_statistics(trace, stat); 2065 ret = trace_get_accepted_packets(trace); 2066 if (ret != UINT64_MAX) { 2067 stat->accepted_valid = 1; 2068 stat->accepted = ret; 2069 } 2070 /* Now add on any library filtered packets */ 2071 if (stat->filtered_valid) { 2072 stat->filtered += trace->filtered_packets; 2073 for (i = 0; i < trace->perpkt_thread_count; i++) { 2074 stat->filtered += trace->perpkt_threads[i].filtered_packets; 2075 } 2076 } 2077 return stat; 2078 } 2068 /* Both accepted and filtered are stored against in the library */ 2079 2069 ret = trace_get_accepted_packets(trace); 2080 2070 if (ret != UINT64_MAX) { … … 2082 2072 stat->accepted = ret; 2083 2073 } 2084 ret = trace_get_received_packets(trace); 2085 if (ret != UINT64_MAX) { 2086 stat->received_valid = 1; 2087 stat->received = ret; 2088 } 2089 /* Fallback to the old way */ 2090 ret = trace_get_dropped_packets(trace); 2091 if (ret != UINT64_MAX) { 2092 stat->dropped_valid = 1; 2093 stat->dropped = ret; 2094 } 2095 ret = trace_get_filtered_packets(trace); 2096 if (ret != UINT64_MAX) { 2097 stat->filtered_valid = 1; 2098 stat->filtered = ret; 2074 2075 stat->filtered_valid = 1; 2076 stat->filtered = trace->filtered_packets; 2077 for (i = 0; i < trace->perpkt_thread_count; i++) { 2078 stat->filtered += trace->perpkt_threads[i].filtered_packets; 2079 } 2080 2081 if (trace->format->get_statistics) { 2082 trace->format->get_statistics(trace, stat); 2083 } else { 2084 /* Fallback to the old way */ 2085 ret = trace_get_received_packets(trace); 2086 if (ret != UINT64_MAX) { 2087 stat->received_valid = 1; 2088 stat->received = ret; 2089 } 2090 ret = trace_get_dropped_packets(trace); 2091 if (ret != UINT64_MAX) { 2092 stat->dropped_valid = 1; 2093 stat->dropped = ret; 2094 } 2099 2095 } 2100 2096 return stat; … … 2112 2108 LIBTRACE_STAT_FIELDS; 2113 2109 #undef X 2114 if (trace->format->get_thread_statistics) { 2110 stat->accepted_valid = 1; 2111 stat->accepted = t->accepted_packets; 2112 stat->filtered_valid = 1; 2113 stat->filtered = t->filtered_packets; 2114 if (!trace_has_dedicated_hasher(trace) && trace->format->get_thread_statistics) { 2115 2115 trace->format->get_thread_statistics(trace, t, stat); 2116 }2117 if (t->accepted_packets != UINT64_MAX) {2118 stat->accepted_valid = 1;2119 stat->accepted = t->accepted_packets;2120 }2121 /* Now add on any library filtered packets */2122 if (stat->filtered_valid) {2123 stat->filtered += t->filtered_packets;2124 2116 } 2125 2117 return; -
lib/trace_parallel.c
r10c47a0 r526d9d0 173 173 * @return true if the trace has dedicated hasher thread otherwise false. 174 174 */ 175 staticinline bool trace_has_dedicated_hasher(libtrace_t * libtrace)175 inline bool trace_has_dedicated_hasher(libtrace_t * libtrace) 176 176 { 177 177 return libtrace->hasher_thread.type == THREAD_HASHER; … … 412 412 return READ_MESSAGE; 413 413 } 414 t->accepted_packets++; 414 415 *packet = (*trace->per_pkt)(trace, *packet, NULL, t); 415 416 trace_fin_packet(*packet); … … 732 733 assert(packet); 733 734 734 if (libtrace_halt) // Signal to die has been sent - TODO 735 if (libtrace_halt) { 736 packet->error = 0; 735 737 break; 738 } 736 739 737 740 // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only) … … 750 753 break; 751 754 case MESSAGE_DO_STOP: 752 // Stop called after pause753 755 assert(trace->started == false); 754 756 assert(trace->state == STATE_FINSHED); 757 /* Mark the current packet as EOF */ 758 packet->error = 0; 755 759 break; 756 760 default: … … 761 765 } 762 766 763 if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {767 if ((packet->error = trace_read_packet(trace, packet)) <1) { 764 768 break; /* We are EOF or error'd either way we stop */ 765 769 } … … 1443 1447 trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i])); 1444 1448 } 1445 t->accepted_packets += ret;1446 1449 } while(ret == 0); 1447 1450 return ret; … … 1681 1684 */ 1682 1685 if (trace_has_dedicated_hasher(libtrace)) { 1686 libtrace->hasher_thread.type = THREAD_EMPTY; 1683 1687 ret = trace_start_thread(libtrace, &libtrace->hasher_thread, 1684 1688 THREAD_HASHER, hasher_entry, -1,
Note: See TracChangeset
for help on using the changeset viewer.