Changeset e54bd5f
- Timestamp:
- 12/14/17 11:06:48 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- 4f0f93f
- Parents:
- a6f2d1d
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_dpdkndag.c
rb3199cf re54bd5f 42 42 uint32_t ndagsize; 43 43 44 pthread_mutex_t ndag_lock; 44 45 dpdk_per_stream_t *dpdkstreamdata; 45 46 int burstsize; 46 47 int burstoffset; 47 struct rte_mbuf* burstspace[ 20];48 struct rte_mbuf* burstspace[40]; 48 49 49 50 } perthread_t; … … 156 157 FORMAT_DATA->threaddatas[i].burstoffset = 0; 157 158 memset(FORMAT_DATA->threaddatas[i].burstspace, 0, 158 sizeof(struct rte_mbuf *) * 20); 159 sizeof(struct rte_mbuf *) * 40); 160 pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock), 161 NULL); 159 162 } 160 163 return maxthreads; … … 162 165 163 166 static int dpdkndag_start_input(libtrace_t *libtrace) { 167 enum hasher_types hash = HASHER_UNIDIRECTIONAL; 168 int snaplen = 9000; 169 170 if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER, 171 &hash) == -1) { 172 libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); 173 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); 174 return -1; 175 } 176 177 if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN, 178 &snaplen) == -1) { 179 libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); 180 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); 181 return -1; 182 } 164 183 165 184 if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) { … … 176 195 static int dpdkndag_pstart_input(libtrace_t *libtrace) { 177 196 197 enum hasher_types hash = HASHER_UNIDIRECTIONAL; 198 int snaplen = 9000; 178 199 FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count; 200 if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER, 201 &hash) == -1) { 202 libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); 203 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); 204 return -1; 205 } 206 207 if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN, 208 &snaplen) == -1) { 209 libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); 210 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem); 211 return -1; 212 } 179 213 if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) { 180 214 libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv); … … 199 233 } 200 234 201 for (i = 0; i < 20; i++) {235 for (i = 0; i < 40; i++) { 202 236 if (pt->burstspace[i]) { 203 237 rte_pktmbuf_free(pt->burstspace[i]); 204 238 } 205 239 } 240 pthread_mutex_destroy(&(pt->ndag_lock)); 206 241 } 207 242 … … 291 326 static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) { 292 327 int i; 328 329 libtrace_stat_t *dpdkstat; 293 330 294 331 stat->dropped_valid = 1; … … 299 336 stat->missing = 0; 300 337 338 dpdkstat = trace_create_statistics(); 339 dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat); 340 341 if (dpdkstat->dropped_valid) { 342 stat->errors_valid = 1; 343 stat->errors = dpdkstat->dropped; 344 } 345 301 346 /* TODO Is this thread safe? */ 302 347 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 348 pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock)); 303 349 stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream; 304 350 stat->received += FORMAT_DATA->threaddatas[i].received_packets; 305 351 stat->missing += FORMAT_DATA->threaddatas[i].missing_records; 306 } 352 pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock)); 353 } 354 free(dpdkstat); 307 355 } 308 356 … … 434 482 } 435 483 } 436 437 484 if (cap->expectedseq != 0) { 485 pthread_mutex_lock(&pt->ndag_lock); 438 486 pt->missing_records += seq_cmp( 439 487 ntohl(encaphdr->seqno), cap->expectedseq); 488 pthread_mutex_unlock(&pt->ndag_lock); 440 489 } 441 490 cap->expectedseq = ntohl(encaphdr->seqno) + 1; … … 490 539 /* TODO */ 491 540 } else { 541 pthread_mutex_lock(&(pt->ndag_lock)); 492 542 if (pt->received_packets > 0) { 493 543 pt->dropped_upstream += ntohs(erfptr->lctr); 494 544 } 495 } 496 545 pthread_mutex_unlock(&(pt->ndag_lock)); 546 } 547 548 pthread_mutex_lock(&(pt->ndag_lock)); 497 549 pt->received_packets ++; 550 pthread_mutex_unlock(&(pt->ndag_lock)); 498 551 encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t)); 499 552 … … 542 595 &t->messages, 543 596 pt->burstspace, 544 20);597 40); 545 598 if (ret <= 0) { 546 599 return ret; 547 600 } 601 548 602 pt->dpdkpkt->buffer = pt->burstspace[0]; 549 603 pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
Note: See TracChangeset
for help on using the changeset viewer.