Ignore:
Timestamp:
12/19/14 15:47:34 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
1960910
Parents:
6e41e73
Message:

Improves speed of the DPDK format(and parallel libtrace) and fixes some DPDK bugs

Fixes bug with PCI address being parsed as a decimal instead of hex.
Fixes bug so DPDK Breaks out of loop when libtrace_halt is called

For performance

  • Rearranges the header format used to simplify code which might show a small speed up
  • Batching packets is now supported thoughout the parallel framework
  • DPDK now always reads packets in bursts if possible, including in single threaded mode
  • Calls to retrive system time only needs to occur once for a batch of packets
  • The CPU core used to run DPDK is/are now picked based upon the NUMA node the NIC is attached to
  • A delay has been added to reduce the memory load of polling after unsuccessful attempts this tends to improve performance
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r6e41e73 rd7fd648  
    389389/**
    390390 * Dispatches packets to their correct place and applies any translations
    391  * as needed
     391 * as needed.
     392 *
    392393 * @param trace
    393394 * @param t
    394395 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
    395  * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal
     396 * @return -1 if an error or EOF has occured and the trace should end, otherwise a postive number (or 0)
     397 * representing the number of packets returned, these will be at the beginning of the array.
    396398 */
    397399static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
    398400                                   size_t nb_packets) {
    399401        libtrace_message_t message;
    400         size_t i;
     402        size_t i, empty = 0;
    401403        for (i = 0; i < nb_packets; ++i) {
    402404                if (packets[i]->error > 0) {
    403405                        packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
     406                        trace_fin_packet(packets[i]);
    404407                } else if (packets[i]->error == READ_TICK) {
    405408                        message.code = MESSAGE_TICK;
     
    417420                        return -1;
    418421                }
    419                 // -2 is a message its not worth checking now just finish this lot and we'll check
    420                 // when we loop next
     422                if (packets[i]) {
     423                        // Move full slots to front
     424                        if (empty != i) {
     425                                packets[empty] = packets[i];
     426                                packets[i] = NULL;
     427                        }
     428                        ++empty;
     429                        // Finish packets while still in CPU cache
     430                }
     431        }
     432        return empty;
     433}
     434
     435static inline int dispatch_packet(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packet) {
     436        libtrace_message_t message;
     437        if ((*packet)->error > 0) {
     438                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
     439                trace_fin_packet(*packet);
     440        } else if ((*packet)->error == READ_TICK) {
     441                message.code = MESSAGE_TICK;
     442                message.additional.uint64 = trace_packet_get_order(*packet);
     443                message.sender = t;
     444                (*trace->per_pkt)(trace, NULL, &message, t);
     445        } else if ((*packet)->error != READ_MESSAGE) {
     446                return -1;
    421447        }
    422448        return 0;
     
    433459        size_t nb_packets;
    434460        size_t i;
    435 
     461        int ret;
     462
     463        /* Fill it with empty packets */
    436464        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
     465        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets, trace->config.burst_size, trace->config.burst_size);
     466
    437467        // Force this thread to wait until trace_pstart has been completed
    438468        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
     
    496526
    497527                if (trace->perpkt_thread_count == 1) {
     528                        assert(packets[0]);
     529                        packets[0]->error = trace_read_packet(trace, packets[0]);
     530                        if (dispatch_packet(trace, t, &packets[0]) != 0)
     531                                break;
    498532                        if (!packets[0]) {
    499533                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
    500534                        }
    501                         assert(packets[0]);
    502                         packets[0]->error = trace_read_packet(trace, packets[0]);
    503                         nb_packets = 1;
    504535                } else {
    505536                        nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size);
    506                 }
    507                 // Loop through the packets we just read
    508                 if (dispatch_packets(trace, t, packets, nb_packets) == -1)
    509                         break;
     537                        // Loop through the packets we just read and refill
     538                        ret = dispatch_packets(trace, t, packets, nb_packets);
     539                        if (ret == -1)
     540                                break;
     541                        else if (ret != nb_packets) {
     542                                // Refill the empty packets
     543                                printf("Refilling packets ret=%d nb_packets=%z\n", ret, nb_packets);
     544                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[ret], nb_packets - ret, nb_packets - ret);
     545                        }
     546                }
    510547        }
    511548
     
    754791{
    755792        size_t i = 0;
    756         bool tick_hit = false;
    757 
    758         nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
     793        //bool tick_hit = false;
    759794
    760795        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     
    13201355 * block until a packet is read (or EOF is reached).
    13211356 *
    1322  * @param libtrace      the libtrace opaque pointer
    1323  * @param packet        the packet opaque pointer
    1324  * @returns 0 on EOF, negative value on error
     1357 * @param libtrace      the trace
     1358 * @param t     The thread
     1359 * @param packets       an array of packets
     1360 * @param nb_packets
     1361 * @returns The number of packets read or 0 on EOF, negative value on error
    13251362 *
    13261363 * Note this is identical to read_packet but calls pread_packet instead of
     
    13281365 *
    13291366 */
    1330 static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
    1331 
     1367static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
     1368        size_t i;
     1369        assert(nb_packets);
    13321370        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
    13331371        if (trace_is_err(libtrace))
     
    13371375                return -1;
    13381376        }
    1339         if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
    1340                 trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
    1341                 return -1;
    1342         }
    1343         assert(packet);
    1344 
    1345         if (libtrace->format->pread_packet) {
    1346                 do {
    1347                         size_t ret;
    1348                         /* Finalise the packet, freeing any resources the format module
     1377
     1378
     1379        if (libtrace->format->pread_packets) {
     1380                for (i = 0; i < nb_packets; ++i) {
     1381                        assert(packets[i]);
     1382                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET || packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
     1383                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
     1384                                return -1;
     1385                        }
     1386                        /* Finalise the packets, freeing any resources the format module
    13491387                         * may have allocated it and zeroing all data associated with it.
    1350                          */
    1351                         trace_fin_packet(packet);
     1388                         *./
     1389                        //trace_fin_packet(packets[i]);
    13521390                        /* Store the trace we are reading from into the packet opaque
    13531391                         * structure */
    1354                         packet->trace = libtrace;
    1355                         ret=libtrace->format->pread_packet(libtrace, t, packet);
    1356                         if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
     1392                        packets[i]->trace = libtrace;
     1393                }
     1394                do {
     1395                        int ret;
     1396                        ret=libtrace->format->pread_packets(libtrace, t, packets, nb_packets);
     1397                        if (ret <= 0) {
    13571398                                return ret;
    13581399                        }
    13591400                        if (libtrace->filter) {
    1360                                 /* If the filter doesn't match, read another
    1361                                  * packet
     1401                                /*
     1402                                 * Discard packets that don't match the filter
     1403                                 * If that is all of the packets then pread again
    13621404                                 */
    1363                                 if (!trace_apply_filter(libtrace->filter,packet)){
    1364                                         ++libtrace->filtered_packets;
    1365                                         continue;
     1405                                int nb_filtered = 0;
     1406                                libtrace_packet_t *filtered_pkts[ret];
     1407                                int offset;
     1408                                for (i = 0; i < ret; ++i) {
     1409                                        if (!trace_apply_filter(libtrace->filter, packets[i])){
     1410                                                trace_fin_packet(packets[i]);
     1411                                                packets[i]->trace = libtrace;
     1412                                                filtered_pkts[nb_filtered++] = packets[i];
     1413                                                packets[i] = NULL;
     1414                                        } else if (libtrace->snaplen>0) {
     1415                                                /* Snap the packet */
     1416                                                trace_set_capture_length(packets[i],
     1417                                                                libtrace->snaplen);
     1418                                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
     1419                                        }
     1420                                }
     1421                                // TODO this aint thread safe
     1422                                libtrace->filtered_packets += nb_filtered;
     1423                                for (i = 0, offset = 0; i < ret; ++i) {
     1424                                        if (packets[i])
     1425                                                packets[offset++] = packets[i];
     1426                                }
     1427                                assert (ret - offset == nb_filtered);
     1428                                memcpy(&packets[offset], filtered_pkts, nb_filtered * sizeof(libtrace_packet_t *));
     1429                                t->accepted_packets -= nb_filtered;
     1430                        } else {
     1431                                for (i = 0; i < ret; ++i) {
     1432                                        trace_set_capture_length(packets[i],
     1433                                                        libtrace->snaplen);
     1434                                        trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    13661435                                }
    13671436                        }
    1368                         if (libtrace->snaplen>0) {
    1369                                 /* Snap the packet */
    1370                                 trace_set_capture_length(packet,
    1371                                                 libtrace->snaplen);
    1372                         }
    1373 
    1374                         ++t->accepted_packets;
    1375                         // TODO look into this better
    1376                         trace_packet_set_order(packet, trace_get_erf_timestamp(packet));
    1377                         //trace_packet_set_order(packet, libtrace->accepted_packets);
     1437                        t->accepted_packets += ret;
    13781438                        //++libtrace->accepted_packets;
    13791439                        return ret;
     
    13851445
    13861446/**
    1387  * Read packets from the parallel trace
     1447 * Selects the correct source for packets, either a parallel source
     1448 * or internal splitting
     1449 *
     1450 * @param libtrace
     1451 * @param t
     1452 * @param packets An array pre-filled with empty finilised packets
     1453 * @param nb_packets The number of packets in the array
     1454 *
    13881455 * @return the number of packets read, null packets indicate messages. Check packet->error before
    13891456 * assuming a packet is valid.
     
    13951462        assert(nb_packets);
    13961463
    1397         for (i = 0; i < nb_packets; i++) {
    1398                 // Cleanup the packet passed back
    1399                 if (packets[i])
    1400                         trace_fin_packet(packets[i]);
    1401         }
    1402 
    14031464        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1404                 if (!packets[0])
    1405                         libtrace_ocache_alloc(&libtrace->packet_freelist, (void **)packets, 1, 1);
    1406                 packets[0]->error = trace_pread_packet_wrapper(libtrace, t, *packets);
    1407                 ret = 1;
     1465                ret = trace_pread_packet_wrapper(libtrace, t, packets, nb_packets);
     1466                /* Put the error into the first packet */
     1467                if ((int) ret <= 0) {
     1468                        packets[0]->error = ret;
     1469                        ret = 1;
     1470                }
    14081471        } else if (trace_has_dedicated_hasher(libtrace)) {
    14091472                ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
Note: See TracChangeset for help on using the changeset viewer.