Changeset ddfc46d


Ignore:
Timestamp:
02/26/15 17:37:59 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b81f0a1
Parents:
43d3e73
Message:

Update DPDK to be stream based and remove some duplicate code

Also includes a whitespace cleanup (spaces -> tabs) to
be in line with other files.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r10c47a0 rddfc46d  
    204204#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
    205205
     206#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
     207#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
     208
    206209#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    207210                        (uint64_t) tv.tv_usec*1000ull)
     
    266269 * it states ts is stored in Big Endian, however its actually Little */
    267270struct hw_timestamp_82580 {
    268     uint64_t reserved;
    269     uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
     271        uint64_t reserved;
     272        uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
    270273};
    271274
    272275enum paused_state {
    273     DPDK_NEVER_STARTED,
    274     DPDK_RUNNING,
    275     DPDK_PAUSED,
     276        DPDK_NEVER_STARTED,
     277        DPDK_RUNNING,
     278        DPDK_PAUSED,
    276279};
    277280
    278 struct dpdk_per_lcore_t
     281struct dpdk_per_stream_t
    279282{
    280283        uint16_t queue_id;
    281         uint8_t port;
    282284        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    283285#if HAS_HW_TIMESTAMPS_82580
     
    286288        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    287289#endif
    288 };
     290} ALIGN_STRUCT(CACHE_LINE_SIZE);
     291
     292typedef struct dpdk_per_stream_t dpdk_per_stream_t;
    289293
    290294/* Used by both input and output however some fields are not used
    291295 * for output */
    292296struct dpdk_format_data_t {
    293     int8_t promisc; /* promiscuous mode - RX only */
    294     uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    295     uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    296     uint8_t paused; /* See paused_state */
    297     uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    298     uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
    299     int snaplen; /* The snap length for the capture - RX only */
    300     /* We always have to setup both rx and tx queues even if we don't want them */
    301     int nb_rx_buf; /* The number of packet buffers in the rx ring */
    302     int nb_tx_buf; /* The number of packet buffers in the tx ring */
    303     int nic_numa_node; /* The NUMA node that the NIC is attached to */
    304     struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
     297        int8_t promisc; /* promiscuous mode - RX only */
     298        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
     299        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
     300        uint8_t paused; /* See paused_state */
     301        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
     302        int snaplen; /* The snap length for the capture - RX only */
     303        /* We always have to setup both rx and tx queues even if we don't want them */
     304        int nb_rx_buf; /* The number of packet buffers in the rx ring */
     305        int nb_tx_buf; /* The number of packet buffers in the tx ring */
     306        int nic_numa_node; /* The NUMA node that the NIC is attached to */
     307        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    305308#if DPDK_USE_BLACKLIST
    306     struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
     309        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
    307310        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
    308311#endif
    309     char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    310     uint8_t rss_key[40]; // This is the RSS KEY
    311     /* To improve performance we always batch reading packets, in a burst */
    312     struct rte_mbuf* burst_pkts[BURST_SIZE];
    313     int burst_size; /* The total number read in the burst */
    314     int burst_offset; /* The offset we are into the burst */
    315         // DPDK normally seems to have a limit of 8 queues for a given card
    316         struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
     312        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
     313        uint8_t rss_key[40]; // This is the RSS KEY
     314        /* To improve single-threaded performance we always batch reading
     315         * packets, in a burst, otherwise the parallel library does this for us */
     316        struct rte_mbuf* burst_pkts[BURST_SIZE];
     317        int burst_size; /* The total number read in the burst */
     318        int burst_offset; /* The offset we are into the burst */
     319
     320        /* Our parallel streams */
     321        libtrace_list_t *per_stream;
    317322};
    318323
    319324enum dpdk_addt_hdr_flags {
    320     INCLUDES_CHECKSUM = 0x1,
    321     INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
     325        INCLUDES_CHECKSUM = 0x1,
     326        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
    322327};
    323328
     
    338343 */
    339344struct dpdk_addt_hdr {
    340     uint64_t timestamp;
    341     uint8_t flags;
    342     uint8_t direction;
    343     uint8_t reserved1;
    344     uint8_t reserved2;
    345     uint32_t cap_len; /* The size to say the capture is */
     345        uint64_t timestamp;
     346        uint8_t flags;
     347        uint8_t direction;
     348        uint8_t reserved1;
     349        uint8_t reserved2;
     350        uint32_t cap_len; /* The size to say the capture is */
    346351};
    347352
     
    412417 */
    413418static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
    414     int matches;
    415     assert(str);
    416     matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
    417                      &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
    418     if (matches >= 4) {
    419         return 0;
    420     } else {
    421         return -1;
    422     }
     419        int matches;
     420        assert(str);
     421        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
     422                         &addr->domain, &addr->bus, &addr->devid,
     423                         &addr->function, core);
     424        if (matches >= 4) {
     425                return 0;
     426        } else {
     427                return -1;
     428        }
    423429}
    424430
     
    456462static inline void dump_configuration()
    457463{
    458     struct rte_config * global_config;
    459     long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    460 
    461     if (nb_cpu <= 0) {
    462         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    463         nb_cpu = 1; /* fallback to just 1 core */
    464     }
    465     if (nb_cpu > RTE_MAX_LCORE)
    466         nb_cpu = RTE_MAX_LCORE;
    467 
    468     global_config = rte_eal_get_configuration();
    469 
    470     if (global_config != NULL) {
    471         int i;
    472         fprintf(stderr, "Intel DPDK setup\n"
    473                "---Version      : %s\n"
    474                "---Master LCore : %"PRIu32"\n"
    475                "---LCore Count  : %"PRIu32"\n",
    476                rte_version(),
    477                global_config->master_lcore, global_config->lcore_count);
    478 
    479         for (i = 0 ; i < nb_cpu; i++) {
    480             fprintf(stderr, "   ---Core %d : %s\n", i,
    481                    global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    482         }
    483 
    484         const char * proc_type;
    485         switch (global_config->process_type) {
    486             case RTE_PROC_AUTO:
    487                 proc_type = "auto";
    488                 break;
    489             case RTE_PROC_PRIMARY:
    490                 proc_type = "primary";
    491                 break;
    492             case RTE_PROC_SECONDARY:
    493                 proc_type = "secondary";
    494                 break;
    495             case RTE_PROC_INVALID:
    496                 proc_type = "invalid";
    497                 break;
    498             default:
    499                 proc_type = "something worse than invalid!!";
    500         }
    501         fprintf(stderr, "---Process Type : %s\n", proc_type);
    502     }
     464        struct rte_config * global_config;
     465        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     466
     467        if (nb_cpu <= 0) {
     468                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     469                       " Falling back to the first core.");
     470                nb_cpu = 1; /* fallback to just 1 core */
     471        }
     472        if (nb_cpu > RTE_MAX_LCORE)
     473                nb_cpu = RTE_MAX_LCORE;
     474
     475        global_config = rte_eal_get_configuration();
     476
     477        if (global_config != NULL) {
     478                int i;
     479                fprintf(stderr, "Intel DPDK setup\n"
     480                        "---Version      : %s\n"
     481                        "---Master LCore : %"PRIu32"\n"
     482                        "---LCore Count  : %"PRIu32"\n",
     483                        rte_version(),
     484                        global_config->master_lcore, global_config->lcore_count);
     485
     486                for (i = 0 ; i < nb_cpu; i++) {
     487                        fprintf(stderr, "   ---Core %d : %s\n", i,
     488                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
     489                }
     490
     491                const char * proc_type;
     492                switch (global_config->process_type) {
     493                case RTE_PROC_AUTO:
     494                        proc_type = "auto";
     495                        break;
     496                case RTE_PROC_PRIMARY:
     497                        proc_type = "primary";
     498                        break;
     499                case RTE_PROC_SECONDARY:
     500                        proc_type = "secondary";
     501                        break;
     502                case RTE_PROC_INVALID:
     503                        proc_type = "invalid";
     504                        break;
     505                default:
     506                        proc_type = "something worse than invalid!!";
     507                }
     508                fprintf(stderr, "---Process Type : %s\n", proc_type);
     509        }
    503510
    504511}
     
    512519 * @return 0 is successful otherwise -1 on error.
    513520 */
    514 static inline int dpdk_move_master_lcore(size_t core) {
    515     struct rte_config *cfg = rte_eal_get_configuration();
    516     cpu_set_t cpuset;
    517     int i;
    518 
    519     assert (core < RTE_MAX_LCORE);
    520     assert (rte_get_master_lcore() == rte_lcore_id());
    521 
    522     if (core == rte_lcore_id())
     521static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
     522        struct rte_config *cfg = rte_eal_get_configuration();
     523        cpu_set_t cpuset;
     524        int i;
     525
     526        assert (core < RTE_MAX_LCORE);
     527        assert (rte_get_master_lcore() == rte_lcore_id());
     528
     529        if (core == rte_lcore_id())
     530                return 0;
     531
     532        /* Make sure we are not overwriting someone else */
     533        assert(!rte_lcore_is_enabled(core));
     534
     535        /* Move the core */
     536        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     537        cfg->lcore_role[core] = ROLE_RTE;
     538        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     539        rte_eal_get_configuration()->master_lcore = core;
     540        RTE_PER_LCORE(_lcore_id) = core;
     541
     542        /* Now change the affinity, either mapped to a single core or all accepted */
     543        CPU_ZERO(&cpuset);
     544
     545        if (lcore_config[core].detected) {
     546                CPU_SET(core, &cpuset);
     547        } else {
     548                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     549                        if (lcore_config[i].detected)
     550                                CPU_SET(i, &cpuset);
     551                }
     552        }
     553
     554        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     555        if (i != 0) {
     556                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
     557                return -1;
     558        }
    523559        return 0;
    524 
    525     // Make sure we are not overwriting someone else
    526     assert(!rte_lcore_is_enabled(core));
    527 
    528     // Move the core
    529     cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
    530     cfg->lcore_role[core] = ROLE_RTE;
    531     lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
    532     rte_eal_get_configuration()->master_lcore = core;
    533     RTE_PER_LCORE(_lcore_id) = core;
    534 
    535     // Now change the affinity
    536     CPU_ZERO(&cpuset);
    537 
    538     if (lcore_config[core].detected) {
    539         CPU_SET(core, &cpuset);
    540     } else {
    541         for (i = 0; i < RTE_MAX_LCORE; ++i) {
    542             if (lcore_config[i].detected)
    543                 CPU_SET(i, &cpuset);
    544         }
    545     }
    546 
    547     i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
    548     if (i != 0) {
    549         // TODO proper libtrace style error here!!
    550         fprintf(stderr, "pthread_setaffinity_np failed\n");
    551         return -1;
    552     }
    553     return 0;
    554 }
    555 
     560}
    556561
    557562/**
     
    584589
    585590static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
    586                                         char * err, int errlen) {
    587     int ret; /* Returned error codes */
    588     struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
    589     char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
    590     char mem_map[20] = {0}; /* The memory name */
    591     long nb_cpu; /* The number of CPUs in the system */
    592     long my_cpu; /* The CPU number we want to bind to */
    593     int i;
    594     struct rte_config *cfg = rte_eal_get_configuration();
     591                                        char * err, int errlen) {
     592        int ret; /* Returned error codes */
     593        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
     594        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
     595        char mem_map[20] = {0}; /* The memory name */
     596        long nb_cpu; /* The number of CPUs in the system */
     597        long my_cpu; /* The CPU number we want to bind to */
     598        int i;
     599        struct rte_config *cfg = rte_eal_get_configuration();
    595600        struct saved_getopts save_opts;
    596601
    597 #if DEBUG
    598     rte_set_log_level(RTE_LOG_DEBUG);
    599 #else
    600     rte_set_log_level(RTE_LOG_WARNING);
    601 #endif
    602     /*
    603      * Using unique file prefixes mean separate memory is used, unlinking
    604      * the two processes. However be careful we still cannot access a
    605      * port that already in use.
    606      *
    607      * Using unique file prefixes mean separate memory is used, unlinking
    608      * the two processes. However be careful we still cannot access a
    609      * port that already in use.
    610      */
    611     char* argv[] = {"libtrace",
    612                     "-c", cpu_number,
    613                     "-n", "1",
    614                     "--proc-type", "auto",
    615                     "--file-prefix", mem_map,
    616                     "-m", "256",
     602        /* This initialises the Environment Abstraction Layer (EAL)
     603         * If we had slave workers these are put into WAITING state
     604         *
     605         * Basically binds this thread to a fixed core, which we choose as
     606         * the last core on the machine (assuming fewer interrupts mapped here).
     607         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
     608         * "-n" the number of memory channels into the CPU (hardware specific)
     609         *      - Most likely to be half the number of ram slots in your machine.
     610         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
     611         * Controls where in memory packets are stored such that they are spread
     612         * across the channels. We just use 1 to be safe.
     613         *
     614         * Using unique file prefixes mean separate memory is used, unlinking
     615         * the two processes. However be careful we still cannot access a
     616         * port that already in use.
     617         */
     618        char* argv[] = {"libtrace",
     619                        "-c", cpu_number,
     620                        "-n", "1",
     621                        "--proc-type", "auto",
     622                        "--file-prefix", mem_map,
     623                        "-m", "512",
    617624#if DPDK_USE_LOG_LEVEL
    618625#       if DEBUG
    619                     "--log-level", "8", /* RTE_LOG_DEBUG */
     626                        "--log-level", "8", /* RTE_LOG_DEBUG */
    620627#       else
    621                     "--log-level", "5", /* RTE_LOG_WARNING */
     628                        "--log-level", "5", /* RTE_LOG_WARNING */
    622629#       endif
    623630#endif
    624                     NULL};
    625     int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    626 
    627     /* This initialises the Environment Abstraction Layer (EAL)
    628      * If we had slave workers these are put into WAITING state
    629      *
    630      * Basically binds this thread to a fixed core, which we choose as
    631      * the last core on the machine (assuming fewer interrupts mapped here).
    632      * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
    633      * "-n" the number of memory channels into the CPU (hardware specific)
    634      *      - Most likely to be half the number of ram slots in your machine.
    635      *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
    636      * Controls where in memory packets are stored and should spread across
    637      * the channels. We just use 1 to be safe.
    638      */
    639 
    640     /* Get the number of cpu cores in the system and use the last core
    641      * on the correct numa node */
    642     nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    643     if (nb_cpu <= 0) {
    644         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    645         nb_cpu = 1; /* fallback to the first core */
    646     }
    647     if (nb_cpu > RTE_MAX_LCORE)
    648         nb_cpu = RTE_MAX_LCORE;
    649 
    650     my_cpu = -1;
    651     /* This allows the user to specify the core - we would try to do this
    652      * automatically but it's hard to tell that this is secondary
    653      * before running rte_eal_init(...). Currently we are limited to 1
    654      * instance per core due to the way memory is allocated. */
    655     if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
    656         snprintf(err, errlen, "Failed to parse URI");
    657         return -1;
    658     }
     631                        NULL};
     632        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
     633
     634#if DEBUG
     635        rte_set_log_level(RTE_LOG_DEBUG);
     636#else
     637        rte_set_log_level(RTE_LOG_WARNING);
     638#endif
     639
     640        /* Get the number of cpu cores in the system and use the last core
     641         * on the correct numa node */
     642        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     643        if (nb_cpu <= 0) {
     644                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     645                       " Falling back to the first core.");
     646                nb_cpu = 1; /* fallback to the first core */
     647        }
     648        if (nb_cpu > RTE_MAX_LCORE)
     649                nb_cpu = RTE_MAX_LCORE;
     650
     651        my_cpu = -1;
     652        /* This allows the user to specify the core - we would try to do this
     653         * automatically but it's hard to tell that this is secondary
     654         * before running rte_eal_init(...). Currently we are limited to 1
     655         * instance per core due to the way memory is allocated. */
     656        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
     657                snprintf(err, errlen, "Failed to parse URI");
     658                return -1;
     659        }
    659660
    660661#if HAVE_LIBNUMA
     
    681682
    682683
    683     snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
    684                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
    685 
    686     if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
    687         snprintf(err, errlen,
    688           "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
    689           " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
    690         return -1;
    691     }
    692 
    693     /* Make our mask with all cores turned on this is so that DPDK to gets CPU
    694       info older versions */
    695     snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
    696     //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     684        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
     685                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
     686
     687        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
     688                snprintf(err, errlen,
     689                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
     690                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
     691                return -1;
     692        }
     693
     694        /* Make our mask with all cores turned on this is so that DPDK to
     695         * gets CPU info older versions */
     696        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     697        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    697698
    698699#if !DPDK_USE_BLACKLIST
    699     /* Black list all ports besides the one that we want to use */
     700        /* Black list all ports besides the one that we want to use */
    700701        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
    701702                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    702                         " are you sure the address is correct?: %s", strerror(-ret));
     703                        " are you sure the address is correct?: %s", strerror(-ret));
    703704                return -1;
    704705        }
     
    707708        /* Give the memory map a unique name */
    708709        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    709     /* rte_eal_init it makes a call to getopt so we need to reset the
    710     * global optind variable of getopt otherwise this fails */
     710        /* rte_eal_init it makes a call to getopt so we need to reset the
     711        * global optind variable of getopt otherwise this fails */
    711712        save_getopts(&save_opts);
    712     optind = 1;
    713     if ((ret = rte_eal_init(argc, argv)) < 0) {
    714         snprintf(err, errlen,
    715           "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
    716         return -1;
    717     }
     713        optind = 1;
     714        if ((ret = rte_eal_init(argc, argv)) < 0) {
     715                snprintf(err, errlen,
     716                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
     717                return -1;
     718        }
    718719        restore_getopts(&save_opts);
    719     // These are still running but will never do anything with DPDK v1.7 we
    720     // should remove this XXX in the future
    721     for(i = 0; i < RTE_MAX_LCORE; ++i) {
    722             if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
    723             cfg->lcore_role[i] = ROLE_OFF;
    724             cfg->lcore_count--;
    725         }
    726     }
    727     // Only the master should be running
    728     assert(cfg->lcore_count == 1);
    729 
    730     dpdk_move_master_lcore(my_cpu-1);
     720        // These are still running but will never do anything with DPDK v1.7 we
     721        // should remove this XXX in the future
     722        for(i = 0; i < RTE_MAX_LCORE; ++i) {
     723                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
     724                        cfg->lcore_role[i] = ROLE_OFF;
     725                        cfg->lcore_count--;
     726                }
     727        }
     728        // Only the master should be running
     729        assert(cfg->lcore_count == 1);
     730
     731        // TODO XXX TODO
     732        dpdk_move_master_lcore(NULL, my_cpu-1);
    731733
    732734#if DEBUG
    733     dump_configuration();
     735        dump_configuration();
    734736#endif
    735737
    736738#if DPDK_USE_PMD_INIT
    737     /* This registers all available NICs with Intel DPDK
    738     * These are not loaded until rte_eal_pci_probe() is called.
    739     */
    740     if ((ret = rte_pmd_init_all()) < 0) {
    741         snprintf(err, errlen,
    742           "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
    743         return -1;
    744     }
     739        /* This registers all available NICs with Intel DPDK
     740        * These are not loaded until rte_eal_pci_probe() is called.
     741        */
     742        if ((ret = rte_pmd_init_all()) < 0) {
     743                snprintf(err, errlen,
     744                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
     745                return -1;
     746        }
    745747#endif
    746748
    747749#if DPDK_USE_BLACKLIST
    748     /* Blacklist all ports besides the one that we want to use */
     750        /* Blacklist all ports besides the one that we want to use */
    749751        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    750752                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    751                         " are you sure the address is correct?: %s", strerror(-ret));
     753                        " are you sure the address is correct?: %s", strerror(-ret));
    752754                return -1;
    753755        }
     
    755757
    756758#if DPDK_USE_PCI_PROBE
    757     /* This loads DPDK drivers against all ports that are not blacklisted */
     759        /* This loads DPDK drivers against all ports that are not blacklisted */
    758760        if ((ret = rte_eal_pci_probe()) < 0) {
    759         snprintf(err, errlen,
    760             "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
    761         return -1;
    762     }
    763 #endif
    764 
    765     format_data->nb_ports = rte_eth_dev_count();
    766 
    767     if (format_data->nb_ports != 1) {
    768         snprintf(err, errlen,
    769             "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
    770             format_data->nb_ports);
    771         return -1;
    772     }
    773 
    774     struct rte_eth_dev_info dev_info;
    775     rte_eth_dev_info_get(0, &dev_info);
    776     fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
    777                 (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    778 
    779     return 0;
     761                snprintf(err, errlen,
     762                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
     763                return -1;
     764        }
     765#endif
     766
     767        format_data->nb_ports = rte_eth_dev_count();
     768
     769        if (format_data->nb_ports != 1) {
     770                snprintf(err, errlen,
     771                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
     772                         format_data->nb_ports);
     773                return -1;
     774        }
     775
     776        struct rte_eth_dev_info dev_info;
     777        rte_eth_dev_info_get(0, &dev_info);
     778        fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     779                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
     780
     781        return 0;
    780782}
    781783
    782784static int dpdk_init_input (libtrace_t *libtrace) {
    783     char err[500];
    784     err[0] = 0;
    785 
    786     libtrace->format_data = (struct dpdk_format_data_t *)
    787                             malloc(sizeof(struct dpdk_format_data_t));
    788     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    789     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    790     FORMAT(libtrace)->nb_ports = 0;
    791     FORMAT(libtrace)->snaplen = 0; /* Use default */
    792     FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
    793     FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
    794     FORMAT(libtrace)->nic_numa_node = -1;
    795     FORMAT(libtrace)->promisc = -1;
    796     FORMAT(libtrace)->pktmbuf_pool = NULL;
     785        dpdk_per_stream_t stream = {0};
     786        char err[500];
     787        err[0] = 0;
     788
     789        libtrace->format_data = (struct dpdk_format_data_t *)
     790                                malloc(sizeof(struct dpdk_format_data_t));
     791        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     792        FORMAT(libtrace)->nb_ports = 0;
     793        FORMAT(libtrace)->snaplen = 0; /* Use default */
     794        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
     795        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
     796        FORMAT(libtrace)->nic_numa_node = -1;
     797        FORMAT(libtrace)->promisc = -1;
     798        FORMAT(libtrace)->pktmbuf_pool = NULL;
    797799#if DPDK_USE_BLACKLIST
    798     FORMAT(libtrace)->nb_blacklist = 0;
    799 #endif
    800     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    801     FORMAT(libtrace)->mempool_name[0] = 0;
    802     memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
    803     FORMAT(libtrace)->burst_size = 0;
    804     FORMAT(libtrace)->burst_offset = 0;
    805 
    806     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    807         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    808         free(libtrace->format_data);
    809         libtrace->format_data = NULL;
    810         return -1;
    811     }
    812     return 0;
    813 };
     800        FORMAT(libtrace)->nb_blacklist = 0;
     801#endif
     802        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     803        FORMAT(libtrace)->mempool_name[0] = 0;
     804        memset(FORMAT(libtrace)->burst_pkts, 0,
     805               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     806        FORMAT(libtrace)->burst_size = 0;
     807        FORMAT(libtrace)->burst_offset = 0;
     808
     809        /* Make our first stream */
     810        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
     811        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
     812
     813        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     814                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     815                free(libtrace->format_data);
     816                libtrace->format_data = NULL;
     817                return -1;
     818        }
     819        return 0;
     820}
    814821
    815822static int dpdk_init_output(libtrace_out_t *libtrace)
    816823{
    817     char err[500];
    818     err[0] = 0;
    819 
    820     libtrace->format_data = (struct dpdk_format_data_t *)
    821                             malloc(sizeof(struct dpdk_format_data_t));
    822     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    823     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    824     FORMAT(libtrace)->nb_ports = 0;
    825     FORMAT(libtrace)->snaplen = 0; /* Use default */
    826     FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
    827     FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
    828     FORMAT(libtrace)->nic_numa_node = -1;
    829     FORMAT(libtrace)->promisc = -1;
    830     FORMAT(libtrace)->pktmbuf_pool = NULL;
     824        char err[500];
     825        err[0] = 0;
     826
     827        libtrace->format_data = (struct dpdk_format_data_t *)
     828                                malloc(sizeof(struct dpdk_format_data_t));
     829        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     830        FORMAT(libtrace)->nb_ports = 0;
     831        FORMAT(libtrace)->snaplen = 0; /* Use default */
     832        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
     833        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
     834        FORMAT(libtrace)->nic_numa_node = -1;
     835        FORMAT(libtrace)->promisc = -1;
     836        FORMAT(libtrace)->pktmbuf_pool = NULL;
    831837#if DPDK_USE_BLACKLIST
    832     FORMAT(libtrace)->nb_blacklist = 0;
    833 #endif
    834     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    835     FORMAT(libtrace)->mempool_name[0] = 0;
    836     memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
    837     FORMAT(libtrace)->burst_size = 0;
    838     FORMAT(libtrace)->burst_offset = 0;
    839 
    840     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    841         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    842         free(libtrace->format_data);
    843         libtrace->format_data = NULL;
     838        FORMAT(libtrace)->nb_blacklist = 0;
     839#endif
     840        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     841        FORMAT(libtrace)->mempool_name[0] = 0;
     842        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     843        FORMAT(libtrace)->burst_size = 0;
     844        FORMAT(libtrace)->burst_offset = 0;
     845
     846        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     847                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     848                free(libtrace->format_data);
     849                libtrace->format_data = NULL;
     850                return -1;
     851        }
     852        return 0;
     853}
     854
     855static int dpdk_pconfig_input (libtrace_t *libtrace,
     856                               trace_parallel_option_t option,
     857                               void *data) {
     858        switch (option) {
     859        case TRACE_OPTION_SET_HASHER:
     860                switch (*((enum hasher_types *) data))
     861                {
     862                case HASHER_BALANCE:
     863                case HASHER_UNIDIRECTIONAL:
     864                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     865                        return 0;
     866                case HASHER_BIDIRECTIONAL:
     867                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     868                        return 0;
     869                case HASHER_HARDWARE:
     870                case HASHER_CUSTOM:
     871                        // We don't support these
     872                        return -1;
     873                }
     874                break;
     875        }
    844876        return -1;
    845     }
    846     return 0;
    847 };
    848 
    849 static int dpdk_pconfig_input (libtrace_t *libtrace,
    850                                 trace_parallel_option_t option,
    851                                 void *data) {
    852         switch (option) {
    853                 case TRACE_OPTION_SET_HASHER:
    854                         switch (*((enum hasher_types *) data))
    855                         {
    856                                 case HASHER_BALANCE:
    857                                 case HASHER_UNIDIRECTIONAL:
    858                                         toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
    859                                         return 0;
    860                                 case HASHER_BIDIRECTIONAL:
    861                                         toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
    862                                         return 0;
    863                                 case HASHER_HARDWARE:
    864                                 case HASHER_CUSTOM:
    865                                         // We don't support these
    866                                         return -1;
    867                         }
    868         break;
    869         }
    870         return -1;
    871 }
     877}
     878
    872879/**
    873880 * Note here snaplen excludes the MAC checksum. Packets over
     
    881888 */
    882889static int dpdk_config_input (libtrace_t *libtrace,
    883                                         trace_option_t option,
    884                                         void *data) {
    885     switch (option) {
     890                              trace_option_t option,
     891                              void *data) {
     892        switch (option) {
    886893        case TRACE_OPTION_SNAPLEN:
    887             /* Only support changing snaplen before a call to start is
    888             * made */
    889             if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
    890                 FORMAT(libtrace)->snaplen=*(int*)data;
    891             else
    892                 return -1;
    893             return 0;
    894                 case TRACE_OPTION_PROMISC:
    895                         FORMAT(libtrace)->promisc=*(int*)data;
    896             return 0;
     894                /* Only support changing snaplen before a call to start is
     895                * made */
     896                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
     897                        FORMAT(libtrace)->snaplen=*(int*)data;
     898                else
     899                        return -1;
     900                return 0;
     901        case TRACE_OPTION_PROMISC:
     902                FORMAT(libtrace)->promisc=*(int*)data;
     903                return 0;
    897904        case TRACE_OPTION_FILTER:
    898             /* TODO filtering */
    899             break;
     905                /* TODO filtering */
     906                break;
    900907        case TRACE_OPTION_META_FREQ:
    901             break;
     908                break;
    902909        case TRACE_OPTION_EVENT_REALTIME:
    903             break;
     910                break;
    904911        /* Avoid default: so that future options will cause a warning
    905912         * here to remind us to implement it, or flag it as
    906913         * unimplementable
    907914         */
    908     }
     915        }
    909916
    910917        /* Don't set an error - trace_config will try to deal with the
    911918         * option and will set an error if it fails */
    912     return -1;
     919        return -1;
    913920}
    914921
     
    961968                .wthresh = 4,/* RX_WTHRESH writeback */
    962969        },
    963     .rx_free_thresh = 0,
    964     .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
     970        .rx_free_thresh = 0,
     971        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
    965972};
    966973
    967974static const struct rte_eth_txconf tx_conf = {
    968975        .tx_thresh = {
    969         /**
    970         * TX_PTHRESH prefetch
    971         * Set on the NIC, if the number of unprocessed descriptors to queued on
    972         * the card fall below this try grab at least hthresh more unprocessed
    973         * descriptors.
    974         */
     976                /*
     977                * TX_PTHRESH prefetch
     978                * Set on the NIC: if the number of unprocessed descriptors queued on
     979                * the card falls below this, try to grab at least hthresh more
     980                * unprocessed descriptors.
     981                */
    975982                .pthresh = 36,
    976983
    977         /* TX_HTHRESH host
    978         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
    979         */
     984                /* TX_HTHRESH host
     985                * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     986                */
    980987                .hthresh = 0,
    981988
    982         /* TX_WTHRESH writeback
    983         * Set on the NIC, the number of sent descriptors before writing back
    984         * status to confirm the transmission. This is done more efficiently as
    985         * a bulk DMA-transfer rather than writing one at a time.
    986         * Similar to tx_free_thresh however this is applied to the NIC, where
    987         * as tx_free_thresh is when DPDK will check these. This is extended
    988         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
    989         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
    990         */
     989                /* TX_WTHRESH writeback
     990                * Set on the NIC, the number of sent descriptors before writing back
     991                * status to confirm the transmission. This is done more efficiently as
     992                * a bulk DMA-transfer rather than writing one at a time.
     993                * Similar to tx_free_thresh however this is applied to the NIC, where
     994                * as tx_free_thresh is when DPDK will check these. This is extended
     995                * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     996                * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     997                */
    991998                .wthresh = 4,
    992999        },
    9931000
    994     /* Used internally by DPDK rather than passed to the NIC. The number of
    995     * packet descriptors to send before checking for any responses written
    996     * back (to confirm the transmission). Default = 32 if set to 0)
    997     */
     1001        /* Used internally by DPDK rather than passed to the NIC. The number of
     1002        * packet descriptors to send before checking for any responses written
     1003        * back (to confirm the transmission). Default = 32 if set to 0)
     1004        */
    9981005        .tx_free_thresh = 0,
    9991006
    1000     /* This is the Report Status threshold, used by 10Gbit cards,
    1001     * This signals the card to only write back status (such as
    1002     * transmission successful) after this minimum number of transmit
    1003     * descriptors are seen. The default is 32 (if set to 0) however if set
    1004     * to greater than 1 TX wthresh must be set to zero, because this is kindof
    1005     * a replacement. See the dpdk programmers guide for more restrictions.
    1006     */
     1007        /* This is the Report Status threshold, used by 10Gbit cards,
     1008        * This signals the card to only write back status (such as
     1009        * transmission successful) after this minimum number of transmit
     1010        * descriptors are seen. The default is 32 (if set to 0) however if set
     1011        * to greater than 1 TX wthresh must be set to zero, because this is kind of
     1012        * a replacement. See the dpdk programmers guide for more restrictions.
     1013        */
    10071014        .tx_rs_thresh = 1,
    10081015};
     
    10601067}
    10611068
    1062 /* Attach memory to the port and start the port or restart the port.
    1063  */
    1064 static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
    1065     int ret; /* Check return values for errors */
    1066     struct rte_eth_link link_info; /* Wait for link */
    1067     unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
    1068 
    1069     /* Already started */
    1070     if (format_data->paused == DPDK_RUNNING)
    1071         return 0;
    1072 
    1073     /* First time started we need to alloc our memory, doing this here
    1074      * rather than in environment setup because we don't have snaplen then */
    1075     if (format_data->paused == DPDK_NEVER_STARTED) {
    1076         if (format_data->snaplen == 0) {
    1077             format_data->snaplen = RX_MBUF_SIZE;
    1078             port_conf.rxmode.jumbo_frame = 0;
    1079             port_conf.rxmode.max_rx_pkt_len = 0;
    1080         } else {
    1081             /* Use jumbo frames */
    1082             port_conf.rxmode.jumbo_frame = 1;
    1083             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    1084         }
    1085 
    1086         /* This is additional overhead so make sure we allow space for this */
     1069/* Attach memory to the port and start (or restart) the port/s.
     1070 */
     1071static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
     1072                              char *err, int errlen, uint16_t rx_queues) {
     1073        int ret, i;
     1074        struct rte_eth_link link_info; /* Wait for link */
     1075        dpdk_per_stream_t empty_stream = {0};
     1076
     1077        /* Already started */
     1078        if (format_data->paused == DPDK_RUNNING)
     1079                return 0;
     1080
     1081        /* First time started we need to alloc our memory, doing this here
     1082         * rather than in environment setup because we don't have snaplen then */
     1083        if (format_data->paused == DPDK_NEVER_STARTED) {
     1084                if (format_data->snaplen == 0) {
     1085                        format_data->snaplen = RX_MBUF_SIZE;
     1086                        port_conf.rxmode.jumbo_frame = 0;
     1087                        port_conf.rxmode.max_rx_pkt_len = 0;
     1088                } else {
     1089                        /* Use jumbo frames */
     1090                        port_conf.rxmode.jumbo_frame = 1;
     1091                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1092                }
     1093
    10871094#if GET_MAC_CRC_CHECKSUM
    1088         format_data->snaplen += ETHER_CRC_LEN;
     1095                /* This is additional overhead so make sure we allow space for this */
     1096                format_data->snaplen += ETHER_CRC_LEN;
    10891097#endif
    10901098#if HAS_HW_TIMESTAMPS_82580
    1091         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    1092 #endif
    1093 
    1094         /* Create the mbuf pool, which is the place our packets are allocated
    1095          * from - TODO figure out if there is is a free function (I cannot see one)
    1096          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    1097          * allocate however that extra 1 packet is not used.
    1098          * (I assume <= vs < error some where in DPDK code)
    1099          * TX requires nb_tx_buffers + 1 in the case the queue is full
    1100          * so that will fill the new buffer and wait until slots in the
    1101          * ring become available.
     1099                format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1100#endif
     1101
     1102                /* Create the mbuf pool, which is the place packets are allocated
     1103                 * from - There is no free function (I cannot see one).
     1104                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1105                 * allocate however that extra 1 packet is not used.
     1106                 * (I assume <= vs < error somewhere in DPDK code)
     1107                 * TX requires nb_tx_buffers + 1 in the case the queue is full
     1108                 * so that will fill the new buffer and wait until slots in the
     1109                 * ring become available.
     1110                 */
     1111#if DEBUG
     1112                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
     1113#endif
     1114                format_data->pktmbuf_pool =
     1115                                rte_mempool_create(format_data->mempool_name,
     1116                                                   (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
     1117                                                   format_data->snaplen + sizeof(struct rte_mbuf)
     1118                                                   + RTE_PKTMBUF_HEADROOM,
     1119                                                   128, sizeof(struct rte_pktmbuf_pool_private),
     1120                                                   rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1121                                                   format_data->nic_numa_node, 0);
     1122
     1123                if (format_data->pktmbuf_pool == NULL) {
     1124                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1125                                 "pool failed: %s", strerror(rte_errno));
     1126                        return -1;
     1127                }
     1128        }
     1129
     1130        /* ----------- Now do the setup for the port mapping ------------ */
     1131        /* Order of calls must be
     1132         * rte_eth_dev_configure()
     1133         * rte_eth_tx_queue_setup()
     1134         * rte_eth_rx_queue_setup()
     1135         * rte_eth_dev_start()
     1136         * other rte_eth calls
    11021137         */
     1138
     1139        /* This must be called first before another *eth* function
     1140         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
     1141        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1142        if (ret < 0) {
     1143                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1144                         " %"PRIu8" : %s", format_data->port,
     1145                         strerror(-ret));
     1146                return -1;
     1147        }
    11031148#if DEBUG
    1104     fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    1105 #endif
    1106     format_data->pktmbuf_pool =
    1107             rte_mempool_create(format_data->mempool_name,
    1108                        (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
    1109                        format_data->snaplen + sizeof(struct rte_mbuf)
    1110                                         + RTE_PKTMBUF_HEADROOM,
    1111                        128, sizeof(struct rte_pktmbuf_pool_private),
    1112                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    1113                        cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
    1114 
    1115     if (format_data->pktmbuf_pool == NULL) {
    1116             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
    1117                         "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
    1118             return -1;
    1119         }
    1120     }
    1121 
    1122     /* ----------- Now do the setup for the port mapping ------------ */
    1123     /* Order of calls must be
    1124      * rte_eth_dev_configure()
    1125      * rte_eth_tx_queue_setup()
    1126      * rte_eth_rx_queue_setup()
    1127      * rte_eth_dev_start()
    1128      * other rte_eth calls
    1129      */
    1130 
    1131 
    1132     port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
    1133 
    1134     /* This must be called first before another *eth* function
    1135      * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    1136     ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
    1137     if (ret < 0) {
    1138         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    1139                             " %"PRIu8" : %s", format_data->port,
    1140                             strerror(-ret));
    1141         return -1;
    1142     }
    1143     /* Initialise the TX queue a minimum value if using this port for
    1144      * receiving. Otherwise a larger size if writing packets.
    1145      */
    1146     ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    1147                         format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
    1148     if (ret < 0) {
    1149         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    1150                             " %"PRIu8" : %s", format_data->port,
    1151                             strerror(-ret));
    1152         return -1;
    1153     }
    1154     /* Initialise the RX queue with some packets from memory */
    1155     ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    1156                                  format_data->nb_rx_buf, cpu_numa_node,
    1157                                  &rx_conf, format_data->pktmbuf_pool);
    1158     if (ret < 0) {
    1159         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    1160                     " %"PRIu8" : %s", format_data->port,
    1161                     strerror(-ret));
    1162         return -1;
    1163     }
    1164 
    1165     /* Start device */
    1166     ret = rte_eth_dev_start(format_data->port);
    1167     if (ret < 0) {
    1168         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    1169                     strerror(-ret));
    1170         return -1;
    1171     }
    1172 
    1173     /* Default promiscuous to on */
    1174     if (format_data->promisc == -1)
    1175         format_data->promisc = 1;
    1176 
    1177     if (format_data->promisc == 1)
    1178         rte_eth_promiscuous_enable(format_data->port);
    1179     else
    1180         rte_eth_promiscuous_disable(format_data->port);
     1149        fprintf(stderr, "Doing dev configure\n");
     1150#endif
     1151        /* Initialise the TX queue a minimum value if using this port for
     1152         * receiving. Otherwise a larger size if writing packets.
     1153         */
     1154        ret = rte_eth_tx_queue_setup(format_data->port,
     1155                                     0 /* queue XXX */,
     1156                                     format_data->nb_tx_buf,
     1157                                     SOCKET_ID_ANY,
     1158                                     &tx_conf);
     1159        if (ret < 0) {
     1160                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
     1161                         " on port %"PRIu8" : %s", format_data->port,
     1162                         strerror(-ret));
     1163                return -1;
     1164        }
     1165
     1166        /* Attach memory to our RX queues */
     1167        for (i=0; i < rx_queues; i++) {
     1168#if DEBUG
     1169                fprintf(stderr, "Doing queue configure %d\n", i);
     1170#endif
     1171
     1172                dpdk_per_stream_t *stream;
     1173                /* Add storage for the stream */
     1174                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
     1175                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
     1176
     1177                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
     1178                stream->queue_id = i;
     1179
     1180                /* Initialise the RX queue with some packets from memory */
     1181                ret = rte_eth_rx_queue_setup(format_data->port,
     1182                                             stream->queue_id,
     1183                                             format_data->nb_rx_buf,
     1184                                             format_data->nic_numa_node,
     1185                                             &rx_conf,
     1186                                             format_data->pktmbuf_pool);
     1187                if (ret < 0) {
     1188                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
     1189                                 " RX queue on port %"PRIu8" : %s",
     1190                                 format_data->port,
     1191                                 strerror(-ret));
     1192                        return -1;
     1193                }
     1194        }
     1195
     1196#if DEBUG
     1197        fprintf(stderr, "Doing start device\n");
     1198#endif
     1199        /* Start device */
     1200        ret = rte_eth_dev_start(format_data->port);
     1201        if (ret < 0) {
     1202                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1203                         strerror(-ret));
     1204                return -1;
     1205        }
     1206
     1207        /* Default promiscuous to on */
     1208        if (format_data->promisc == -1)
     1209                format_data->promisc = 1;
     1210
     1211        if (format_data->promisc == 1)
     1212                rte_eth_promiscuous_enable(format_data->port);
     1213        else
     1214                rte_eth_promiscuous_disable(format_data->port);
     1215
     1216        /* We have now successfully started/unpaused */
     1217        format_data->paused = DPDK_RUNNING;
     1218
    11811219
    11821220        /* Register a callback for link state changes */
     
    11851223                                            dpdk_lsc_callback,
    11861224                                            format_data);
    1187         /* If this fails it is not a show stopper */
    11881225#if DEBUG
    1189         fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
    1190                 ret, strerror(-ret));
    1191 #endif
    1192 
    1193     /* Get the current link status */
    1194     rte_eth_link_get_nowait(format_data->port, &link_info);
    1195     format_data->link_speed = link_info.link_speed;
     1226        if (ret)
     1227                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1228                        ret, strerror(-ret));
     1229#endif
     1230
     1231        /* Get the current link status */
     1232        rte_eth_link_get_nowait(format_data->port, &link_info);
     1233        format_data->link_speed = link_info.link_speed;
    11961234#if DEBUG
    1197     fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    1198             (int) link_info.link_duplex, (int) link_info.link_speed);
    1199 #endif
    1200     /* We have now successfully started/unpaused */
    1201     format_data->paused = DPDK_RUNNING;
    1202 
    1203     return 0;
    1204 }
    1205 
    1206 /* Attach memory to the port and start (or restart) the port/s.
    1207  */
    1208 static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
    1209     int ret, i; /* Check return values for errors */
    1210     struct rte_eth_link link_info; /* Wait for link */
    1211 
    1212     /* Already started */
    1213     if (format_data->paused == DPDK_RUNNING)
     1235        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1236                (int) link_info.link_duplex, (int) link_info.link_speed);
     1237#endif
     1238
    12141239        return 0;
    1215 
    1216     /* First time started we need to alloc our memory, doing this here
    1217      * rather than in environment setup because we don't have snaplen then */
    1218     if (format_data->paused == DPDK_NEVER_STARTED) {
    1219         if (format_data->snaplen == 0) {
    1220             format_data->snaplen = RX_MBUF_SIZE;
    1221             port_conf.rxmode.jumbo_frame = 0;
    1222             port_conf.rxmode.max_rx_pkt_len = 0;
    1223         } else {
    1224             /* Use jumbo frames */
    1225             port_conf.rxmode.jumbo_frame = 1;
    1226             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    1227         }
    1228 
    1229         /* This is additional overhead so make sure we allow space for this */
    1230 #if GET_MAC_CRC_CHECKSUM
    1231         format_data->snaplen += ETHER_CRC_LEN;
    1232 #endif
    1233 #if HAS_HW_TIMESTAMPS_82580
    1234         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    1235 #endif
    1236 
    1237         /* Create the mbuf pool, which is the place our packets are allocated
    1238          * from - TODO figure out if there is a free function (I cannot see one)
    1239          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    1240          * allocate however that extra 1 packet is not used.
    1241          * (I assume <= vs < error some where in DPDK code)
    1242          * TX requires nb_tx_buffers + 1 in the case the queue is full
    1243          * so that will fill the new buffer and wait until slots in the
    1244          * ring become available.
    1245          */
     1240}
     1241
     1242static int dpdk_start_input (libtrace_t *libtrace) {
     1243        char err[500];
     1244        err[0] = 0;
     1245
     1246        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1247                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1248                free(libtrace->format_data);
     1249                libtrace->format_data = NULL;
     1250                return -1;
     1251        }
     1252        return 0;
     1253}
     1254
     1255static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1256        struct rte_eth_dev_info dev_info;
     1257        rte_eth_dev_info_get(port_id, &dev_info);
     1258        return dev_info.max_rx_queues;
     1259}
     1260
     1261static inline size_t dpdk_processor_count () {
     1262        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1263        if (nb_cpu <= 0)
     1264                return 1;
     1265        else
     1266                return (size_t) nb_cpu;
     1267}
     1268
     1269static int dpdk_pstart_input (libtrace_t *libtrace) {
     1270        char err[500];
     1271        int i=0, phys_cores=0;
     1272        int tot = libtrace->perpkt_thread_count;
     1273        err[0] = 0;
     1274
     1275        if (rte_lcore_id() != rte_get_master_lcore())
     1276                fprintf(stderr, "Warning dpdk_pstart_input should be called"
     1277                        " from the master DPDK thread!\n");
     1278
     1279        /* If the master is not on the last thread we move it there */
     1280        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1281                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
     1282                        return -1;
     1283        }
     1284
     1285        /* Don't exceed the number of cores in the system/detected by dpdk
     1286         * We don't have to force this but performance won't be good if we don't */
     1287        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1288                if (lcore_config[i].detected) {
     1289                        if (rte_lcore_is_enabled(i))
     1290                                fprintf(stderr, "Found core %d already in use!\n", i);
     1291                        else
     1292                                phys_cores++;
     1293                }
     1294        }
     1295
     1296        tot = MIN(libtrace->perpkt_thread_count,
     1297                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1298        tot = MIN(tot, phys_cores);
     1299
    12461300#if DEBUG
    1247     fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    1248 #endif
    1249     format_data->pktmbuf_pool =
    1250             rte_mempool_create(format_data->mempool_name,
    1251                        (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
    1252                        format_data->snaplen + sizeof(struct rte_mbuf)
    1253                                         + RTE_PKTMBUF_HEADROOM,
    1254                        128, sizeof(struct rte_pktmbuf_pool_private),
    1255                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    1256                        format_data->nic_numa_node, 0);
    1257 
    1258         if (format_data->pktmbuf_pool == NULL) {
    1259             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    1260                         "pool failed: %s", strerror(rte_errno));
    1261             return -1;
    1262         }
    1263     }
    1264 
    1265     /* ----------- Now do the setup for the port mapping ------------ */
    1266     /* Order of calls must be
    1267      * rte_eth_dev_configure()
    1268      * rte_eth_tx_queue_setup()
    1269      * rte_eth_rx_queue_setup()
    1270      * rte_eth_dev_start()
    1271      * other rte_eth calls
    1272      */
    1273 
    1274     /* This must be called first before another *eth* function
    1275      * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    1276     ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
    1277     if (ret < 0) {
    1278         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    1279                             " %"PRIu8" : %s", format_data->port,
    1280                             strerror(-ret));
    1281         return -1;
    1282     }
    1283 #if DEBUG
    1284     fprintf(stderr, "Doing dev configure\n");
    1285 #endif
    1286     /* Initialise the TX queue a minimum value if using this port for
    1287      * receiving. Otherwise a larger size if writing packets.
    1288      */
    1289     ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    1290                         format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
    1291     if (ret < 0) {
    1292         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    1293                             " %"PRIu8" : %s", format_data->port,
    1294                             strerror(-ret));
    1295         return -1;
    1296     }
    1297 
    1298     for (i=0; i < rx_queues; i++) {
    1299 #if DEBUG
    1300     fprintf(stderr, "Doing queue configure\n");
    1301 #endif
    1302 
    1303                 /* Initialise the RX queue with some packets from memory */
    1304                 ret = rte_eth_rx_queue_setup(format_data->port, i,
    1305                                              format_data->nb_rx_buf, format_data->nic_numa_node,
    1306                                              &rx_conf, format_data->pktmbuf_pool);
    1307         /* Init per_thread data structures */
    1308         format_data->per_lcore[i].port = format_data->port;
    1309         format_data->per_lcore[i].queue_id = i;
    1310 
    1311                 if (ret < 0) {
    1312                         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    1313                                                 " %"PRIu8" : %s", format_data->port,
    1314                                                 strerror(-ret));
    1315                         return -1;
    1316                 }
    1317         }
    1318 
    1319 #if DEBUG
    1320     fprintf(stderr, "Doing start device\n");
    1321 #endif
    1322     /* Start device */
    1323     ret = rte_eth_dev_start(format_data->port);
    1324 #if DEBUG
    1325     fprintf(stderr, "Done start device\n");
    1326 #endif
    1327     if (ret < 0) {
    1328         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    1329                     strerror(-ret));
    1330         return -1;
    1331     }
    1332 
    1333 
    1334     /* Default promiscuous to on */
    1335     if (format_data->promisc == -1)
    1336         format_data->promisc = 1;
    1337 
    1338     if (format_data->promisc == 1)
    1339         rte_eth_promiscuous_enable(format_data->port);
    1340     else
    1341         rte_eth_promiscuous_disable(format_data->port);
    1342 
    1343 
    1344     /* We have now successfully started/unpaused */
    1345     format_data->paused = DPDK_RUNNING;
    1346 
    1347     // Can use remote launch for all
    1348     /*RTE_LCORE_FOREACH_SLAVE(i) {
    1349                 rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
    1350         }*/
    1351 
    1352     /* Register a callback for link state changes */
    1353     ret = rte_eth_dev_callback_register(format_data->port,
    1354                                         RTE_ETH_EVENT_INTR_LSC,
    1355                                         dpdk_lsc_callback,
    1356                                         format_data);
    1357     /* If this fails it is not a show stopper */
    1358 #if DEBUG
    1359     fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
    1360             ret, strerror(-ret));
    1361 #endif
    1362 
    1363     /* Get the current link status */
    1364     rte_eth_link_get_nowait(format_data->port, &link_info);
    1365     format_data->link_speed = link_info.link_speed;
    1366 #if DEBUG
    1367     fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    1368             (int) link_info.link_duplex, (int) link_info.link_speed);
    1369 #endif
    1370 
    1371     return 0;
    1372 }
    1373 
    1374 static int dpdk_start_input (libtrace_t *libtrace) {
    1375     char err[500];
    1376     err[0] = 0;
    1377 
    1378     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    1379         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    1380         free(libtrace->format_data);
    1381         libtrace->format_data = NULL;
    1382         return -1;
    1383     }
    1384     return 0;
    1385 }
    1386 
    1387 static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
    1388     struct rte_eth_dev_info dev_info;
    1389     rte_eth_dev_info_get(port_id, &dev_info);
    1390     return dev_info.max_rx_queues;
    1391 }
    1392 
    1393 static inline size_t dpdk_processor_count () {
    1394     long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    1395     if (nb_cpu <= 0)
    1396         return 1;
    1397     else
    1398         return (size_t) nb_cpu;
    1399 }
    1400 
    1401 static int dpdk_pstart_input (libtrace_t *libtrace) {
    1402     char err[500];
    1403     int i=0, phys_cores=0;
    1404     int tot = libtrace->perpkt_thread_count;
    1405     err[0] = 0;
    1406 
    1407     if (rte_lcore_id() != rte_get_master_lcore())
    1408         fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
    1409 
    1410     // If the master is not on the last thread we move it there
    1411     if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
    1412         // Consider error handling here
    1413         dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
    1414     }
    1415 
    1416     // Don't exceed the number of cores in the system/detected by dpdk
    1417     // We don't have to force this but performance won't be good if we don't
    1418     for (i = 0; i < RTE_MAX_LCORE; ++i) {
    1419         if (lcore_config[i].detected) {
    1420             if (rte_lcore_is_enabled(i))
    1421                 fprintf(stderr, "Found core %d already in use!\n", i);
    1422             else
    1423                 phys_cores++;
    1424         }
    1425     }
    1426 
    1427         tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
    1428         tot = MIN(tot, phys_cores);
    1429 
    1430         fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
    1431 
    1432     if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
    1433         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    1434         free(libtrace->format_data);
    1435         libtrace->format_data = NULL;
    1436         return -1;
    1437     }
    1438 
    1439     // Make sure we only start the number that we should
    1440     libtrace->perpkt_thread_count = tot;
    1441     return 0;
    1442 }
    1443 
     1301        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
     1302                libtrace->perpkt_thread_count, phys_cores);
     1303#endif
     1304
     1305        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1306                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1307                free(libtrace->format_data);
     1308                libtrace->format_data = NULL;
     1309                return -1;
     1310        }
     1311
     1312        /* Make sure we only start the number that we should */
     1313        libtrace->perpkt_thread_count = tot;
     1314        return 0;
     1315}
    14441316
    14451317/**
     
    14591331static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
    14601332{
    1461     struct rte_config *cfg = rte_eal_get_configuration();
    1462     int i;
    1463     int new_id = -1;
    1464 
    1465     // If 'reading packets' fill in cores from 0 up and bind affinity
    1466     // otherwise start from the MAX core (which is also the master) and work backwards
    1467     // in this case physical cores on the system will not exist so we don't bind
    1468     // these to any particular physical core
    1469     pthread_mutex_lock(&libtrace->libtrace_lock);
    1470     if (reading) {
     1333        struct rte_config *cfg = rte_eal_get_configuration();
     1334        int i;
     1335        int new_id = -1;
     1336
     1337        /* If 'reading packets' fill in cores from 0 up and bind affinity
     1338         * otherwise start from the MAX core (which is also the master) and work backwards
     1339         * in this case physical cores on the system will not exist so we don't bind
     1340         * these to any particular physical core */
     1341        pthread_mutex_lock(&libtrace->libtrace_lock);
     1342        if (reading) {
    14711343#if HAVE_LIBNUMA
    1472         for (i = 0; i < RTE_MAX_LCORE; ++i) {
    1473                 if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
     1344                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1345                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
    14741346                                new_id = i;
    1475                         if (!lcore_config[i].detected)
    1476                                 new_id = -1;
    1477                         break;
    1478                 }
    1479         }
    1480 #endif
    1481         /* Retry without the NUMA restriction */
    1482         if (new_id == -1) {
    1483                 for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1347                                if (!lcore_config[i].detected)
     1348                                        new_id = -1;
     1349                                break;
     1350                        }
     1351                }
     1352#endif
     1353                /* Retry without the NUMA restriction */
     1354                if (new_id == -1) {
     1355                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
    14841356                                if (!rte_lcore_is_enabled(i)) {
    14851357                                        new_id = i;
    1486                                 if (!lcore_config[i].detected)
    1487                                         fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1358                                        if (!lcore_config[i].detected)
     1359                                                fprintf(stderr, "Warning the"
     1360                                                        " number of 'reading' "
     1361                                                        "threads exceed cores\n");
     1362                                        break;
     1363                                }
     1364                        }
     1365                }
     1366        } else {
     1367                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1368                        if (!rte_lcore_is_enabled(i)) {
     1369                                new_id = i;
    14881370                                break;
    14891371                        }
    14901372                }
    14911373        }
    1492     } else {
    1493         for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
    1494             if (!rte_lcore_is_enabled(i)) {
    1495                 new_id = i;
    1496                 break;
    1497             }
    1498         }
    1499     }
    1500 
    1501     if (new_id == -1) {
    1502         assert(cfg->lcore_count == RTE_MAX_LCORE);
    1503         // TODO proper libtrace style error here!!
    1504         fprintf(stderr, "Too many threads for DPDK!!\n");
    1505         pthread_mutex_unlock(&libtrace->libtrace_lock);
    1506         return -1;
    1507     }
    1508 
    1509     // Enable the core in global DPDK structs
    1510     cfg->lcore_role[new_id] = ROLE_RTE;
    1511     cfg->lcore_count++;
    1512     // Set TLS to reflect our new number
    1513     assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
    1514     fprintf(stderr, "original id%d", rte_lcore_id());
    1515     RTE_PER_LCORE(_lcore_id) = new_id;
     1374
     1375        if (new_id == -1) {
     1376                assert(cfg->lcore_count == RTE_MAX_LCORE);
     1377                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
     1378                              " for DPDK");
     1379                pthread_mutex_unlock(&libtrace->libtrace_lock);
     1380                return -1;
     1381        }
     1382
     1383        /* Enable the core in global DPDK structs */
     1384        cfg->lcore_role[new_id] = ROLE_RTE;
     1385        cfg->lcore_count++;
     1386         /* I think new threads are going to get a default thread number of 0 */
     1387        assert(rte_lcore_id() == 0);
     1388        fprintf(stderr, "original id%d", rte_lcore_id());
     1389        RTE_PER_LCORE(_lcore_id) = new_id;
    15161390        char name[99];
    15171391        pthread_getname_np(pthread_self(),
    1518                               name, sizeof(name));
    1519 
    1520     fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
    1521 
    1522     if (reading) {
    1523         // Set affinity bind to corresponding core
    1524         cpu_set_t cpuset;
    1525         CPU_ZERO(&cpuset);
    1526         CPU_SET(rte_lcore_id(), &cpuset);
    1527         i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
    1528         if (i != 0) {
    1529             fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
    1530             pthread_mutex_unlock(&libtrace->libtrace_lock);
    1531             return -1;
    1532         }
    1533     }
    1534 
    1535     // Map our TLS to the thread data
    1536     if (reading) {
    1537         if(t->type == THREAD_PERPKT) {
    1538             t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
    1539         } else {
    1540             t->format_data = &FORMAT(libtrace)->per_lcore[0];
    1541         }
    1542     }
    1543     pthread_mutex_unlock(&libtrace->libtrace_lock);
    1544     return 0;
    1545 }
    1546 
     1392                           name, sizeof(name));
     1393
     1394        fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
     1395
     1396        if (reading) {
     1397                /* Set affinity bind to corresponding core */
     1398                cpu_set_t cpuset;
     1399                CPU_ZERO(&cpuset);
     1400                CPU_SET(rte_lcore_id(), &cpuset);
     1401                i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1402                if (i != 0) {
     1403                        trace_set_err(libtrace, errno, "Warning "
     1404                                      "pthread_setaffinity_np failed");
     1405                        pthread_mutex_unlock(&libtrace->libtrace_lock);
     1406                        return -1;
     1407                }
     1408        }
     1409
     1410        /* Map our TLS to the thread data */
     1411        if (reading) {
     1412                if(t->type == THREAD_PERPKT) {
     1413                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
     1414                        if (t->format_data == NULL) {
     1415                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1416                                              "Too many threads registered");
     1417                                return -1;
     1418                        }
     1419                } else {
     1420                        t->format_data = FORMAT_DATA_FIRST(libtrace);
     1421                }
     1422        }
     1423        pthread_mutex_unlock(&libtrace->libtrace_lock);
     1424        return 0;
     1425}
    15471426
    15481427/**
     
    15541433static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
    15551434{
    1556     struct rte_config *cfg = rte_eal_get_configuration();
    1557 
    1558     assert(rte_lcore_id() < RTE_MAX_LCORE);
    1559     pthread_mutex_lock(&libtrace->libtrace_lock);
    1560     // Skip if master!!
    1561     if (rte_lcore_id() == rte_get_master_lcore()) {
    1562         fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1435        struct rte_config *cfg = rte_eal_get_configuration();
     1436
     1437        assert(rte_lcore_id() < RTE_MAX_LCORE);
     1438        pthread_mutex_lock(&libtrace->libtrace_lock);
     1439        /* Skip if master */
     1440        if (rte_lcore_id() == rte_get_master_lcore()) {
     1441                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1442                pthread_mutex_unlock(&libtrace->libtrace_lock);
     1443                return;
     1444        }
     1445
     1446        /* Disable this core in global DPDK structs */
     1447        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1448        cfg->lcore_count--;
     1449        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1450        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
    15631451        pthread_mutex_unlock(&libtrace->libtrace_lock);
    15641452        return;
    1565     }
    1566 
    1567     // Disable this core in global DPDK structs
    1568     cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
    1569     cfg->lcore_count--;
    1570     RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
    1571     assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
    1572     pthread_mutex_unlock(&libtrace->libtrace_lock);
    1573     return;
    15741453}
    15751454
    15761455static int dpdk_start_output(libtrace_out_t *libtrace)
    15771456{
    1578     char err[500];
    1579     err[0] = 0;
    1580 
    1581     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    1582         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    1583         free(libtrace->format_data);
    1584         libtrace->format_data = NULL;
    1585         return -1;
    1586     }
    1587     return 0;
     1457        char err[500];
     1458        err[0] = 0;
     1459
     1460        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1461                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1462                free(libtrace->format_data);
     1463                libtrace->format_data = NULL;
     1464                return -1;
     1465        }
     1466        return 0;
    15881467}
    15891468
    15901469static int dpdk_pause_input(libtrace_t * libtrace) {
    1591     /* This stops the device, but can be restarted using rte_eth_dev_start() */
    1592     if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
     1470        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
     1471        /* This stops the device, but can be restarted using rte_eth_dev_start() */
     1472        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    15931473#if DEBUG
    1594         fprintf(stderr, "Pausing DPDK port\n");
    1595 #endif
    1596         rte_eth_dev_stop(FORMAT(libtrace)->port);
    1597         FORMAT(libtrace)->paused = DPDK_PAUSED;
    1598         /* Empty the queue of packets */
    1599         for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
    1600                 rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
    1601         }
    1602         FORMAT(libtrace)->burst_offset = 0;
    1603         FORMAT(libtrace)->burst_size = 0;
    1604         /* If we pause it the driver will be reset and likely our counter */
    1605 
    1606         FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
     1474                fprintf(stderr, "Pausing DPDK port\n");
     1475#endif
     1476                rte_eth_dev_stop(FORMAT(libtrace)->port);
     1477                FORMAT(libtrace)->paused = DPDK_PAUSED;
     1478                /* Empty the queue of packets */
     1479                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
     1480                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
     1481                }
     1482                FORMAT(libtrace)->burst_offset = 0;
     1483                FORMAT(libtrace)->burst_size = 0;
     1484
     1485                for (; tmp != NULL; tmp = tmp->next) {
     1486                        dpdk_per_stream_t *stream = tmp->data;
     1487                        stream->ts_last_sys = 0;
    16071488#if HAS_HW_TIMESTAMPS_82580
    1608         FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
    1609 #endif
    1610     }
    1611     return 0;
     1489                        stream->ts_first_sys = 0;
     1490#endif
     1491                }
     1492
     1493        }
     1494        return 0;
    16121495}
    16131496
    16141497static int dpdk_write_packet(libtrace_out_t *trace,
    1615                 libtrace_packet_t *packet){
    1616     struct rte_mbuf* m_buff[1];
    1617 
    1618     int wirelen = trace_get_wire_length(packet);
    1619     int caplen = trace_get_capture_length(packet);
    1620 
    1621     /* Check for a checksum and remove it */
    1622     if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
    1623                                             wirelen == caplen)
    1624         caplen -= ETHER_CRC_LEN;
    1625 
    1626     m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
    1627     if (m_buff[0] == NULL) {
    1628         trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
    1629         return -1;
    1630     } else {
    1631         int ret;
    1632         memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
    1633         do {
    1634             ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
    1635         } while (ret != 1);
    1636     }
    1637 
    1638     return 0;
     1498                             libtrace_packet_t *packet){
     1499        struct rte_mbuf* m_buff[1];
     1500
     1501        int wirelen = trace_get_wire_length(packet);
     1502        int caplen = trace_get_capture_length(packet);
     1503
     1504        /* Check for a checksum and remove it */
     1505        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
     1506            wirelen == caplen)
     1507                caplen -= ETHER_CRC_LEN;
     1508
     1509        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
     1510        if (m_buff[0] == NULL) {
     1511                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
     1512                return -1;
     1513        } else {
     1514                int ret;
     1515                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
     1516                do {
     1517                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
     1518                } while (ret != 1);
     1519        }
     1520
     1521        return 0;
    16391522}
    16401523
    16411524static int dpdk_fin_input(libtrace_t * libtrace) {
    1642     /* Free our memory structures */
    1643     if (libtrace->format_data != NULL) {
    1644         /* Close the device completely, device cannot be restarted */
    1645         if (FORMAT(libtrace)->port != 0xFF)
    1646                 rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
    1647                                                 RTE_ETH_EVENT_INTR_LSC,
    1648                                                 dpdk_lsc_callback,
    1649                                                 FORMAT(libtrace));
     1525        /* Free our memory structures */
     1526        if (libtrace->format_data != NULL) {
     1527                /* Close the device completely, device cannot be restarted */
     1528                if (FORMAT(libtrace)->port != 0xFF)
     1529                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
     1530                                                        RTE_ETH_EVENT_INTR_LSC,
     1531                                                        dpdk_lsc_callback,
     1532                                                        FORMAT(libtrace));
    16501533                rte_eth_dev_close(FORMAT(libtrace)->port);
     1534                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
    16511535                /* filter here if we used it */
    16521536                free(libtrace->format_data);
    16531537        }
    16541538
    1655     /* Revert to the original PCI drivers */
    1656     /* No longer in DPDK
    1657     rte_eal_pci_exit(); */
    1658     return 0;
     1539        return 0;
    16591540}
    16601541
    16611542
    16621543static int dpdk_fin_output(libtrace_out_t * libtrace) {
    1663     /* Free our memory structures */
    1664     if (libtrace->format_data != NULL) {
    1665         /* Close the device completely, device cannot be restarted */
    1666         if (FORMAT(libtrace)->port != 0xFF)
    1667             rte_eth_dev_close(FORMAT(libtrace)->port);
    1668         /* filter here if we used it */
     1544        /* Free our memory structures */
     1545        if (libtrace->format_data != NULL) {
     1546                /* Close the device completely, device cannot be restarted */
     1547                if (FORMAT(libtrace)->port != 0xFF)
     1548                        rte_eth_dev_close(FORMAT(libtrace)->port);
     1549                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
     1550                /* filter here if we used it */
    16691551                free(libtrace->format_data);
    16701552        }
    16711553
    1672     /* Revert to the original PCI drivers */
    1673     /* No longer in DPDK
    1674     rte_eal_pci_exit(); */
    1675     return 0;
     1554        return 0;
    16761555}
    16771556
     
    16801559 */
    16811560static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
    1682     assert(packet);
    1683     assert(packet->buffer);
    1684     /* Our header sits straight after the mbuf header */
    1685     return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
     1561        assert(packet);
     1562        assert(packet->buffer);
     1563        /* Our header sits straight after the mbuf header */
     1564        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
    16861565}
    16871566
    16881567static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
    1689     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1690     return hdr->cap_len;
     1568        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1569        return hdr->cap_len;
    16911570}
    16921571
    16931572static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
    1694     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1695     if (size > hdr->cap_len) {
    1696         /* Cannot make a packet bigger */
     1573        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1574        if (size > hdr->cap_len) {
     1575                /* Cannot make a packet bigger */
    16971576                return trace_get_capture_length(packet);
    16981577        }
    16991578
    1700     /* Reset the cached capture length first*/
    1701     packet->capture_length = -1;
    1702     hdr->cap_len = (uint32_t) size;
     1579        /* Reset the cached capture length first*/
     1580        packet->capture_length = -1;
     1581        hdr->cap_len = (uint32_t) size;
    17031582        return trace_get_capture_length(packet);
    17041583}
    17051584
    17061585static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
    1707     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1708     int org_cap_size; /* The original capture size */
    1709     if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
    1710         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1711                             sizeof(struct hw_timestamp_82580);
    1712     } else {
    1713         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
    1714     }
    1715     if (hdr->flags & INCLUDES_CHECKSUM) {
    1716         return org_cap_size;
    1717     } else {
    1718         /* DPDK packets are always TRACE_TYPE_ETH packets */
    1719         return org_cap_size + ETHER_CRC_LEN;
    1720     }
    1721 }
     1586        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1587        int org_cap_size; /* The original capture size */
     1588        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
     1589                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
     1590                               sizeof(struct hw_timestamp_82580);
     1591        } else {
     1592                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
     1593        }
     1594        if (hdr->flags & INCLUDES_CHECKSUM) {
     1595                return org_cap_size;
     1596        } else {
     1597                /* DPDK packets are always TRACE_TYPE_ETH packets */
     1598                return org_cap_size + ETHER_CRC_LEN;
     1599        }
     1600}
     1601
    17221602static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
    1723     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1724     if (hdr->flags & INCLUDES_HW_TIMESTAMP)
    1725         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
    1726                 sizeof(struct hw_timestamp_82580);
    1727     else
    1728         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1603        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1604        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
     1605                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
     1606                                sizeof(struct hw_timestamp_82580);
     1607        else
     1608                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
    17291609}
    17301610
    17311611static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
    1732                 libtrace_packet_t *packet, void *buffer,
    1733                 libtrace_rt_types_t rt_type, uint32_t flags) {
    1734     assert(packet);
    1735     if (packet->buffer != buffer &&
    1736         packet->buf_control == TRACE_CTRL_PACKET) {
    1737         free(packet->buffer);
    1738     }
    1739 
    1740     if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    1741         packet->buf_control = TRACE_CTRL_PACKET;
    1742     } else
    1743         packet->buf_control = TRACE_CTRL_EXTERNAL;
    1744 
    1745     packet->buffer = buffer;
    1746     packet->header = buffer;
    1747 
    1748     /* Don't use pktmbuf_mtod will fail if the packet is a copy */
    1749     packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
    1750     packet->type = rt_type;
    1751     return 0;
    1752 }
    1753 
     1612                               libtrace_packet_t *packet, void *buffer,
     1613                               libtrace_rt_types_t rt_type, uint32_t flags) {
     1614        assert(packet);
     1615        if (packet->buffer != buffer &&
     1616            packet->buf_control == TRACE_CTRL_PACKET) {
     1617                free(packet->buffer);
     1618        }
     1619
     1620        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
     1621                packet->buf_control = TRACE_CTRL_PACKET;
     1622        else
     1623                packet->buf_control = TRACE_CTRL_EXTERNAL;
     1624
     1625        packet->buffer = buffer;
     1626        packet->header = buffer;
     1627
     1628        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
     1629        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
     1630        packet->type = rt_type;
     1631        return 0;
     1632}
    17541633
    17551634/**
     
    18111690}
    18121691
    1813 
    1814 
    1815 /*
     1692/**
    18161693 * Does any extra preparation to all captured packets
    18171694 * This includes adding our extra header to it with the timestamp,
     
    18211698 * @param plc The DPDK per stream format data
    18221699 * @param pkts An array of size nb_pkts of DPDK packets
    1823  * @param nb_pkts The number of packets in pkts and optionally packets
    1824  * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
    1825  */
    1826 static inline void dpdk_ready_pkts(libtrace_t *libtrace, struct dpdk_per_lcore_t *plc,
    1827                                    struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
     1700 */
     1701static inline void dpdk_ready_pkts(libtrace_t *libtrace,
     1702                                   struct dpdk_per_stream_t *plc,
     1703                                   struct rte_mbuf **pkts,
     1704                                   size_t nb_pkts) {
    18281705        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
    18291706        struct dpdk_addt_hdr *hdr;
     
    19731850
    19741851#endif
    1975                 if(packets) {
    1976                         packets[i]->buffer = pkts[i];
    1977                         packets[i]->header = pkts[i];
    1978                         packets[i]->trace = libtrace;
    1979 #if HAS_HW_TIMESTAMPS_82580
    1980                         packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
    1981                                               RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
    1982 #else
    1983                         packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
    1984                                               RTE_PKTMBUF_HEADROOM;
    1985 #endif
    1986                         packets[i]->error = 1;
    1987                 }
    19881852        }
    19891853
     
    20021866}
    20031867
     1868/** Reads at least one packet or returns an error
     1869 */
     1870static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
     1871                                           dpdk_per_stream_t *stream,
     1872                                           libtrace_message_queue_t *mesg,
     1873                                           struct rte_mbuf* pkts_burst[],
     1874                                           size_t nb_packets) {
     1875        size_t nb_rx; /* Number of rx packets we've recevied */
     1876        while (1) {
     1877                /* Poll for a batch of packets */
     1878                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     1879                                         stream->queue_id, pkts_burst, nb_packets);
     1880                if (nb_rx > 0) {
     1881                        /* Got some packets - otherwise we keep spining */
     1882                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
     1883                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     1884                        return nb_rx;
     1885                }
     1886                /* Check the message queue this could be less than 0 */
     1887                if (mesg && libtrace_message_queue_count(mesg) > 0)
     1888                        return READ_MESSAGE;
     1889                if (libtrace_halt)
     1890                        return READ_EOF;
     1891                /* Wait a while, polling on memory degrades performance
     1892                 * This relieves the pressure on memory allowing the NIC to DMA */
     1893                rte_delay_us(10);
     1894        }
     1895
     1896        /* We'll never get here - but if we did it would be bad */
     1897        return READ_ERROR;
     1898}
     1899
     1900static int dpdk_pread_packets (libtrace_t *libtrace,
     1901                                    libtrace_thread_t *t,
     1902                                    libtrace_packet_t **packets,
     1903                                    size_t nb_packets) {
     1904        int nb_rx; /* Number of rx packets we've recevied */
     1905        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
     1906        size_t i;
     1907        dpdk_per_stream_t *stream = t->format_data;
     1908
     1909        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
     1910                                         pkts_burst, nb_packets);
     1911
     1912        if (nb_rx > 0) {
     1913                for (i = 0; i < nb_rx; ++i) {
     1914                        if (packets[i]->buffer != NULL) {
     1915                                /* The packet should always be finished */
     1916                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
     1917                                free(packets[i]->buffer);
     1918                        }
     1919                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
     1920                        packets[i]->type = TRACE_RT_DATA_DPDK;
     1921                        packets[i]->buffer = pkts_burst[i];
     1922                        packets[i]->trace = libtrace;
     1923                        packets[i]->error = 1;
     1924                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
     1925                }
     1926        }
     1927
     1928        return nb_rx;
     1929}
    20041930
    20051931static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    2006     int nb_rx; /* Number of rx packets we've received */
    2007 
    2008     /* Free the last packet buffer */
    2009     if (packet->buffer != NULL) {
    2010         /* Buffer is owned by DPDK */
    2011         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    2012             rte_pktmbuf_free(packet->buffer);
    2013             packet->buffer = NULL;
    2014         } else
    2015         /* Buffer is owned by packet i.e. has been malloc'd */
    2016         if (packet->buf_control == TRACE_CTRL_PACKET) {
    2017             free(packet->buffer);
    2018             packet->buffer = NULL;
    2019         }
    2020     }
    2021 
    2022     packet->buf_control = TRACE_CTRL_EXTERNAL;
    2023     packet->type = TRACE_RT_DATA_DPDK;
    2024 
    2025     /* Check if we already have some packets buffered */
    2026     if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
    2027             packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
    2028             dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    2029             return 1; // TODO should be bytes read, which essentially useless anyway
    2030     }
    2031     /* Wait for a packet */
    2032     while (1) {
    2033         /* Poll for a single packet */
    2034         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    2035                                  FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
    2036         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
     1932        int nb_rx; /* Number of rx packets we've received */
     1933        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
     1934
     1935        /* Free the last packet buffer */
     1936        if (packet->buffer != NULL) {
     1937                /* The packet should always be finished */
     1938                assert(packet->buf_control == TRACE_CTRL_PACKET);
     1939                free(packet->buffer);
     1940                packet->buffer = NULL;
     1941        }
     1942
     1943        packet->buf_control = TRACE_CTRL_EXTERNAL;
     1944        packet->type = TRACE_RT_DATA_DPDK;
     1945
     1946        /* Check if we already have some packets buffered */
     1947        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
     1948                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
     1949                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     1950                return 1; // TODO should be bytes read, which essentially useless anyway
     1951        }
     1952
     1953        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
     1954                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
     1955
     1956        if (nb_rx > 0) {
    20371957                FORMAT(libtrace)->burst_size = nb_rx;
    20381958                FORMAT(libtrace)->burst_offset = 1;
    2039                 dpdk_ready_pkts(libtrace, &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
    20401959                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
    20411960                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    2042                 return 1; // TODO should be bytes read, which essentially useless anyway
    2043         }
    2044         if (libtrace_halt) {
    2045                 return 0;
    2046         }
    2047         /* Wait a while, polling on memory degrades performance
    2048          * This relieves the pressure on memory allowing the NIC to DMA */
    2049         rte_delay_us(10);
    2050     }
    2051 
    2052     /* We'll never get here - but if we did it would be bad */
    2053     return -1;
    2054 }
    2055 
    2056 static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
    2057     size_t nb_rx; /* Number of rx packets we've recevied */
    2058     struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
    2059     size_t i;
    2060 
    2061     for (i = 0 ; i < nb_packets ; ++i) {
    2062             /* Free the last packet buffer */
    2063             if (packets[i]->buffer != NULL) {
    2064                 /* Buffer is owned by DPDK */
    2065                 if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
    2066                     rte_pktmbuf_free(packets[i]->buffer);
    2067                     packets[i]->buffer = NULL;
    2068                 } else
    2069                 /* Buffer is owned by packet i.e. has been malloc'd */
    2070                 if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
    2071                     free(packets[i]->buffer);
    2072                     packets[i]->buffer = NULL;
    2073                 }
    2074             }
    2075             packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
    2076             packets[i]->type = TRACE_RT_DATA_DPDK;
    2077     }
    2078 
    2079     /* Wait for a packet */
    2080     while (1) {
    2081         /* Poll for a single packet */
    2082         nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
    2083                             PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
    2084         if (nb_rx > 0) {
    2085                 /* Got some packets - otherwise we keep spining */
    2086                 //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
    2087                 dpdk_ready_pkts(libtrace, PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
    2088                 return nb_rx;
    2089         }
    2090         // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
    2091         if (libtrace_message_queue_count(&t->messages) > 0) {
    2092                 printf("Extra message yay");
    2093                 return -2;
    2094         }
    2095         if (libtrace_halt) {
    2096                 return 0;
    2097         }
    2098         /* Wait a while, polling on memory degrades performance
    2099          * This relieves the pressure on memory allowing the NIC to DMA */
    2100         rte_delay_us(10);
    2101     }
    2102 
    2103     /* We'll never get here - but if we did it would be bad */
    2104     return -1;
     1961                return 1;
     1962        }
     1963        return nb_rx;
    21051964}
    21061965
    21071966static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
    2108     struct timeval tv;
    2109     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    2110 
    2111     tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    2112     tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
    2113     return tv;
     1967        struct timeval tv;
     1968        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1969
     1970        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     1971        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
     1972        return tv;
    21141973}
    21151974
    21161975static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
    2117     struct timespec ts;
    2118     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    2119 
    2120     ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    2121     ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
    2122     return ts;
     1976        struct timespec ts;
     1977        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1978
     1979        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     1980        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
     1981        return ts;
    21231982}
    21241983
    21251984static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
    2126     return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
     1985        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
    21271986}
    21281987
    21291988static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
    2130     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    2131     return (libtrace_direction_t) hdr->direction;
     1989        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1990        return (libtrace_direction_t) hdr->direction;
    21321991}
    21331992
    21341993static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
    2135     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    2136     hdr->direction = (uint8_t) direction;
    2137     return (libtrace_direction_t) hdr->direction;
     1994        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1995        hdr->direction = (uint8_t) direction;
     1996        return (libtrace_direction_t) hdr->direction;
    21381997}
    21391998
     
    21432002 */
    21442003static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
    2145     struct rte_eth_stats stats = {0};
    2146 
    2147     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    2148         return UINT64_MAX;
    2149     /* Grab the current stats */
    2150     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    2151 
    2152     /* Get the drop counter */
    2153     return (uint64_t) stats.ierrors;
     2004        struct rte_eth_stats stats = {0};
     2005
     2006        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
     2007                return UINT64_MAX;
     2008        /* Grab the current stats */
     2009        rte_eth_stats_get(FORMAT(trace)->port, &stats);
     2010
     2011        /* Get the drop counter */
     2012        return (uint64_t) stats.ierrors;
    21542013}
    21552014
     
    21622021 */
    21632022static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
    2164     struct rte_eth_stats stats = {0};
    2165 
    2166     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    2167         return UINT64_MAX;
    2168     /* Grab the current stats */
    2169     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    2170 
    2171     /* Get the drop counter */
    2172     return (uint64_t) stats.fdirmiss;
     2023        struct rte_eth_stats stats = {0};
     2024
     2025        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
     2026                return UINT64_MAX;
     2027        /* Grab the current stats */
     2028        rte_eth_stats_get(FORMAT(trace)->port, &stats);
     2029
     2030        /* Get the drop counter */
     2031        return (uint64_t) stats.fdirmiss;
    21732032}
    21742033
     
    21782037 */
    21792038static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
    2180                                         libtrace_packet_t *packet) {
    2181     libtrace_eventobj_t event = {0,0,0.0,0};
    2182     int nb_rx; /* Number of receive packets we've read */
    2183     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
    2184 
    2185     do {
    2186 
    2187         /* See if we already have a packet waiting */
    2188         nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
    2189                         FORMAT(trace)->queue_id, pkts_burst, 1);
    2190 
    2191         if (nb_rx > 0) {
    2192             /* Free the last packet buffer */
    2193             if (packet->buffer != NULL) {
    2194                 /* Buffer is owned by DPDK */
    2195                 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    2196                     rte_pktmbuf_free(packet->buffer);
    2197                     packet->buffer = NULL;
    2198                 } else
    2199                 /* Buffer is owned by packet i.e. has been malloc'd */
    2200                 if (packet->buf_control == TRACE_CTRL_PACKET) {
    2201                     free(packet->buffer);
    2202                     packet->buffer = NULL;
    2203                 }
    2204             }
    2205 
    2206             packet->buf_control = TRACE_CTRL_EXTERNAL;
    2207             packet->type = TRACE_RT_DATA_DPDK;
    2208             event.type = TRACE_EVENT_PACKET;
    2209             dpdk_ready_pkts(trace, &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
    2210             event.size = 1; // TODO should be bytes read, which essentially useless anyway
    2211 
    2212             /* XXX - Check this passes the filter trace_read_packet normally
    2213              * does this for us but this wont */
    2214             if (trace->filter) {
    2215                 if (!trace_apply_filter(trace->filter, packet)) {
    2216                     /* Failed the filter so we loop for another packet */
    2217                     trace->filtered_packets ++;
    2218                     continue;
    2219                 }
    2220             }
    2221             trace->accepted_packets ++;
    2222         } else {
    2223             /* We only want to sleep for a very short time - we are non-blocking */
    2224             event.type = TRACE_EVENT_SLEEP;
    2225             event.seconds = 0.0001;
    2226             event.size = 0;
    2227         }
    2228 
    2229         /* If we get here we have our event */
    2230         break;
    2231     } while (1);
    2232 
    2233     return event;
    2234 }
    2235 
     2039                                            libtrace_packet_t *packet) {
     2040        libtrace_eventobj_t event = {0,0,0.0,0};
     2041        int nb_rx; /* Number of receive packets we've read */
     2042        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
     2043
     2044        do {
     2045
     2046                /* See if we already have a packet waiting */
     2047                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
     2048                                         FORMAT_DATA_FIRST(trace)->queue_id,
     2049                                         pkts_burst, 1);
     2050
     2051                if (nb_rx > 0) {
     2052                        /* Free the last packet buffer */
     2053                        if (packet->buffer != NULL) {
     2054                                /* The packet should always be finished */
     2055                                assert(packet->buf_control == TRACE_CTRL_PACKET);
     2056                                free(packet->buffer);
     2057                                packet->buffer = NULL;
     2058                        }
     2059
     2060                        packet->buf_control = TRACE_CTRL_EXTERNAL;
     2061                        packet->type = TRACE_RT_DATA_DPDK;
     2062                        event.type = TRACE_EVENT_PACKET;
     2063                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
     2064                        packet->buffer = FORMAT(trace)->burst_pkts[0];
     2065                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
     2066                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
     2067
     2068                        /* XXX - Check this passes the filter trace_read_packet normally
     2069                         * does this for us but this wont */
     2070                        if (trace->filter) {
     2071                                if (!trace_apply_filter(trace->filter, packet)) {
     2072                                        /* Failed the filter so we loop for another packet */
     2073                                        trace->filtered_packets ++;
     2074                                        continue;
     2075                                }
     2076                        }
     2077                        trace->accepted_packets ++;
     2078                } else {
     2079                        /* We only want to sleep for a very short time - we are non-blocking */
     2080                        event.type = TRACE_EVENT_SLEEP;
     2081                        event.seconds = 0.0001;
     2082                        event.size = 0;
     2083                }
     2084
     2085                /* If we get here we have our event */
     2086                break;
     2087        } while (1);
     2088
     2089        return event;
     2090}
    22362091
    /** Prints the usage text for the dpdk format module to stdout */
    static void dpdk_help(void) {
            static const char help_text[] =
                    "dpdk format module: $Revision: 1752 $\n"
                    "Supported input URIs:\n"
                    "\tdpdk:<domain:bus:devid.func>-<coreid>\n"
                    "\tThe -<coreid> is optional \n"
                    "\t e.g. dpdk:0000:01:00.1\n"
                    "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n"
                    "\t By default the last CPU core is used if not otherwise specified.\n"
                    "\t Only a single libtrace instance of dpdk can use the same CPU core.\n"
                    "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n"
                    "\n"
                    "Supported output URIs:\n"
                    "\tSame format as the input URI.\n"
                    "\t e.g. dpdk:0000:01:00.1\n"
                    "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n"
                    "\n";

            fputs(help_text, stdout);
    }
    22542109
Note: See TracChangeset for help on using the changeset viewer.