Changeset 368a1ae


Ignore:
Timestamp:
02/05/15 19:33:09 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
858ce90
Parents:
75e088f
Message:

Fixes filtered and accepted stat counters

The accepted counter was an int instead of uint64_t.
The filtered counter was not thread safe.

This also tidies up the pread wrapper code - however includes some
changes from the next patch which changes its return so most
likely will not function correctly.

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace_int.h

    r04bf7c5 r368a1ae  
    193193 */
    194194struct libtrace_thread_t {
    195         int accepted_packets; // The number of packets accepted only used if pread
     195        uint64_t accepted_packets; // The number of packets accepted only used if pread
     196        uint64_t filtered_packets;
    196197        // is retreving packets
    197198        // Set to true once the first packet has been stored
  • lib/trace.c

    r04bf7c5 r368a1ae  
    19571957{
    19581958        assert(trace);
     1959        int i = 0;
     1960        uint64_t ret = trace->filtered_packets;
     1961        for (i = 0; i < trace->perpkt_thread_count; i++) {
     1962                ret += trace->perpkt_threads[i].filtered_packets;
     1963        }
    19591964        if (trace->format->get_filtered_packets) {
    19601965                return trace->format->get_filtered_packets(trace)+
    1961                         trace->filtered_packets;
    1962         }
    1963         return trace->filtered_packets;
     1966                        ret;
     1967        }
     1968        return ret;
    19641969}
    19651970
  • lib/trace_parallel.c

    r04bf7c5 r368a1ae  
    293293void libtrace_zero_thread(libtrace_thread_t * t) {
    294294        t->accepted_packets = 0;
     295        t->filtered_packets = 0;
    295296        t->recorded_first = false;
    296297        t->tracetime_offset_usec = 0;
     
    12551256}
    12561257
    1257 /* Read one packet from the trace into a buffer. Note that this function will
    1258  * block until a packet is read (or EOF is reached).
    1259  *
    1260  * @param libtrace      the trace
    1261  * @param t     The thread
    1262  * @param packets       an array of packets
    1263  * @param nb_packets
    1264  * @returns The number of packets read or 0 on EOF, negative value on error
    1265  *
    1266  * Note this is identical to read_packet but calls pread_packet instead of
    1267  * read packet in the format.
    1268  *
    1269  */
    1270 static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
     1258/* Discards packets that don't match the filter.
     1259 * Discarded packets are emptied and then moved to the end of the packet list.
     1260 *
     1261 * @param trace       The trace format, containing the filter
     1262 * @param packets     An array of packets
     1263 * @param nb_packets  The number of valid items in packets
     1264 *
     1265 * @return The number of packets that passed the filter, which are moved to
     1266 *          the start of the packets array
     1267 */
     1268static inline size_t filter_packets(libtrace_t *trace,
     1269                                    libtrace_packet_t **packets,
     1270                                    size_t nb_packets) {
     1271        size_t offset = 0;
    12711272        size_t i;
     1273
     1274        for (i = 0; i < nb_packets; ++i) {
     1275                // The filter needs the trace attached to receive the link type
     1276                packets[i]->trace = trace;
     1277                if (trace_apply_filter(trace->filter, packets[i])) {
     1278                        libtrace_packet_t *tmp;
     1279                        tmp = packets[offset];
     1280                        packets[offset++] = packets[i];
     1281                        packets[i] = tmp;
     1282                } else {
     1283                        trace_fin_packet(packets[i]);
     1284                }
     1285        }
     1286
     1287        return offset;
     1288}
     1289
     1290/* Read a batch of packets from the trace into a buffer.
     1291 * Note that this function will block until a packet is read (or EOF is reached)
     1292 *
     1293 * @param libtrace    The trace
     1294 * @param t           The thread
     1295 * @param packets     An array of packets
     1296 * @param nb_packets  The number of empty packets in packets
     1297 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
     1298 */
     1299static int trace_pread_packet_wrapper(libtrace_t *libtrace,
     1300                                      libtrace_thread_t *t,
     1301                                      libtrace_packet_t *packets[],
     1302                                      size_t nb_packets) {
     1303        int i;
    12721304        assert(nb_packets);
    1273         assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
     1305        assert(libtrace && "libtrace is NULL in trace_read_packet()");
    12741306        if (trace_is_err(libtrace))
    12751307                return -1;
    12761308        if (!libtrace->started) {
    1277                 trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
     1309                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
     1310                              "You must call libtrace_start() before trace_read_packet()\n");
    12781311                return -1;
    12791312        }
    12801313
    1281 
    12821314        if (libtrace->format->pread_packets) {
    1283                 for (i = 0; i < nb_packets; ++i) {
    1284                         assert(packets[i]);
    1285                         if (!(packets[i]->buf_control==TRACE_CTRL_PACKET || packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
    1286                                 trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
     1315                int ret;
     1316                for (i = 0; i < (int) nb_packets; ++i) {
     1317                        assert(i[packets]);
     1318                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
     1319                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
     1320                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
     1321                                              "Packet passed to trace_read_packet() is invalid\n");
    12871322                                return -1;
    12881323                        }
    1289                         /* Finalise the packets, freeing any resources the format module
    1290                          * may have allocated it and zeroing all data associated with it.
    1291                          */
    1292                         //trace_fin_packet(packets[i]);
    1293                         /* Store the trace we are reading from into the packet opaque
    1294                          * structure */
    1295                         packets[i]->trace = libtrace;
    12961324                }
    12971325                do {
    1298                         int ret;
    1299                         ret=libtrace->format->pread_packets(libtrace, t, packets, nb_packets);
     1326                        ret=libtrace->format->pread_packets(libtrace, t,
     1327                                                            packets,
     1328                                                            nb_packets);
     1329                        /* Error, EOF or message? */
    13001330                        if (ret <= 0) {
    13011331                                return ret;
    13021332                        }
     1333
    13031334                        if (libtrace->filter) {
    1304                                 /*
    1305                                  * Discard packets that don't match the filter
    1306                                  * If that is all of the packets then pread again
    1307                                  */
    1308                                 int nb_filtered = 0;
    1309                                 libtrace_packet_t *filtered_pkts[ret];
    1310                                 int offset;
    1311                                 for (i = 0; i < ret; ++i) {
    1312                                         if (!trace_apply_filter(libtrace->filter, packets[i])){
    1313                                                 trace_fin_packet(packets[i]);
    1314                                                 packets[i]->trace = libtrace;
    1315                                                 filtered_pkts[nb_filtered++] = packets[i];
    1316                                                 packets[i] = NULL;
    1317                                         } else {
    1318                                                 if (libtrace->snaplen>0)
    1319                                                         /* Snap the packet */
    1320                                                         trace_set_capture_length(packets[i],
    1321                                                                         libtrace->snaplen);
    1322                                                 trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    1323                                         }
    1324                                 }
    1325                                 // TODO this aint thread safe
    1326                                 libtrace->filtered_packets += nb_filtered;
    1327                                 for (i = 0, offset = 0; i < ret; ++i) {
    1328                                         if (packets[i])
    1329                                                 packets[offset++] = packets[i];
    1330                                 }
    1331                                 assert (ret - offset == nb_filtered);
    1332                                 memcpy(&packets[offset], filtered_pkts, nb_filtered * sizeof(libtrace_packet_t *));
    1333                                 t->accepted_packets -= nb_filtered;
    1334                         } else {
    1335                                 for (i = 0; i < ret; ++i) {
    1336                                         if (libtrace->snaplen>0)
    1337                                                 trace_set_capture_length(packets[i],
    1338                                                                 libtrace->snaplen);
    1339                                         trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    1340                                 }
     1335                                int remaining;
     1336                                remaining = filter_packets(libtrace,
     1337                                                           packets, ret);
     1338                                t->filtered_packets += ret - remaining;
     1339                                ret = remaining;
     1340                        }
     1341                        for (i = 0; i < ret; ++i) {
     1342                                packets[i]->trace = libtrace;
     1343                                /* TODO IN FORMAT?? Like traditional libtrace */
     1344                                if (libtrace->snaplen>0)
     1345                                        trace_set_capture_length(packets[i],
     1346                                                        libtrace->snaplen);
     1347                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    13411348                        }
    13421349                        t->accepted_packets += ret;
    1343                         //++libtrace->accepted_packets;
    1344                         return ret;
    1345                 } while(1);
    1346         }
    1347         trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
     1350                } while(ret == 0);
     1351                return ret;
     1352        }
     1353        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
     1354                      "This format does not support reading packets\n");
    13481355        return ~0U;
    13491356}
Note: See TracChangeset for help on using the changeset viewer.