Changeset 29bbef0 for lib/format_dpdk.c


Ignore:
Timestamp:
03/30/14 17:48:26 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
f1015ad
Parents:
dad224b
Message:

My work from over summer, with a few things tidied up and updated to include recent commits/patches to bring this up to date. Still very much work in progress.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r2138553 r29bbef0  
    4646#include "format_helper.h"
    4747#include "libtrace_arphrd.h"
     48#include "hash_toeplitz.h"
    4849
    4950#ifdef HAVE_INTTYPES_H
     
    7273#include <rte_mempool.h>
    7374#include <rte_mbuf.h>
     75#include <rte_launch.h>
     76#include <rte_lcore.h>
     77#include <rte_per_lcore.h>
    7478
    7579/* The default size of memory buffers to use - This is the max size of standard
     
    129133
    130134/* Print verbose messages to stdout */
    131 #define DEBUG 0
     135#define DEBUG 1
    132136
    133137/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     
    176180    DPDK_RUNNING,
    177181    DPDK_PAUSED,
     182};
     183
     184struct per_lcore_t /* Per-lcore state; dpdk_format_data_t holds one of these for each of the RTE_MAX_LCORE possible lcores */
     185{
     186        // TODO move time stamp stuff here
     187        uint16_t queue_id; /* presumably the RX queue this lcore polls - TODO confirm against the code that fills it in */
     188        uint8_t port; /* presumably the DPDK port number this lcore reads from - TODO confirm */
     189        uint8_t enabled; /* Non-zero when this entry is in use; set to 0 (disabled) for every lcore when the format data is initialised */
    178190};
    179191
     
    194206    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    195207    unsigned int nb_blacklist; /* Number of blacklist items that are valid */
     208    uint8_t rss_key[40]; // This is the RSS KEY
    196209#if HAS_HW_TIMESTAMPS_82580
    197210    /* Timestamping is only relevant to RX */
     
    200213    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    201214#endif
     215        // DPDK normally seems to have a limit of
     216        struct per_lcore_t per_lcore[RTE_MAX_LCORE];
    202217};
    203218
     
    399414     * Basically binds this thread to a fixed core, which we choose as
    400415     * the last core on the machine (assuming fewer interrupts mapped here).
    401      * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so om
     416     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
    402417     * "-n" the number of memory channels into the CPU (hardware specific)
    403418     *      - Most likely to be half the number of ram slots in your machine.
     
    436451    }
    437452
    438     /* Make our mask */
    439     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     453    /* Make our mask */ //  0x1 << (my_cpu - 1)
     454    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x3);
    440455    argv[2] = cpu_number;
    441456
     
    478493        return -1;
    479494    }
     495   
     496    struct rte_eth_dev_info dev_info;
     497    rte_eth_dev_info_get(0, &dev_info);
     498    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     499                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    480500
    481501    return 0;
     
    485505    char err[500];
    486506    err[0] = 0;
     507    int i;
    487508   
    488509    libtrace->format_data = (struct dpdk_format_data_t *)
     
    504525    FORMAT(libtrace)->wrap_count = 0;
    505526#endif
    506 
     527        for (i = 0;i < RTE_MAX_LCORE; i++) {
     528                // Disabled by default
     529                FORMAT(libtrace)->per_lcore[i].enabled = 0;
     530        }
     531       
    507532    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    508533        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     
    547572};
    548573
     574static int dpdk_pconfig_input (libtrace_t *libtrace,
     575                                trace_parallel_option_t option,
     576                                void *data) {
     577        switch (option) {
     578                case TRACE_OPTION_SET_HASHER:
     579                        switch (*((enum hasher_types *) data))
     580                        {
     581                                case HASHER_BALANCE:
     582                                case HASHER_UNIDIRECTIONAL:
     583                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     584                                        return 0;
     585                                case HASHER_BIDIRECTIONAL:
     586                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     587                                        return 0;
     588                                case HASHER_HARDWARE:
     589                                case HASHER_CUSTOM:
     590                                        // We don't support these
     591                                        return -1;
     592                        }
     593        }
     594        return -1;
     595}
    549596/**
    550597 * Note here snaplen excludes the MAC checksum. Packets over
     
    596643static struct rte_eth_conf port_conf = {
    597644        .rxmode = {
     645                .mq_mode = ETH_RSS,
    598646                .split_hdr_size = 0,
    599647                .header_split   = 0, /**< Header Split disabled */
     
    619667        .txmode = {
    620668                .mq_mode = ETH_DCB_NONE,
     669        },
     670        .rx_adv_conf = {
     671                .rss_conf = {
     672                        // .rss_key = &rss_key, // We set this per format
     673                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     674                },
    621675        },
    622676};
     
    710764     */
    711765   
     766   
     767    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
     768   
    712769    /* This must be called first before another *eth* function
    713770     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     
    770827    return 0;
    771828}
     829int mapper_start(void *data); // This actually a void*
     830
     831/* Attach memory to the port and start the port or restart the ports.
     832 */
     833static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t * format_data, char *err, int errlen, uint16_t rx_queues){
     834    int ret, i; /* Check return values for errors */
     835    struct rte_eth_link link_info; /* Wait for link */
     836   
     837    /* Already started */
     838    if (format_data->paused == DPDK_RUNNING)
     839        return 0;
     840
     841    /* First time started we need to alloc our memory, doing this here
     842     * rather than in enviroment setup because we don't have snaplen then */
     843    if (format_data->paused == DPDK_NEVER_STARTED) {
     844        if (format_data->snaplen == 0) {
     845            format_data->snaplen = RX_MBUF_SIZE;
     846            port_conf.rxmode.jumbo_frame = 0;
     847            port_conf.rxmode.max_rx_pkt_len = 0;
     848        } else {
     849            /* Use jumbo frames */
     850            port_conf.rxmode.jumbo_frame = 1;
     851            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     852        }
     853
     854        /* This is additional overhead so make sure we allow space for this */
     855#if GET_MAC_CRC_CHECKSUM
     856        format_data->snaplen += ETHER_CRC_LEN;
     857#endif
     858#if HAS_HW_TIMESTAMPS_82580
     859        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     860#endif
     861
     862        /* Create the mbuf pool, which is the place our packets are allocated
     863         * from - TODO figure out if there is is a free function (I cannot see one)
     864         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     865         * allocate however that extra 1 packet is not used.
     866         * (I assume <= vs < error some where in DPDK code)
     867         * TX requires nb_tx_buffers + 1 in the case the queue is full
     868         * so that will fill the new buffer and wait until slots in the
     869         * ring become available.
     870         */
     871#if DEBUG
     872    printf("Creating mempool named %s\n", format_data->mempool_name);
     873#endif
     874        format_data->pktmbuf_pool =
     875            rte_mempool_create(format_data->mempool_name,
     876                       format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
     877                       format_data->snaplen + sizeof(struct rte_mbuf)
     878                                        + RTE_PKTMBUF_HEADROOM,
     879                       8, sizeof(struct rte_pktmbuf_pool_private),
     880                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     881                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
     882
     883        if (format_data->pktmbuf_pool == NULL) {
     884            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     885                        "pool failed: %s", strerror(rte_errno));
     886            return -1;
     887        }
     888    }
     889   
     890    /* ----------- Now do the setup for the port mapping ------------ */
     891    /* Order of calls must be
     892     * rte_eth_dev_configure()
     893     * rte_eth_tx_queue_setup()
     894     * rte_eth_rx_queue_setup()
     895     * rte_eth_dev_start()
     896     * other rte_eth calls
     897     */
     898   
     899    /* This must be called first before another *eth* function
     900     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     901    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     902    if (ret < 0) {
     903        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     904                            " %"PRIu8" : %s", format_data->port,
     905                            strerror(-ret));
     906        return -1;
     907    }
     908#if DEBUG
     909    printf("Doing dev configure\n");
     910#endif
     911    /* Initilise the TX queue a minimum value if using this port for
     912     * receiving. Otherwise a larger size if writing packets.
     913     */
     914    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
     915                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     916    if (ret < 0) {
     917        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     918                            " %"PRIu8" : %s", format_data->port,
     919                            strerror(-ret));
     920        return -1;
     921    }
     922   
     923    for (i=0; i < rx_queues; i++) {
     924#if DEBUG
     925    printf("Doing queue configure\n");
     926#endif 
     927                /* Initilise the RX queue with some packets from memory */
     928                ret = rte_eth_rx_queue_setup(format_data->port, i,
     929                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
     930                                                                &rx_conf, format_data->pktmbuf_pool);
     931                if (ret < 0) {
     932                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     933                                                " %"PRIu8" : %s", format_data->port,
     934                                                strerror(-ret));
     935                        return -1;
     936                }
     937        }
     938   
     939#if DEBUG
     940    printf("Doing start device\n");
     941#endif 
     942    /* Start device */
     943    ret = rte_eth_dev_start(format_data->port);
     944#if DEBUG
     945    printf("Done start device\n");
     946#endif 
     947    if (ret < 0) {
     948        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     949                    strerror(-ret));
     950        return -1;
     951    }
     952
     953
     954    /* Default promiscuous to on */
     955    if (format_data->promisc == -1)
     956        format_data->promisc = 1;
     957   
     958    if (format_data->promisc == 1)
     959        rte_eth_promiscuous_enable(format_data->port);
     960    else
     961        rte_eth_promiscuous_disable(format_data->port);
     962   
     963   
     964    /* We have now successfully started/unpased */
     965    format_data->paused = DPDK_RUNNING;
     966   
     967    // Can use remote launch for all
     968    /*RTE_LCORE_FOREACH_SLAVE(i) {
     969                rte_eal_remote_launch(mapper_start, (void *)libtrace, i);
     970        }*/
     971   
     972    /* Wait for the link to come up */
     973    rte_eth_link_get(format_data->port, &link_info);
     974#if DEBUG
     975    printf("Link status is %d %d %d\n", (int) link_info.link_status,
     976            (int) link_info.link_duplex, (int) link_info.link_speed);
     977#endif
     978
     979    return 0;
     980}
    772981
    773982static int dpdk_start_input (libtrace_t *libtrace) {
     
    782991    }
    783992    return 0;
     993}
     994
     995static int dpdk_pstart_input (libtrace_t *libtrace) {
     996    char err[500];
     997    int enabled_lcore_count = 0, i=0;
     998    int tot = libtrace->mapper_thread_count;
     999    err[0] = 0;
     1000       
     1001        libtrace->mapper_thread_count;
     1002       
     1003        for (i = 0; i < RTE_MAX_LCORE; i++)
     1004        {
     1005                if (rte_lcore_is_enabled(i))
     1006                        enabled_lcore_count++;
     1007        }
     1008       
     1009        tot = MIN(libtrace->mapper_thread_count, enabled_lcore_count);
     1010        tot = MIN(tot, 8);
     1011        printf("Running pstart DPDK %d %d %d %d\n", tot, libtrace->mapper_thread_count, enabled_lcore_count, rte_lcore_count());
     1012       
     1013    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1014        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1015        free(libtrace->format_data);
     1016        libtrace->format_data = NULL;
     1017        return -1;
     1018    }
     1019   
     1020    return 0;
     1021    return tot;
    7841022}
    7851023
     
    11161354}
    11171355
     1356
     1357static void dpdk_fin_packet(libtrace_packet_t *packet)
     1358{
     1359        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1360                rte_pktmbuf_free(packet->buffer);
     1361                packet->buffer = NULL;
     1362        }
     1363}
     1364
    11181365static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    11191366    int nb_rx; /* Number of rx packets we've received */
     
    11451392            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    11461393        }
     1394    }
     1395   
     1396    /* We'll never get here - but if we did it would be bad */
     1397    return -1;
     1398}
     1399libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
     1400static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
     1401    int nb_rx; /* Number of rx packets we've received */
     1402    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     1403
     1404    /* Free the last packet buffer */
     1405    if (packet->buffer != NULL) {
     1406        /* Buffer is owned by DPDK */
     1407        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1408            rte_pktmbuf_free(packet->buffer);
     1409            packet->buffer = NULL;
     1410        } else
     1411        /* Buffer is owned by packet i.e. has been malloc'd */
     1412        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1413            free(packet->buffer);
     1414            packet->buffer = NULL;
     1415        }
     1416    }
     1417   
     1418    packet->buf_control = TRACE_CTRL_EXTERNAL;
     1419    packet->type = TRACE_RT_DATA_DPDK;
     1420   
     1421    /* Wait for a packet */
     1422    while (1) {
     1423        /* Poll for a single packet */
     1424        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     1425                            get_thread_table_num(libtrace), pkts_burst, 1);
     1426        if (nb_rx > 0) { /* Got a packet - otherwise we keep spinning */
     1427                        printf("Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     1428            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
     1429        }
     1430        // Check the message queue; the count could (well, it shouldn't, but anyway) be less than 0
     1431        if (libtrace_message_queue_count(&(get_thread_table(libtrace)->messages)) > 0) {
     1432                        printf("Extra message yay");
     1433                        return -2;
     1434                }
    11471435    }
    11481436   
     
    13251613        dpdk_read_packet,           /* read_packet */
    13261614        dpdk_prepare_packet,    /* prepare_packet */
    1327         NULL,                               /* fin_packet */
     1615        dpdk_fin_packet,                                    /* fin_packet */
    13281616        dpdk_write_packet,          /* write_packet */
    13291617        dpdk_get_link_type,         /* get_link_type */
     
    13481636        dpdk_trace_event,               /* trace_event */
    13491637    dpdk_help,              /* help */
     1638    dpdk_pstart_input, /* pstart_input */
     1639        dpdk_pread_packet, /* pread_packet */
     1640        dpdk_pause_input, /* ppause */
     1641        dpdk_fin_input, /* p_fin */
     1642        dpdk_pconfig_input, /* pconfig_input */
    13501643        NULL
    13511644};
Note: See TracChangeset for help on using the changeset viewer.