Changeset 50ce607 for lib/format_dpdk.c


Ignore:
Timestamp:
07/18/14 14:20:32 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b4b6b75
Parents:
8af0d01
Message:

Adds per thread storage to for the format to use against libtrace_threads.
Passes threads as arguments to reads to save overhead of looking these up.
Various changes to the DPDK system including registering a thread to allow our threads to be start with different DPDK thread numbers for thread local memory caches.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r8af0d01 r50ce607  
    4141 */
    4242
     43#define _GNU_SOURCE
     44
    4345#include "config.h"
    4446#include "libtrace.h"
     
    7678#include <rte_lcore.h>
    7779#include <rte_per_lcore.h>
     80#include <pthread.h>
    7881
    7982/* The default size of memory buffers to use - This is the max size of standard
     
    111114#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    112115#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     116#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     117
    113118#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    114119                        (uint64_t) tv.tv_usec*1000ull)
     
    182187};
    183188
    184 struct per_lcore_t
     189struct dpdk_per_lcore_t
    185190{
    186191        // TODO move time stamp stuff here
    187192        uint16_t queue_id;
    188193        uint8_t port;
    189         uint8_t enabled;
    190194};
    191195
     
    213217    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    214218#endif
    215         // DPDK normally seems to have a limit of
    216         struct per_lcore_t per_lcore[RTE_MAX_LCORE];
     219        // DPDK normally seems to have a limit of 8 queues for a given card
     220        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
    217221};
    218222
     
    387391#endif
    388392
     393/**
     394 * Expects to be called from the master lcore and moves it to the given dpdk id
     395 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     396 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     397 *               and not already in use.
     398 * @return 0 is successful otherwise -1 on error.
     399 */
     400static inline int dpdk_move_master_lcore(size_t core) {
     401    struct rte_config *cfg = rte_eal_get_configuration();
     402    cpu_set_t cpuset;
     403    int i;
     404
     405    assert (core < RTE_MAX_LCORE);
     406    assert (rte_get_master_lcore() == rte_lcore_id());
     407
     408    if (core == rte_lcore_id())
     409        return 0;
     410
     411    // Make sure we are not overwriting someone else
     412    assert(!rte_lcore_is_enabled(core));
     413
     414    // Move the core
     415    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     416    cfg->lcore_role[core] = ROLE_RTE;
     417    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     418    rte_eal_get_configuration()->master_lcore = core;
     419    RTE_PER_LCORE(_lcore_id) = core;
     420
     421    // Now change the affinity
     422    CPU_ZERO(&cpuset);
     423
     424    if (lcore_config[core].detected) {
     425        CPU_SET(core, &cpuset);
     426    } else {
     427        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     428            if (lcore_config[i].detected)
     429                CPU_SET(i, &cpuset);
     430        }
     431    }
     432
     433    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     434    if (i != 0) {
     435        // TODO proper libtrace style error here!!
     436        fprintf(stderr, "pthread_setaffinity_np failed\n");
     437        return -1;
     438    }
     439    return 0;
     440}
     441
     442
    389443static inline int dpdk_init_enviroment(char * uridata, struct dpdk_format_data_t * format_data,
    390444                                        char * err, int errlen) {
     
    395449    long nb_cpu; /* The number of CPUs in the system */
    396450    long my_cpu; /* The CPU number we want to bind to */
     451    int i;
     452    struct rte_config *cfg = rte_eal_get_configuration();
    397453   
    398454#if DEBUG
     
    402458#endif
    403459    /* Using proc-type auto allows this to be either primary or secondary
    404      * Secondary allows two intances of libtrace to be used on different
     460     * Secondary allows two instances of libtrace to be used on different
    405461     * ports. However current version of DPDK doesn't support this on the
    406      * same card (My understanding is this should work with two seperate
     462     * same card (My understanding is this should work with two separate
    407463     * cards).
    408464     *
     
    415471    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    416472   
    417     /* This initilises the Enviroment Abstraction Layer (EAL)
     473    /* This initialises the Environment Abstraction Layer (EAL)
    418474     * If we had slave workers these are put into WAITING state
    419475     *
     
    457513    }
    458514
    459     /* Make our mask */ //  0x1 << (my_cpu - 1)
    460     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x3);
     515    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
     516       info older versions */
     517    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     518    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    461519
    462520        /* Give this a name */
     
    470528        return -1;
    471529    }
     530
     531    // These are still running but will never do anything with DPDK v1.7 we
     532    // should remove this XXX in the future
     533    for(i = 0; i < RTE_MAX_LCORE; ++i) {
     534        if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
     535            cfg->lcore_role[i] = ROLE_OFF;
     536            cfg->lcore_count--;
     537        }
     538    }
     539    // Only the master should be running
     540    assert(cfg->lcore_count == 1);
     541
     542    dpdk_move_master_lcore(my_cpu-1);
     543
    472544#if DEBUG
    473545    dump_configuration();
     
    512584    char err[500];
    513585    err[0] = 0;
    514     int i;
    515586   
    516587    libtrace->format_data = (struct dpdk_format_data_t *)
     
    532603    FORMAT(libtrace)->wrap_count = 0;
    533604#endif
    534         for (i = 0;i < RTE_MAX_LCORE; i++) {
    535                 // Disabled by default
    536                 FORMAT(libtrace)->per_lcore[i].enabled = 0;
    537         }
    538605       
    539606    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     
    598665                                        return -1;
    599666                        }
     667        break;
    600668        }
    601669        return -1;
     
    868936}
    869937
    870 /* Attach memory to the port and start the port or restart the ports.
    871  */
    872 static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t * format_data, char *err, int errlen, uint16_t rx_queues){
     938/* Attach memory to the port and start (or restart) the port/s.
     939 */
     940static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
    873941    int ret, i; /* Check return values for errors */
    874942    struct rte_eth_link link_info; /* Wait for link */
     
    900968
    901969        /* Create the mbuf pool, which is the place our packets are allocated
    902          * from - TODO figure out if there is is a free function (I cannot see one) 
     970         * from - TODO figure out if there is is a free function (I cannot see one)
    903971         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    904972         * allocate however that extra 1 packet is not used.
     
    9681036                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
    9691037                                                                &rx_conf, format_data->pktmbuf_pool);
     1038        /* Init per_thread data structures */
     1039        format_data->per_lcore[i].port = format_data->port;
     1040        format_data->per_lcore[i].queue_id = i;
     1041
    9701042                if (ret < 0) {
    9711043                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     
    9771049   
    9781050#if DEBUG
    979     printf("Doing start device\n");
     1051    fprintf(stderr, "Doing start device\n");
    9801052#endif 
    9811053    /* Start device */
     
    10321104}
    10331105
    1034 static inline size_t dpdk_get_max_rx_queues(uint8_t port_id) {
     1106static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
    10351107    struct rte_eth_dev_info dev_info;
    1036     rte_eth_dev_info_get(0, &dev_info);
     1108    rte_eth_dev_info_get(port_id, &dev_info);
    10371109    return dev_info.max_rx_queues;
     1110}
     1111
     1112static inline size_t dpdk_processor_count () {
     1113    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1114    if (nb_cpu <= 0)
     1115        return 1;
     1116    else
     1117        return (size_t) nb_cpu;
    10381118}
    10391119
    10401120static int dpdk_pstart_input (libtrace_t *libtrace) {
    10411121    char err[500];
    1042     int enabled_lcore_count = 0, i=0;
     1122    int i=0, phys_cores=0;
    10431123    int tot = libtrace->perpkt_thread_count;
    10441124    err[0] = 0;
    1045        
    1046         libtrace->perpkt_thread_count;
    1047        
    1048         for (i = 0; i < RTE_MAX_LCORE; i++)
    1049         {
    1050                 if (rte_lcore_is_enabled(i))
    1051                         enabled_lcore_count++;
    1052         }
    1053        
    1054         tot = MIN(libtrace->perpkt_thread_count, enabled_lcore_count);
    1055         tot = MIN(tot, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
    1056         fprintf(stderr, "Running pstart DPDK %d %d %d %d\n", tot, libtrace->perpkt_thread_count, enabled_lcore_count, rte_lcore_count());
     1125
     1126    if (rte_lcore_id() != rte_get_master_lcore())
     1127        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
     1128
     1129    // If the master is not on the last thread we move it there
     1130    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1131        // Consider error handling here
     1132        dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
     1133    }
     1134
     1135    // Don't exceed the number of cores in the system/detected by dpdk
     1136    // We don't have to force this but performance wont be good if we don't
     1137    for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1138        if (lcore_config[i].detected) {
     1139            if (rte_lcore_is_enabled(i))
     1140                fprintf(stderr, "Found core %d already in use!\n", i);
     1141            else
     1142                phys_cores++;
     1143        }
     1144    }
     1145
     1146        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1147    tot = MIN(tot, phys_cores);
     1148
     1149        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
    10571150       
    10581151    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     
    10621155        return -1;
    10631156    }
    1064    
     1157
     1158    // Make sure we only start the number that we should
     1159    libtrace->perpkt_thread_count = tot;
    10651160    return 0;
    1066     return tot;
     1161}
     1162
     1163
     1164/**
     1165 * Register a thread with the DPDK system,
     1166 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1167 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1168 * gives it.
     1169 *
     1170 * We then allow a mapper thread to be started on every real core as DPDK would
     1171 * we also bind these to the corresponding CPU cores.
     1172 *
     1173 * @param libtrace A pointer to the trace
     1174 * @param reading True if the thread will be used to read packets, i.e. will
     1175 *                call pread_packet(), false if thread used to process packet
     1176 *                in any other manner including statistics functions.
     1177 */
     1178static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1179{
     1180    struct rte_config *cfg = rte_eal_get_configuration();
     1181    int i;
     1182    int new_id = -1;
     1183
     1184    // If 'reading packets' fill in cores from 0 up and bind affinity
     1185    // otherwise start from the MAX core (which is also the master) and work backwards
     1186    // in this case physical cores on the system will not exist so we don't bind
     1187    // these to any particular physical core
     1188    if (reading) {
     1189        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1190            if (!rte_lcore_is_enabled(i)) {
     1191                new_id = i;
     1192                if (!lcore_config[i].detected)
     1193                    fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1194                break;
     1195            }
     1196        }
     1197    } else {
     1198        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1199            if (!rte_lcore_is_enabled(i)) {
     1200                new_id = i;
     1201                break;
     1202            }
     1203        }
     1204    }
     1205
     1206    if (new_id == -1) {
     1207        assert(cfg->lcore_count == RTE_MAX_LCORE);
     1208        // TODO proper libtrace style error here!!
     1209        fprintf(stderr, "Too many threads for DPDK!!\n");
     1210        return -1;
     1211    }
     1212
     1213    // Enable the core in global DPDK structs
     1214    cfg->lcore_role[new_id] = ROLE_RTE;
     1215    cfg->lcore_count++;
     1216    // Set TLS to reflect our new number
     1217    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
     1218    fprintf(stderr, "original id%d", rte_lcore_id());
     1219    RTE_PER_LCORE(_lcore_id) = new_id;
     1220    fprintf(stderr, " new id%d\n", rte_lcore_id());
     1221
     1222    if (reading) {
     1223        // Set affinity bind to corresponding core
     1224        cpu_set_t cpuset;
     1225        CPU_ZERO(&cpuset);
     1226        CPU_SET(rte_lcore_id(), &cpuset);
     1227        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1228        if (i != 0) {
     1229            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1230            return -1;
     1231        }
     1232    }
     1233
     1234    // Map our TLS to the thread data
     1235    if (reading) {
     1236        if(t->type == THREAD_PERPKT) {
     1237            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
     1238        } else {
     1239            t->format_data = &FORMAT(libtrace)->per_lcore[0];
     1240        }
     1241    }
     1242}
     1243
     1244
     1245/**
     1246 * Unregister a thread with the DPDK system.
     1247 *
     1248 * Only previously registered threads should be calling this just before
     1249 * they are destroyed.
     1250 */
     1251static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
     1252{
     1253    struct rte_config *cfg = rte_eal_get_configuration();
     1254
     1255    assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
     1256
     1257    // Skip if master!!
     1258    if (rte_lcore_id() == rte_get_master_lcore()) {
     1259        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1260        return 0;
     1261    }
     1262
     1263    // Disable this core in global DPDK structs
     1264    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1265    cfg->lcore_count--;
     1266    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1267    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1268    return 0;
    10671269}
    10681270
     
    14421644    return -1;
    14431645}
    1444 libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
    1445 static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
     1646
     1647static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
    14461648    int nb_rx; /* Number of rx packets we've recevied */
    14471649    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     
    14501652    if (packet->buffer != NULL) {
    14511653        /* Buffer is owned by DPDK */
    1452         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1654        if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
    14531655            rte_pktmbuf_free(packet->buffer);
    14541656            packet->buffer = NULL;
     
    14671669    while (1) {
    14681670        /* Poll for a single packet */
    1469         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1470                             get_thread_table_num(libtrace), pkts_burst, 1);
     1671        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
     1672                            PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
    14711673        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    14721674                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     
    14741676        }
    14751677        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
    1476         if (libtrace_message_queue_count(&(get_thread_table(libtrace)->messages)) > 0) {
     1678        if (libtrace_message_queue_count(&t->messages) > 0) {
    14771679                        printf("Extra message yay");
    14781680                        return -2;
     
    16871889        dpdk_pause_input, /* ppause */
    16881890        dpdk_fin_input, /* p_fin */
    1689         dpdk_pconfig_input /* pconfig_input */
     1891        dpdk_pconfig_input, /* pconfig_input */
     1892    dpdk_pregister_thread, /* pregister_thread */
     1893    dpdk_punregister_thread /* unpregister_thread */
    16901894};
    16911895
Note: See TracChangeset for help on using the changeset viewer.