Changeset 4ce6fca


Ignore:
Timestamp:
03/09/15 18:34:04 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
2330f32
Parents:
6a082f8
Message:

Update DPDK to maintain memory per stream.

This means a memory pool can not become exhausted before a stream has a chance to read.

Also refactors the logic of registering a thread into smaller functions. Allowing some
checks to be performed earlier.

I've updated the dpdk to also use its own lock since structures could be shared
by multiple traces.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r6c09048 r4ce6fca  
    266266#endif
    267267
     268static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
     269/* Memory pools Per NUMA node */
     270static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
     271
    268272/* As per Intel 82580 specification - mismatch in 82580 datasheet
    269273 * it states ts is stored in Big Endian, however its actually Little */
     
    283287        uint16_t queue_id;
    284288        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
     289        struct rte_mempool *mempool;
     290        int lcore;
    285291#if HAS_HW_TIMESTAMPS_82580
    286292        /* Timestamping only relevent to RX */
     
    10671073}
    10681074
     1075/** Reserve a DPDK lcore ID for a thread globally.
     1076 *
     1077 * @param real If true allocate a real lcore, otherwise allocate a core which
     1078 * does not exist on the local machine.
     1079 * @param socket the prefered NUMA socket - only used if a real core is requested
     1080 * @return a valid core, which can later be used with dpdk_register_lcore() or a
     1081 * -1 if have run out of cores.
     1082 *
     1083 * If any thread is reading or freeing packets we need to register it here
     1084 * due to TLS caches in the memory pools.
     1085 */
     1086static int dpdk_reserve_lcore(bool real, int socket) {
     1087        int new_id = -1;
     1088        int i;
     1089        struct rte_config *cfg = rte_eal_get_configuration();
     1090
     1091        pthread_mutex_lock(&dpdk_lock);
     1092        /* If 'reading packets' fill in cores from 0 up and bind affinity
     1093         * otherwise start from the MAX core (which is also the master) and work backwards
     1094         * in this case physical cores on the system will not exist so we don't bind
     1095         * these to any particular physical core */
     1096        if (real) {
     1097#if HAVE_LIBNUMA
     1098                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1099                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
     1100                                new_id = i;
     1101                                if (!lcore_config[i].detected)
     1102                                        new_id = -1;
     1103                                break;
     1104                        }
     1105                }
     1106#endif
     1107                /* Retry without the the numa restriction */
     1108                if (new_id == -1) {
     1109                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1110                                if (!rte_lcore_is_enabled(i)) {
     1111                                        new_id = i;
     1112                                        if (!lcore_config[i].detected)
     1113                                                fprintf(stderr, "Warning the"
     1114                                                        " number of 'reading' "
     1115                                                        "threads exceed cores\n");
     1116                                        break;
     1117                                }
     1118                        }
     1119                }
     1120        } else {
     1121                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1122                        if (!rte_lcore_is_enabled(i)) {
     1123                                new_id = i;
     1124                                break;
     1125                        }
     1126                }
     1127        }
     1128
     1129        if (new_id != -1) {
     1130                /* Enable the core in global DPDK structs */
     1131                cfg->lcore_role[new_id] = ROLE_RTE;
     1132                cfg->lcore_count++;
     1133        }
     1134
     1135        pthread_mutex_unlock(&dpdk_lock);
     1136        return new_id;
     1137}
     1138
     1139/** Register a thread as a lcore
     1140 * @param libtrace any error is set against libtrace on exit
     1141 * @param real If this is a true lcore we will bind its affinty to the
     1142 * requested core.
     1143 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
     1144 * @return 0, if successful otherwise -1 if an error occured (details are stored
     1145 * in libtrace)
     1146 *
     1147 * @note This must be called from the thread being registered.
     1148 */
     1149static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
     1150        int ret;
     1151        RTE_PER_LCORE(_lcore_id) = lcore;
     1152
     1153        /* Set affinity bind to corresponding core */
     1154        if (real) {
     1155                cpu_set_t cpuset;
     1156                CPU_ZERO(&cpuset);
     1157                CPU_SET(rte_lcore_id(), &cpuset);
     1158                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1159                if (ret != 0) {
     1160                        trace_set_err(libtrace, errno, "Warning "
     1161                                      "pthread_setaffinity_np failed");
     1162                        return -1;
     1163                }
     1164        }
     1165
     1166        return 0;
     1167}
     1168
     1169/** Allocates a new dpdk packet buffer memory pool.
     1170 *
     1171 * @param n The number of threads
     1172 * @param pkt_size The packet size we need ot store
     1173 * @param socket_id The NUMA socket id
     1174 * @param A new mempool, if NULL query the DPDK library for the error code
     1175 * see rte_mempool_create() documentation.
     1176 *
     1177 * This allocates a new pool or recycles an existing memory pool.
     1178 * Call dpdk_free_memory() to free the memory.
     1179 * We cannot delete memory so instead we store the pools, allowing them to be
     1180 * re-used.
     1181 */
     1182static struct rte_mempool *dpdk_alloc_memory(unsigned n,
     1183                                             unsigned pkt_size,
     1184                                             int socket_id) {
     1185        struct rte_mempool *ret;
     1186        size_t j,k;
     1187        char name[MEMPOOL_NAME_LEN];
     1188
     1189        /* Add on packet size overheads */
     1190        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1191
     1192        pthread_mutex_lock(&dpdk_lock);
     1193
     1194        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
     1195                /* Best guess go for zero */
     1196                socket_id = 0;
     1197        }
     1198
     1199        /* Find a valid pool */
     1200        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
     1201                if (mem_pools[socket_id][j]->size >= n &&
     1202                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
     1203                        break;
     1204                }
     1205        }
     1206
     1207        /* Find the end (+1) of the list */
     1208        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
     1209
     1210        if (mem_pools[socket_id][j]) {
     1211                ret = mem_pools[socket_id][j];
     1212                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
     1213                mem_pools[socket_id][k-1] = NULL;
     1214                mem_pools[socket_id][j] = NULL;
     1215        } else {
     1216                static uint32_t test = 10;
     1217                test++;
     1218                snprintf(name, MEMPOOL_NAME_LEN,
     1219                         "libtrace_pool_%"PRIu32, test);
     1220
     1221                ret = rte_mempool_create(name, n, pkt_size,
     1222                                         128, sizeof(struct rte_pktmbuf_pool_private),
     1223                                         rte_pktmbuf_pool_init, NULL,
     1224                                         rte_pktmbuf_init, NULL,
     1225                                         socket_id, 0);
     1226        }
     1227
     1228        pthread_mutex_unlock(&dpdk_lock);
     1229        return ret;
     1230}
     1231
     1232/** Stores the memory against the DPDK library.
     1233 *
     1234 * @param mempool The mempool to free
     1235 * @param socket_id The NUMA socket this mempool was allocated upon.
     1236 *
     1237 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
     1238 * store the memory shared globally against the format.
     1239 */
     1240static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
     1241        size_t i;
     1242        pthread_mutex_lock(&dpdk_lock);
     1243
     1244        /* We should have all entries back in the mempool */
     1245        rte_mempool_audit(mempool);
     1246        if (!rte_mempool_full(mempool)) {
     1247                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
     1248                        "free all packets before finishing a trace\n",
     1249                        rte_mempool_count(mempool), mempool->size);
     1250        }
     1251
     1252        /* Find the end (+1) of the list */
     1253        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
     1254
     1255        if (i >= RTE_MAX_LCORE) {
     1256                fprintf(stderr, "Too many memory pools, dropping this one\n");
     1257        } else {
     1258                mem_pools[socket_id][i] = mempool;
     1259        }
     1260
     1261        pthread_mutex_unlock(&dpdk_lock);
     1262}
     1263
    10691264/* Attach memory to the port and start (or restart) the port/s.
    10701265 */
     
    11121307                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    11131308#endif
    1114                 format_data->pktmbuf_pool =
    1115                                 rte_mempool_create(format_data->mempool_name,
    1116                                                    (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
    1117                                                    format_data->snaplen + sizeof(struct rte_mbuf)
    1118                                                    + RTE_PKTMBUF_HEADROOM,
    1119                                                    128, sizeof(struct rte_pktmbuf_pool_private),
    1120                                                    rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    1121                                                    format_data->nic_numa_node, 0);
     1309                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
     1310                                                              format_data->snaplen,
     1311                                                              format_data->nic_numa_node);
    11221312
    11231313                if (format_data->pktmbuf_pool == NULL) {
     
    11661356        /* Attach memory to our RX queues */
    11671357        for (i=0; i < rx_queues; i++) {
     1358                dpdk_per_stream_t *stream;
    11681359#if DEBUG
    1169                 fprintf(stderr, "Doing queue configure %d\n", i);
    1170 #endif
    1171 
    1172                 dpdk_per_stream_t *stream;
     1360                fprintf(stderr, "Configuring queue %d\n", i);
     1361#endif
     1362
    11731363                /* Add storage for the stream */
    11741364                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
    11751365                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
    1176 
    11771366                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
    11781367                stream->queue_id = i;
     1368
     1369                /* TODO we don't use this in the single threaded framework -
     1370                 * So we should not reserve this */
     1371                stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
     1372
     1373                if (stream->lcore == -1) {
     1374                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
     1375                                 ". Too many threads?");
     1376                        return -1;
     1377                }
     1378
     1379                if (stream->mempool == NULL) {
     1380                        stream->mempool = dpdk_alloc_memory(
     1381                                                  format_data->nb_rx_buf*2,
     1382                                                  format_data->snaplen,
     1383                                                  rte_lcore_to_socket_id(stream->lcore));
     1384
     1385                        if (stream->mempool == NULL) {
     1386                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1387                                         "pool failed: %s", strerror(rte_errno));
     1388                                return -1;
     1389                        }
     1390                }
    11791391
    11801392                /* Initialise the RX queue with some packets from memory */
     
    11841396                                             format_data->nic_numa_node,
    11851397                                             &rx_conf,
    1186                                              format_data->pktmbuf_pool);
     1398                                             stream->mempool);
    11871399                if (ret < 0) {
    11881400                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
     
    13311543static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
    13321544{
    1333         struct rte_config *cfg = rte_eal_get_configuration();
    1334         int i;
    1335         int new_id = -1;
    1336 
    1337         /* If 'reading packets' fill in cores from 0 up and bind affinity
    1338          * otherwise start from the MAX core (which is also the master) and work backwards
    1339          * in this case physical cores on the system will not exist so we don't bind
    1340          * these to any particular physical core */
    1341         pthread_mutex_lock(&libtrace->libtrace_lock);
    1342         if (reading) {
    1343 #if HAVE_LIBNUMA
    1344                 for (i = 0; i < RTE_MAX_LCORE; ++i) {
    1345                         if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
    1346                                 new_id = i;
    1347                                 if (!lcore_config[i].detected)
    1348                                         new_id = -1;
    1349                                 break;
    1350                         }
    1351                 }
    1352 #endif
    1353                 /* Retry without the the numa restriction */
    1354                 if (new_id == -1) {
    1355                         for (i = 0; i < RTE_MAX_LCORE; ++i) {
    1356                                 if (!rte_lcore_is_enabled(i)) {
    1357                                         new_id = i;
    1358                                         if (!lcore_config[i].detected)
    1359                                                 fprintf(stderr, "Warning the"
    1360                                                         " number of 'reading' "
    1361                                                         "threads exceed cores\n");
    1362                                         break;
    1363                                 }
    1364                         }
    1365                 }
    1366         } else {
    1367                 for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
    1368                         if (!rte_lcore_is_enabled(i)) {
    1369                                 new_id = i;
    1370                                 break;
    1371                         }
    1372                 }
    1373         }
    1374 
    1375         if (new_id == -1) {
    1376                 assert(cfg->lcore_count == RTE_MAX_LCORE);
    1377                 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
    1378                               " for DPDK");
    1379                 pthread_mutex_unlock(&libtrace->libtrace_lock);
    1380                 return -1;
    1381         }
    1382 
    1383         /* Enable the core in global DPDK structs */
    1384         cfg->lcore_role[new_id] = ROLE_RTE;
    1385         cfg->lcore_count++;
    1386          /* I think new threads are going get a default thread number of 0 */
    1387         assert(rte_lcore_id() == 0);
    1388         fprintf(stderr, "original id%d", rte_lcore_id());
    1389         RTE_PER_LCORE(_lcore_id) = new_id;
     1545#if DEBUG
    13901546        char name[99];
    13911547        pthread_getname_np(pthread_self(),
    13921548                           name, sizeof(name));
    1393 
    1394         fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
    1395 
     1549#endif
    13961550        if (reading) {
    1397                 /* Set affinity bind to corresponding core */
    1398                 cpu_set_t cpuset;
    1399                 CPU_ZERO(&cpuset);
    1400                 CPU_SET(rte_lcore_id(), &cpuset);
    1401                 i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
    1402                 if (i != 0) {
    1403                         trace_set_err(libtrace, errno, "Warning "
    1404                                       "pthread_setaffinity_np failed");
    1405                         pthread_mutex_unlock(&libtrace->libtrace_lock);
    1406                         return -1;
    1407                 }
    1408         }
    1409 
    1410         /* Map our TLS to the thread data */
    1411         if (reading) {
     1551                dpdk_per_stream_t *stream;
     1552                /* Attach our thread */
    14121553                if(t->type == THREAD_PERPKT) {
    14131554                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
     
    14201561                        t->format_data = FORMAT_DATA_FIRST(libtrace);
    14211562                }
    1422         }
    1423         pthread_mutex_unlock(&libtrace->libtrace_lock);
     1563                stream = t->format_data;
     1564#if DEBUG
     1565                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
     1566#endif
     1567                return dpdk_register_lcore(libtrace, true, stream->lcore);
     1568        } else {
     1569                int lcore = dpdk_reserve_lcore(reading, 0);
     1570                if (lcore == -1) {
     1571                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
     1572                                      " for DPDK");
     1573                        return -1;
     1574                }
     1575#if DEBUG
     1576                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
     1577#endif
     1578                return dpdk_register_lcore(libtrace, false, lcore);
     1579        }
     1580
    14241581        return 0;
    14251582}
     
    14361593
    14371594        assert(rte_lcore_id() < RTE_MAX_LCORE);
    1438         pthread_mutex_lock(&libtrace->libtrace_lock);
     1595        pthread_mutex_lock(&dpdk_lock);
    14391596        /* Skip if master */
    14401597        if (rte_lcore_id() == rte_get_master_lcore()) {
    14411598                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
    1442                 pthread_mutex_unlock(&libtrace->libtrace_lock);
     1599                pthread_mutex_unlock(&dpdk_lock);
    14431600                return;
    14441601        }
     
    14491606        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
    14501607        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
    1451         pthread_mutex_unlock(&libtrace->libtrace_lock);
     1608        pthread_mutex_unlock(&dpdk_lock);
    14521609        return;
    14531610}
     
    15231680
    15241681static int dpdk_fin_input(libtrace_t * libtrace) {
     1682        libtrace_list_node_t * n;
    15251683        /* Free our memory structures */
    15261684        if (libtrace->format_data != NULL) {
    1527                 /* Close the device completely, device cannot be restarted */
     1685
    15281686                if (FORMAT(libtrace)->port != 0xFF)
    15291687                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
     
    15311689                                                        dpdk_lsc_callback,
    15321690                                                        FORMAT(libtrace));
     1691                /* Close the device completely, device cannot be restarted */
    15331692                rte_eth_dev_close(FORMAT(libtrace)->port);
     1693
     1694                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
     1695                                 FORMAT(libtrace)->nic_numa_node);
     1696
     1697                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
     1698                        dpdk_per_stream_t * stream = n->data;
     1699                        if (stream->mempool)
     1700                                dpdk_free_memory(stream->mempool,
     1701                                                 rte_lcore_to_socket_id(stream->lcore));
     1702                }
     1703
    15341704                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
    15351705                /* filter here if we used it */
     
    19042074        int nb_rx; /* Number of rx packets we've recevied */
    19052075        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
    1906         size_t i;
     2076        int i;
    19072077        dpdk_per_stream_t *stream = t->format_data;
    19082078
Note: See TracChangeset for help on using the changeset viewer.