Changeset 10c47a0


Ignore:
Timestamp:
02/23/15 15:01:00 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
526d9d0
Parents:
5df059b
Message:

Fixes DAG DUCK reporting for parallel libtrace.
In parallel libtrace DUCK is only ever sent to the first thread.

It is now up each formats pread_packet to tag the trace along with
the error (AKA bytes read) to each packet.

Change logic in parallel libtrace to alwaus prefer pread over read if
it exists.

Fix some unresolved conflict in DPDK that I missed, that was ifdef'd out.

Location:
lib
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dag25.c

    r5df059b r10c47a0  
    142142/* "Global" data that is stored for each DAG input trace */
    143143struct dag_format_data_t {
    144         /* Data required for regular DUCK reporting */
    145         /* TODO: This doesn't work with the 10X2S card! I don't know how
    146          * DUCK stuff works and don't know how to fix it */
     144        /* DAG device */
     145        struct dag_dev_t *device;
     146        /* Boolean flag indicating whether the trace is currently attached */
     147        int stream_attached;
     148        /* Data stored against each DAG input stream */
     149        libtrace_list_t *per_stream;
     150
     151        /* Data required for regular DUCK reporting.
     152         * We put this on a new cache line otherwise we have a lot of false
     153         * sharing caused by updating the last_pkt.
     154         * This should only ever be accessed by the first thread stream,
     155         * that includes both read and write operations.
     156         */
    147157        struct {
    148158                /* Timestamp of the last DUCK report */
     
    155165                 * DUCK format functions */
    156166                libtrace_t *dummy_duck;
    157         } duck;
    158         /* DAG device */
    159         struct dag_dev_t *device;
    160         /* Boolean flag indicating whether the trace is currently attached */
    161         int stream_attached;
    162         /* Data stored against each DAG input stream */
    163         libtrace_list_t *per_stream;
     167        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
    164168};
    165169
     
    914918        packet->trace = DUCK.dummy_duck;
    915919        DUCK.last_duck = DUCK.last_pkt;
     920        packet->error = sizeof(duckinf_t);
    916921        return sizeof(duckinf_t);
    917922}
     
    967972/* Converts a buffer containing a recently read DAG packet record into a
    968973 * libtrace packet */
    969 static int dag_prepare_packet_real(libtrace_t *libtrace,
    970                                    struct dag_per_stream_t *stream_data,
    971                                    libtrace_packet_t *packet,
    972                                    void *buffer, libtrace_rt_types_t rt_type,
    973                                    uint32_t flags)
     974static int dag_prepare_packet_stream(libtrace_t *libtrace,
     975                                     struct dag_per_stream_t *stream_data,
     976                                     libtrace_packet_t *packet,
     977                                     void *buffer, libtrace_rt_types_t rt_type,
     978                                     uint32_t flags)
    974979{
    975980        dag_record_t *erfptr;
     
    10251030        }
    10261031
    1027         packet->error = 1;
    1028 
    10291032        return 0;
    10301033}
     
    10341037                              uint32_t flags)
    10351038{
    1036         return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
     1039        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
    10371040                                       buffer, rt_type, flags);
    10381041}
     
    12221225        int size = 0;
    12231226        dag_record_t *erfptr = NULL;
     1227        struct timeval tv;
    12241228        int numbytes = 0;
    12251229        uint32_t flags = 0;
     
    12311235        maxwait.tv_usec = 250000;
    12321236
    1233         /* Check if we're due for a DUCK report */
    1234         size = dag_get_duckinfo(libtrace, packet);
    1235 
    1236         if (size != 0)
    1237                 return size;
     1237        /* Check if we're due for a DUCK report - only report on the first thread */
     1238        if (stream_data == FORMAT_DATA_FIRST) {
     1239                size = dag_get_duckinfo(libtrace, packet);
     1240                if (size != 0)
     1241                        return size;
     1242        }
     1243
    12381244
    12391245        /* Don't let anyone try to free our DAG memory hole! */
     
    12751281        packet->trace = libtrace;
    12761282        /* Prepare the libtrace packet */
    1277         if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr,
     1283        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
    12781284                                    TRACE_RT_DATA_ERF, flags))
    12791285                return -1;
    12801286
    1281         return packet->payload ? htons(erfptr->rlen) :
    1282                 erf_get_framing_length(packet);
     1287        /* Update the DUCK timer - don't re-order this check (false-sharing) */
     1288        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
     1289                tv = trace_get_timeval(packet);
     1290                DUCK.last_pkt = tv.tv_sec;
     1291        }
     1292
     1293        packet->error = packet->payload ? htons(erfptr->rlen) :
     1294                                          erf_get_framing_length(packet);
     1295
     1296        return packet->error;
    12831297}
    12841298
     
    13731387                        break;
    13741388                }
    1375                 if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
     1389                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
    13761390                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
    13771391                        event.type = TRACE_EVENT_TERMINATE;
  • lib/format_dpdk.c

    r694823f r10c47a0  
    18241824 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
    18251825 */
    1826 static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc,
     1826static inline void dpdk_ready_pkts(libtrace_t *libtrace, struct dpdk_per_lcore_t *plc,
    18271827                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
     1828        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
    18281829        struct dpdk_addt_hdr *hdr;
    18291830        size_t i;
     
    18651866#endif
    18661867
    1867         assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime??
     1868        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
    18681869        for (i = 0 ; i < nb_pkts ; ++i) {
    18691870
     
    18731874
    18741875#if GET_MAC_CRC_CHECKSUM
    1875 <<<<<<< HEAD
    18761876                /* Add back in the CRC sum */
    1877                 pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
    1878                 pkts[i]->pkt.data_len += ETHER_CRC_LEN;
     1877                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
     1878                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
    18791879                hdr->flags |= INCLUDES_CHECKSUM;
    1880 =======
    1881     /* Add back in the CRC sum */
    1882     rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
    1883     rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
    1884     hdr->flags |= INCLUDES_CHECKSUM;
    1885 >>>>>>> master
    18861880#endif
    18871881
     
    19821976                        packets[i]->buffer = pkts[i];
    19831977                        packets[i]->header = pkts[i];
     1978                        packets[i]->trace = libtrace;
    19841979#if HAS_HW_TIMESTAMPS_82580
    19851980                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
     
    20422037                FORMAT(libtrace)->burst_size = nb_rx;
    20432038                FORMAT(libtrace)->burst_offset = 1;
    2044                 dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
     2039                dpdk_ready_pkts(libtrace, &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
    20452040                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
    20462041                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     
    20902085                /* Got some packets - otherwise we keep spining */
    20912086                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
    2092                 dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
     2087                dpdk_ready_pkts(libtrace, PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
    20932088                return nb_rx;
    20942089        }
     
    22122207            packet->type = TRACE_RT_DATA_DPDK;
    22132208            event.type = TRACE_EVENT_PACKET;
    2214             dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
     2209            dpdk_ready_pkts(trace, &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
    22152210            event.size = 1; // TODO should be bytes read, which essentially useless anyway
    22162211
  • lib/format_linux_int.c

    re4f27d1 r10c47a0  
    300300         * we just need to get prepare_packet to set all our packet pointers
    301301         * appropriately */
    302        
     302        packet->trace = libtrace;
    303303        if (linuxnative_prepare_packet(libtrace, packet, packet->buffer,
    304304                                packet->type, flags))
  • lib/format_linux_ring.c

    re4f27d1 r10c47a0  
    525525
    526526        packet->buffer = header;
     527        packet->trace = libtrace;
    527528
    528529        /* If a snaplen was configured, automatically truncate the packet to
  • lib/libtrace_int.h

    r5ab626a r10c47a0  
    307307        enum hasher_types hasher_type;
    308308        /** The hasher function - NULL implies they don't care or balance */
    309         fn_hasher hasher; // If valid using a separate thread
     309        fn_hasher hasher;
    310310        void *hasher_data;
    311311        /** The pread_packet choosen path for the configuration */
  • lib/trace_parallel.c

    re8c0a9c r10c47a0  
    168168
    169169/**
    170  * True if the trace has dedicated hasher thread otherwise false.
    171  * This can be used once the hasher thread has been started.
    172  */
    173 static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
     170 * This can be used once the hasher thread has been started and internally after
     171 * verfiy_configuration.
     172 *
     173 * @return true if the trace has dedicated hasher thread otherwise false.
     174 */
     175static inline bool trace_has_dedicated_hasher(libtrace_t * libtrace)
    174176{
    175177        return libtrace->hasher_thread.type == THREAD_HASHER;
     
    260262
    261263/**
     264 * This is valid once a trace is initialised
     265 *
    262266 * @return True if the format supports parallel threads.
    263267 */
     
    14291433                        }
    14301434                        for (i = 0; i < ret; ++i) {
    1431                                 packets[i]->trace = libtrace;
     1435                                /* We do not mark the packet against the trace,
     1436                                 * before hand or after. After breaks DAG meta
     1437                                 * packets and before is inefficient */
     1438                                //packets[i]->trace = libtrace;
    14321439                                /* TODO IN FORMAT?? Like traditional libtrace */
    14331440                                if (libtrace->snaplen>0)
     
    14971504 */
    14981505static void verify_configuration(libtrace_t *libtrace) {
     1506        bool require_hasher = false;
     1507
     1508        /* Might we need a dedicated hasher thread? */
     1509        if (libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
     1510                require_hasher = true;
     1511        }
    14991512
    15001513        if (libtrace->config.hasher_queue_size <= 0)
     
    15261539        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
    15271540                libtrace->combiner = combiner_unordered;
     1541
     1542
     1543        /* Figure out if we are using a dedicated hasher thread? */
     1544        if (require_hasher && libtrace->config.perpkt_threads > 1) {
     1545                libtrace->hasher_thread.type = THREAD_HASHER;
     1546        }
    15281547}
    15291548
     
    16301649        verify_configuration(libtrace);
    16311650
    1632         /* Try start the format */
    1633         if (libtrace->perpkt_thread_count > 1 &&
    1634             trace_supports_parallel(libtrace) &&
     1651        /* Try start the format - we prefer parallel over single threaded, as
     1652         * these formats should support messages better */
     1653        if (trace_supports_parallel(libtrace) &&
    16351654            !trace_has_dedicated_hasher(libtrace)) {
    1636                 printf("This format has direct support for p's\n");
     1655                printf("Using the parallel trace format\n");
    16371656                ret = libtrace->format->pstart_input(libtrace);
    16381657                libtrace->pread = trace_pread_packet_wrapper;
    16391658        } else {
     1659                printf("Using single threaded interface\n");
    16401660                if (libtrace->format->start_input) {
    16411661                        ret = libtrace->format->start_input(libtrace);
     
    16441664                        libtrace->pread = trace_pread_packet_first_in_first_served;
    16451665                else
     1666                        /* Use standard read_packet */
    16461667                        libtrace->pread = NULL;
    16471668        }
     
    16591680         * Special Case: If single threaded we don't need a hasher
    16601681         */
    1661         if (libtrace->perpkt_thread_count > 1 && libtrace->hasher
    1662             && libtrace->hasher_type != HASHER_HARDWARE) {
     1682        if (trace_has_dedicated_hasher(libtrace)) {
    16631683                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
    16641684                                   THREAD_HASHER, hasher_entry, -1,
     
    17311751                goto cleanup_threads;
    17321752        }
    1733 
    1734         /*trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
    1735                       "failed to allocate ocache.");
    1736         goto cleanup_threads;*/
    17371753
    17381754        if (libtrace_ocache_init(&libtrace->packet_freelist,
     
    19902006        } else {
    19912007                trace->hasher = NULL;
    1992                 // TODO consider how to handle freeing this
    19932008                trace->hasher_data = NULL;
    19942009        }
Note: See TracChangeset for help on using the changeset viewer.