Changeset e54bd5f for lib


Ignore:
Timestamp:
12/14/17 11:06:48 (3 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
4f0f93f
Parents:
a6f2d1d
Message:

Tweaks to dpdkndag: format

  • Set snap length for dpdk receiver to 9K so that we can receive jumbo frames.
  • Enforce a unidirectional hasher on the dpdk receiver, just to ensure ndag streams don't get spread across multiple CPUs.
  • Add a mutex around any statistic updates to keep helgrind happy.
  • Increase number of mbufs used from 20 to 40.
  • Dropped packets by the dpdk receiver are included in stats as 'errors'.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdkndag.c

    rb3199cf re54bd5f  
    4242        uint32_t ndagsize;
    4343
     44        pthread_mutex_t ndag_lock;
    4445        dpdk_per_stream_t *dpdkstreamdata;
    4546        int burstsize;
    4647        int burstoffset;
    47         struct rte_mbuf* burstspace[20];
     48        struct rte_mbuf* burstspace[40];
    4849
    4950} perthread_t;
     
    156157                FORMAT_DATA->threaddatas[i].burstoffset = 0;
    157158                memset(FORMAT_DATA->threaddatas[i].burstspace, 0,
    158                                 sizeof(struct rte_mbuf *) * 20);
     159                                sizeof(struct rte_mbuf *) * 40);
     160                pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock),
     161                                NULL);
    159162        }
    160163        return maxthreads;
     
    162165
    163166static int dpdkndag_start_input(libtrace_t *libtrace) {
     167        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
     168        int snaplen = 9000;
     169
     170        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
     171                                &hash) == -1) {
     172                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
     173                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
     174                return -1;
     175        }
     176
     177        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
     178                                &snaplen) == -1) {
     179                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
     180                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
     181                return -1;
     182        }
    164183
    165184        if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) {
     
    176195static int dpdkndag_pstart_input(libtrace_t *libtrace) {
    177196
     197        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
     198        int snaplen = 9000;
    178199        FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count;
     200        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
     201                                &hash) == -1) {
     202                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
     203                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
     204                return -1;
     205        }
     206
     207        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
     208                                &snaplen) == -1) {
     209                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
     210                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
     211                return -1;
     212        }
    179213        if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) {
    180214                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
     
    199233        }
    200234
    201         for (i = 0; i < 20; i++) {
     235        for (i = 0; i < 40; i++) {
    202236                if (pt->burstspace[i]) {
    203237                        rte_pktmbuf_free(pt->burstspace[i]);
    204238                }
    205239        }
     240        pthread_mutex_destroy(&(pt->ndag_lock));
    206241}
    207242
     
    291326static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
    292327        int i;
     328
     329        libtrace_stat_t *dpdkstat;
    293330
    294331        stat->dropped_valid = 1;
     
    299336        stat->missing = 0;
    300337
     338        dpdkstat = trace_create_statistics();
     339        dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat);
     340
     341        if (dpdkstat->dropped_valid) {
     342                stat->errors_valid = 1;
     343                stat->errors = dpdkstat->dropped;
     344        }
     345
    301346        /* TODO Is this thread safe? */
    302347        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     348                pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
    303349                stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream;
    304350                stat->received += FORMAT_DATA->threaddatas[i].received_packets;
    305351                stat->missing += FORMAT_DATA->threaddatas[i].missing_records;
    306         }
     352                pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
     353        }
     354        free(dpdkstat);
    307355}
    308356
     
    434482                }
    435483        }
    436 
    437484        if (cap->expectedseq != 0) {
     485                pthread_mutex_lock(&pt->ndag_lock);
    438486                pt->missing_records += seq_cmp(
    439487                                ntohl(encaphdr->seqno), cap->expectedseq);
     488                pthread_mutex_unlock(&pt->ndag_lock);
    440489        }
    441490        cap->expectedseq = ntohl(encaphdr->seqno) + 1;
     
    490539                /* TODO */
    491540        } else {
     541                pthread_mutex_lock(&(pt->ndag_lock));
    492542                if (pt->received_packets > 0) {
    493543                        pt->dropped_upstream += ntohs(erfptr->lctr);
    494544                }
    495         }
    496 
     545                pthread_mutex_unlock(&(pt->ndag_lock));
     546        }
     547
     548        pthread_mutex_lock(&(pt->ndag_lock));
    497549        pt->received_packets ++;
     550        pthread_mutex_unlock(&(pt->ndag_lock));
    498551        encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t));
    499552
     
    542595                                        &t->messages,
    543596                                        pt->burstspace,
    544                                         20);
     597                                        40);
    545598                        if (ret <= 0) {
    546599                                return ret;
    547600                        }
     601
    548602                        pt->dpdkpkt->buffer = pt->burstspace[0];
    549603                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
Note: See TracChangeset for help on using the changeset viewer.