Changes in / [92d5f15:4631115]
- Files:
  - 40 added
  - 1 deleted
  - 25 edited
README
r3e5518a r3e5518a

+ This fork of Libtrace aims to support parallel packet processing.
+
+ This is still work in progress and is full of bugs, some of the original
+ Libtrace functions might not function correctly breaking the supplied tools.
+
  libtrace 3.0.22
configure.in
r3e5518a r3e5518a

  fi

+ # If we use DPDK we might be able to use libnuma
+ AC_CHECK_LIB(numa, numa_node_to_cpus, have_numa=1, have_numa=0)
+
  # Checks for various "optional" libraries
  AC_CHECK_LIB(pthread, pthread_create, have_pthread=1, have_pthread=0)
…
  AC_CHECK_LIB(rt, clock_gettime, have_clock_gettime=1, have_clock_gettime=0)
  LIBS=
+
+ if test "$have_numa" = 1; then
+ LIBTRACE_LIBS="$LIBTRACE_LIBS -lnuma"
+ AC_DEFINE(HAVE_LIBNUMA, 1, [Set to 1 if libnuma is found supported])
+ with_numa=yes
+ else
+ AC_DEFINE(HAVE_LIBNUMA, 0, [Set to 1 if libnuma is found supported])
+ with_numa=no
+ fi

  if test "$dlfound" = 0; then
…
  if test x"$libtrace_dpdk" = xtrue; then
  AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
+ reportopt "Compiled with DPDK trace NUMA support" $with_numa
  elif test x"$want_dpdk" != "xno"; then
  # We don't officially support DPDK so only report failure if the user
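The new check only defines HAVE_LIBNUMA (to 1 or 0); whether libnuma is actually used is decided in the C sources at compile time. A minimal sketch of that pattern, assuming the usual libnuma API — not taken from libtrace, and the helper name numa_node_count is made up:

    #include <stdio.h>

    #if HAVE_LIBNUMA
    #include <numa.h>
    #endif

    /* hypothetical helper: how many NUMA nodes are visible? */
    int numa_node_count(void)
    {
    #if HAVE_LIBNUMA
            /* numa_available() returns -1 when the kernel lacks NUMA support */
            if (numa_available() != -1)
                    return numa_max_node() + 1;
    #endif
            /* without libnuma, assume a single node */
            return 1;
    }

    int main(void)
    {
            printf("NUMA nodes: %d\n", numa_node_count());
            return 0;
    }

Linking against -lnuma only happens when configure found it, which is exactly what the LIBTRACE_LIBS change above arranges.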
lib/Makefile.am
r3fc3267 r6cf3ca0

  include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h

- AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
- AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
+ AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
+ AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread

  extra_DIST = format_template.c
- NATIVEFORMATS=format_linux.c
+ NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h
  BPFFORMATS=format_bpf.c

…
  NATIVEFORMATS+= format_dpdk.c
  # So we also make libtrace.mk in dpdk otherwise automake tries to expand
- # it to early which I cannot seem to stop unless we use a path that
+ # it too early which I cannot seem to stop unless we use a path that
  # doesn't exist currently
  export RTE_SDK=@RTE_SDK@
…
  endif

- libtrace_la_SOURCES = trace.c common.h \
+ libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
        format_erf.c format_pcap.c format_legacy.c \
        format_rt.c format_helper.c format_helper.h format_pcapfile.c \
…
        $(DAGSOURCE) format_erf.h \
        $(BPFJITSOURCE) \
-       libtrace_arphrd.h
+       libtrace_arphrd.h \
+       data-struct/ring_buffer.c data-struct/vector.c \
+       data-struct/message_queue.c data-struct/deque.c \
+       data-struct/sliding_window.c data-struct/object_cache.c \
+       data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \
+       combiner_sorted.c combiner_unordered.c

  if DAG2_4
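Among the new sources is hash_toeplitz.c, a software implementation of the Toeplitz hash that NICs use for receive-side scaling (RSS). For background, the generic algorithm looks like this — a sketch of the standard hash, not libtrace's own code:

    #include <stdint.h>
    #include <stddef.h>

    /* Generic Toeplitz (RSS) hash: for every set bit of the input, XOR in
     * the 32-bit window of the key starting at that bit position.  The key
     * must be at least len + 4 bytes long; the usual 40-byte RSS key covers
     * a 36-byte IPv6 src/dst/port tuple. */
    uint32_t toeplitz_hash(const uint8_t *key, const uint8_t *data, size_t len)
    {
            /* initial 32-bit window of the key */
            uint32_t window = ((uint32_t)key[0] << 24) | ((uint32_t)key[1] << 16) |
                              ((uint32_t)key[2] << 8)  |  (uint32_t)key[3];
            uint32_t hash = 0;
            size_t i;
            int bit;

            for (i = 0; i < len; i++) {
                    for (bit = 7; bit >= 0; bit--) {
                            if (data[i] & (1 << bit))
                                    hash ^= window;
                            /* slide the window left by one bit, pulling the
                             * next key bit in at the bottom */
                            window <<= 1;
                            if (key[i + 4] & (1 << bit))
                                    window |= 1;
                    }
            }
            return hash;
    }

Hashing a flow's address/port tuple with the same 40-byte key the NIC uses lets software threads reproduce the hardware's packet-to-queue mapping, which is presumably why an RSS key also appears in the DPDK format data below.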
lib/format_atmhdr.c
r5952ff0 rb13b939

  trace_event_trace, /* trace_event */
  NULL, /* help */
- NULL /* next pointer */
+ NULL, /* next pointer */
+ NON_PARALLEL(false)
  };
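NON_PARALLEL() is introduced elsewhere in this changeset (its definition is not shown here); it fills in the parallel-capture tail that has been added to struct libtrace_format_t for formats with no native parallel support. Judging by the fields the dag format fills in explicitly further down, a plausible shape is the following — a hypothetical sketch only, with guessed field names:

    /* HYPOTHETICAL expansion -- the real macro lives in the parallel
     * libtrace headers and may differ in field names and order. */
    #define NON_PARALLEL(live) \
            {live, 1},      /* live capture flag, thread limit */ \
            NULL,           /* pstart_input */        \
            NULL,           /* pread_packets */       \
            NULL,           /* ppause_input */        \
            NULL,           /* pfin_input */          \
            NULL,           /* pconfig_input */       \
            NULL,           /* pregister_thread */    \
            NULL            /* punregister_thread */

The boolean argument (false for trace formats such as atmhdr, true for live formats such as bpf and dag24) appears to record whether the format is a live capture.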
lib/format_bpf.c
r08f5060 r08f5060

  trace_event_device, /* trace_event */
  bpf_help, /* help */
- NULL
+ NULL, /* next pointer */
+ NON_PARALLEL(true)
  };
  #else /* HAVE_DECL_BIOCSETIF */
…
  NULL, /* trace_event */
  bpf_help, /* help */
- NULL
+ NULL, /* next pointer */
+ NON_PARALLEL(true)
  };
  #endif /* HAVE_DECL_BIOCSETIF */
lib/format_dag24.c
rc70f59f rc70f59f

  trace_event_dag, /* trace_event */
  dag_help, /* help */
- NULL /* next pointer */
+ NULL, /* next pointer */
+ NON_PARALLEL(true)
  };
lib/format_dag25.c
rc5ac872 rc5ac872 79 79 */ 80 80 81 82 81 #define DATA(x) ((struct dag_format_data_t *)x->format_data) 83 82 #define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data) 83 #define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data) 84 84 85 85 #define FORMAT_DATA DATA(libtrace) … … 87 87 88 88 #define DUCK FORMAT_DATA->duck 89 90 #define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head 91 #define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data) 92 89 93 static struct libtrace_format_t dag; 90 94 … … 118 122 }; 119 123 124 /* Data that is stored against each input stream */ 125 struct dag_per_stream_t { 126 /* DAG device */ 127 struct dag_dev_t *device; 128 /* DAG stream number */ 129 uint16_t dagstream; 130 /* Pointer to the last unread byte in the DAG memory */ 131 uint8_t *top; 132 /* Pointer to the first unread byte in the DAG memory */ 133 uint8_t *bottom; 134 /* Amount of data processed from the bottom pointer */ 135 uint32_t processed; 136 /* Number of packets seen by the stream */ 137 uint64_t pkt_count; 138 /* Drop count for this particular stream */ 139 uint64_t drops; 140 /* Boolean values to indicate if a particular interface has been seen 141 * or not. This is limited to four interfaces, which is enough to 142 * support all current DAG cards */ 143 uint8_t seeninterface[4]; 144 }; 145 120 146 /* "Global" data that is stored for each DAG input trace */ 121 147 struct dag_format_data_t { 122 123 148 /* Data required for regular DUCK reporting */ 149 /* TODO: This doesn't work with the 10X2S card! I don't know how 150 * DUCK stuff works and don't know how to fix it */ 124 151 struct { 125 152 /* Timestamp of the last DUCK report */ 126 153 uint32_t last_duck; 127 154 /* The number of seconds between each DUCK report */ 128 155 uint32_t duck_freq; 129 156 /* Timestamp of the last packet read from the DAG card */ 130 157 uint32_t last_pkt; 131 158 /* Dummy trace to ensure DUCK packets are dealt with using the 132 159 * DUCK format functions */ 133 134 160 libtrace_t *dummy_duck; 161 } duck; 135 162 136 163 /* String containing the DAG device name */ 137 164 char *device_name; 138 /* The DAG device that we are reading from */ 139 struct dag_dev_t *device; 140 /* The DAG stream that we are reading from */ 141 unsigned int dagstream; 142 /* Boolean flag indicating whether the stream is currently attached */ 165 /* Boolean flag indicating whether the trace is currently attached */ 143 166 int stream_attached; 144 /* Pointer to the first unread byte in the DAG memory hole */ 145 uint8_t *bottom; 146 /* Pointer to the last unread byte in the DAG memory hole */ 147 uint8_t *top; 148 /* The amount of data processed thus far from the bottom pointer */ 149 uint32_t processed; 150 /* The number of packets that have been dropped */ 151 uint64_t drops; 152 153 uint8_t seeninterface[4]; 167 168 /* Data stored against each DAG input stream */ 169 libtrace_list_t *per_stream; 154 170 }; 155 171 … … 207 223 208 224 /* Initialises the DAG output data structure */ 209 static void dag_init_format_out_data(libtrace_out_t *libtrace) { 210 libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t)); 225 static void dag_init_format_out_data(libtrace_out_t *libtrace) 226 { 227 libtrace->format_data = (struct dag_format_data_out_t *) 228 malloc(sizeof(struct dag_format_data_out_t)); 211 229 // no DUCK on output 212 230 FORMAT_DATA_OUT->stream_attached = 0; … … 219 237 220 238 /* Initialises the DAG input data structure */ 221 static void 
dag_init_format_data(libtrace_t *libtrace) { 239 static void dag_init_format_data(libtrace_t *libtrace) 240 { 241 struct dag_per_stream_t stream_data; 242 222 243 libtrace->format_data = (struct dag_format_data_t *) 223 244 malloc(sizeof(struct dag_format_data_t)); 224 245 DUCK.last_duck = 0; 225 DUCK.duck_freq = 0; 226 DUCK.last_pkt = 0; 227 DUCK.dummy_duck = NULL; 228 FORMAT_DATA->stream_attached = 0; 229 FORMAT_DATA->drops = 0; 246 DUCK.duck_freq = 0; 247 DUCK.last_pkt = 0; 248 DUCK.dummy_duck = NULL; 249 250 FORMAT_DATA->per_stream = 251 libtrace_list_init(sizeof(stream_data)); 252 assert(FORMAT_DATA->per_stream != NULL); 253 254 /* We'll start with just one instance of stream_data, and we'll 255 * add more later if we need them */ 256 memset(&stream_data, 0, sizeof(stream_data)); 257 libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data); 230 258 FORMAT_DATA->device_name = NULL; 231 FORMAT_DATA->device = NULL;232 FORMAT_DATA->dagstream = 0;233 FORMAT_DATA->processed = 0;234 FORMAT_DATA->bottom = NULL;235 FORMAT_DATA->top = NULL;236 memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));237 259 } 238 260 … … 241 263 * 242 264 * NOTE: This function assumes the open_dag_mutex is held by the caller */ 243 static struct dag_dev_t *dag_find_open_device(char *dev_name) { 265 static struct dag_dev_t *dag_find_open_device(char *dev_name) 266 { 244 267 struct dag_dev_t *dag_dev; 245 268 … … 252 275 dag_dev->ref_count ++; 253 276 return dag_dev; 254 255 277 } 256 278 dag_dev = dag_dev->next; 257 279 } 258 280 return NULL; 259 260 261 281 } 262 282 … … 267 287 * 268 288 * NOTE: This function assumes the open_dag_mutex is held by the caller */ 269 static void dag_close_device(struct dag_dev_t *dev) { 289 static void dag_close_device(struct dag_dev_t *dev) 290 { 270 291 /* Need to remove from the device list */ 271 272 292 assert(dev->ref_count == 0); 273 293 … … 292 312 * 293 313 * NOTE: this function should only be called when opening a DAG device for 294 * writing - there is little practical difference between this and the 314 * writing - there is little practical difference between this and the 295 315 * function below that covers the reading case, but we need the output trace 296 * object to report errors properly so the two functions take slightly 316 * object to report errors properly so the two functions take slightly 297 317 * different arguments. This is really lame and there should be a much better 298 318 * way of doing this. 
299 319 * 300 * NOTE: This function assumes the open_dag_mutex is held by the caller 320 * NOTE: This function assumes the open_dag_mutex is held by the caller 301 321 */ 302 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) { 322 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, 323 char *dev_name) 324 { 303 325 struct stat buf; 304 326 int fd; … … 309 331 trace_set_err_out(libtrace,errno,"stat(%s)",dev_name); 310 332 return NULL; 311 }333 } 312 334 313 335 /* Make sure it is the appropriate type of device */ … … 346 368 * 347 369 * NOTE: this function should only be called when opening a DAG device for 348 * reading - there is little practical difference between this and the 370 * reading - there is little practical difference between this and the 349 371 * function above that covers the writing case, but we need the input trace 350 * object to report errors properly so the two functions take slightly 372 * object to report errors properly so the two functions take slightly 351 373 * different arguments. This is really lame and there should be a much better 352 374 * way of doing this. … … 359 381 360 382 /* Make sure the device exists */ 361 362 363 364 383 if (stat(dev_name, &buf) == -1) { 384 trace_set_err(libtrace,errno,"stat(%s)",dev_name); 385 return NULL; 386 } 365 387 366 388 /* Make sure it is the appropriate type of device */ … … 368 390 /* Try opening the DAG device */ 369 391 if((fd = dag_open(dev_name)) < 0) { 370 371 372 373 392 trace_set_err(libtrace,errno,"Cannot open DAG %s", 393 dev_name); 394 return NULL; 395 } 374 396 } else { 375 397 trace_set_err(libtrace,errno,"Not a valid dag device: %s", 376 377 378 398 dev_name); 399 return NULL; 400 } 379 401 380 402 /* Add the device to our device list - it is just a doubly linked … … 397 419 398 420 /* Creates and initialises a DAG output trace */ 399 static int dag_init_output(libtrace_out_t *libtrace) { 421 static int dag_init_output(libtrace_out_t *libtrace) 422 { 400 423 char *scan = NULL; 401 424 struct dag_dev_t *dag_device = NULL; 402 425 int stream = 1; 403 426 404 427 /* XXX I don't know if this is important or not, but this function 405 428 * isn't present in all of the driver releases that this code is … … 411 434 412 435 dag_init_format_out_data(libtrace); 413 /* Grab the mutex while we're likely to be messing with the device 436 /* Grab the mutex while we're likely to be messing with the device 414 437 * list */ 415 438 pthread_mutex_lock(&open_dag_mutex); 416 439 417 440 /* Specific streams are signified using a comma in the libtrace URI, 418 441 * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device. … … 460 483 461 484 dag_init_format_data(libtrace); 462 /* Grab the mutex while we're likely to be messing with the device 485 /* Grab the mutex while we're likely to be messing with the device 463 486 * list */ 464 487 pthread_mutex_lock(&open_dag_mutex); 465 466 467 /* Specific streams are signified using a comma in the libtrace URI, 488 489 490 /* DAG cards support multiple streams. In a single threaded capture, 491 * these are specified using a comma in the libtrace URI, 468 492 * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device. 
469 493 * 470 * If no stream is specified, we will read from stream 0 */ 494 * If no stream is specified, we will read from stream 0 with 495 * one thread 496 */ 471 497 if ((scan = strchr(libtrace->uridata,',')) == NULL) { 472 498 FORMAT_DATA->device_name = strdup(libtrace->uridata); … … 477 503 } 478 504 479 FORMAT_DATA ->dagstream = stream;505 FORMAT_DATA_FIRST->dagstream = stream; 480 506 481 507 /* See if our DAG device is already open */ … … 496 522 } 497 523 498 FORMAT_DATA->device = dag_device; 499 500 /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */ 501 /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled 502 503 * The symptom of the port being disabled is that libtrace will appear to hang. 504 */ 524 FORMAT_DATA_FIRST->device = dag_device; 525 526 /* See Config_Status_API_Programming_Guide.pdf from the Endace 527 Dag Documentation */ 528 /* Check kBooleanAttributeActive is true -- no point capturing 529 * on an interface that's disabled 530 * 531 * The symptom of the port being disabled is that libtrace 532 * will appear to hang. */ 505 533 /* Check kBooleanAttributeFault is false */ 506 534 /* Check kBooleanAttributeLocalFault is false */ … … 508 536 /* Check kBooleanAttributePeerLink ? */ 509 537 510 /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/ 538 /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based 539 on libtrace promisc attribute?*/ 511 540 /* Set kUint32AttributeSnapLength to the snaplength */ 512 541 513 542 pthread_mutex_unlock(&open_dag_mutex); 514 543 return 0; 515 544 } 516 545 517 546 /* Configures a DAG input trace */ 518 547 static int dag_config_input(libtrace_t *libtrace, trace_option_t option, 519 void *data) { 520 char conf_str[4096]; 548 void *data) 549 { 550 char conf_str[4096]; 521 551 switch(option) { 522 case TRACE_OPTION_META_FREQ: 523 /* This option is used to specify the frequency of DUCK 524 * updates */ 525 DUCK.duck_freq = *(int *)data; 526 return 0; 527 case TRACE_OPTION_SNAPLEN: 528 /* Tell the card our new snap length */ 529 snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data); 530 if (dag_configure(FORMAT_DATA->device->fd, 531 conf_str) != 0) { 532 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata); 533 return -1; 534 } 535 return 0; 536 case TRACE_OPTION_PROMISC: 537 /* DAG already operates in a promisc fashion */ 538 return -1; 539 case TRACE_OPTION_FILTER: 540 /* We don't yet support pushing filters into DAG 541 * cards */ 542 return -1; 543 case TRACE_OPTION_EVENT_REALTIME: 544 /* Live capture is always going to be realtime */ 552 case TRACE_OPTION_META_FREQ: 553 /* This option is used to specify the frequency of DUCK 554 * updates */ 555 DUCK.duck_freq = *(int *)data; 556 return 0; 557 case TRACE_OPTION_SNAPLEN: 558 /* Tell the card our new snap length */ 559 snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data); 560 if (dag_configure(FORMAT_DATA_FIRST->device->fd, 561 conf_str) != 0) { 562 trace_set_err(libtrace, errno, "Failed to configure " 563 "snaplen on DAG card: %s", 564 libtrace->uridata); 545 565 return -1; 546 } 566 } 567 return 0; 568 case TRACE_OPTION_PROMISC: 569 /* DAG already operates in a promisc fashion */ 570 return -1; 571 case TRACE_OPTION_FILTER: 572 /* We don't yet support pushing filters into DAG 573 * cards */ 574 return -1; 575 case TRACE_OPTION_EVENT_REALTIME: 576 /* Live capture is always going to be realtime */ 577 return -1; 
578 } 547 579 return -1; 548 580 } 549 581 550 582 /* Starts a DAG output trace */ 551 static int dag_start_output(libtrace_out_t *libtrace) { 583 static int dag_start_output(libtrace_out_t *libtrace) 584 { 552 585 struct timeval zero, nopoll; 553 586 … … 557 590 558 591 /* Attach and start the DAG stream */ 559 560 592 if (dag_attach_stream(FORMAT_DATA_OUT->device->fd, 561 593 FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) { … … 572 604 573 605 /* We don't want the dag card to do any sleeping */ 574 575 606 dag_set_stream_poll(FORMAT_DATA_OUT->device->fd, 576 607 FORMAT_DATA_OUT->dagstream, 0, &zero, … … 581 612 582 613 /* Starts a DAG input trace */ 583 static int dag_start_input(libtrace_t *libtrace) { 584 struct timeval zero, nopoll; 585 uint8_t *top, *bottom, *starttop; 614 static int dag_start_input(libtrace_t *libtrace) 615 { 616 struct timeval zero, nopoll; 617 uint8_t *top, *bottom, *starttop; 586 618 top = bottom = NULL; 587 619 588 620 zero.tv_sec = 0; 589 590 621 zero.tv_usec = 10000; 622 nopoll = zero; 591 623 592 624 /* Attach and start the DAG stream */ 593 if (dag_attach_stream(FORMAT_DATA ->device->fd,594 FORMAT_DATA->dagstream, 0, 0) < 0) {595 596 597 598 599 if (dag_start_stream(FORMAT_DATA ->device->fd,600 FORMAT_DATA->dagstream) < 0) {601 602 603 625 if (dag_attach_stream(FORMAT_DATA_FIRST->device->fd, 626 FORMAT_DATA_FIRST->dagstream, 0, 0) < 0) { 627 trace_set_err(libtrace, errno, "Cannot attach DAG stream"); 628 return -1; 629 } 630 631 if (dag_start_stream(FORMAT_DATA_FIRST->device->fd, 632 FORMAT_DATA_FIRST->dagstream) < 0) { 633 trace_set_err(libtrace, errno, "Cannot start DAG stream"); 634 return -1; 635 } 604 636 FORMAT_DATA->stream_attached = 1; 605 637 606 638 /* We don't want the dag card to do any sleeping */ 607 dag_set_stream_poll(FORMAT_DATA->device->fd,608 FORMAT_DATA->dagstream, 0, &zero,609 610 611 starttop = dag_advance_stream(FORMAT_DATA ->device->fd,612 FORMAT_DATA->dagstream,613 639 dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd, 640 FORMAT_DATA_FIRST->dagstream, 0, &zero, 641 &nopoll); 642 643 starttop = dag_advance_stream(FORMAT_DATA_FIRST->device->fd, 644 FORMAT_DATA_FIRST->dagstream, 645 &bottom); 614 646 615 647 /* Should probably flush the memory hole now */ … … 617 649 while (starttop - bottom > 0) { 618 650 bottom += (starttop - bottom); 619 top = dag_advance_stream(FORMAT_DATA ->device->fd,620 FORMAT_DATA->dagstream,621 &bottom);622 } 623 FORMAT_DATA ->top = top;624 FORMAT_DATA ->bottom = bottom;625 FORMAT_DATA ->processed = 0;626 FORMAT_DATA ->drops = 0;651 top = dag_advance_stream(FORMAT_DATA_FIRST->device->fd, 652 FORMAT_DATA_FIRST->dagstream, 653 &bottom); 654 } 655 FORMAT_DATA_FIRST->top = top; 656 FORMAT_DATA_FIRST->bottom = bottom; 657 FORMAT_DATA_FIRST->processed = 0; 658 FORMAT_DATA_FIRST->drops = 0; 627 659 628 660 return 0; 629 661 } 630 662 663 static int dag_pstart_input(libtrace_t *libtrace) 664 { 665 char *scan, *tok; 666 uint16_t stream_count = 0, max_streams; 667 int iserror = 0; 668 struct dag_per_stream_t stream_data; 669 670 /* Check we aren't trying to create more threads than the DAG card can 671 * handle */ 672 max_streams = dag_rx_get_stream_count(FORMAT_DATA_FIRST->device->fd); 673 if (libtrace->perpkt_thread_count > max_streams) { 674 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 675 "trying to create too many threads (max is %u)", 676 max_streams); 677 iserror = 1; 678 goto cleanup; 679 } 680 681 /* Get the stream names from the uri */ 682 if ((scan = strchr(libtrace->uridata, ',')) == NULL) { 683 
trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 684 "format uri doesn't specify the DAG streams"); 685 iserror = 1; 686 goto cleanup; 687 } 688 689 scan++; 690 691 tok = strtok(scan, ","); 692 while (tok != NULL) { 693 /* Ensure we haven't specified too many streams */ 694 if (stream_count >= libtrace->perpkt_thread_count) { 695 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 696 "format uri specifies too many streams. " 697 "Max is %u", max_streams); 698 iserror = 1; 699 goto cleanup; 700 } 701 702 /* Save the stream details */ 703 if (stream_count == 0) { 704 /* Special case where we update the existing stream 705 * data structure */ 706 FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok); 707 } else { 708 memset(&stream_data, 0, sizeof(stream_data)); 709 stream_data.device = FORMAT_DATA_FIRST->device; 710 stream_data.dagstream = (uint16_t)atoi(tok); 711 libtrace_list_push_back(FORMAT_DATA->per_stream, 712 &stream_data); 713 } 714 715 stream_count++; 716 tok = strtok(NULL, ","); 717 } 718 719 FORMAT_DATA->stream_attached = 1; 720 721 cleanup: 722 if (iserror) { 723 return -1; 724 } else { 725 return 0; 726 } 727 } 728 631 729 /* Pauses a DAG output trace */ 632 static int dag_pause_output(libtrace_out_t *libtrace) {633 730 static int dag_pause_output(libtrace_out_t *libtrace) 731 { 634 732 /* Stop and detach the stream */ 635 733 if (dag_stop_stream(FORMAT_DATA_OUT->device->fd, 636 FORMAT_DATA_OUT->dagstream) < 0) {734 FORMAT_DATA_OUT->dagstream) < 0) { 637 735 trace_set_err_out(libtrace, errno, "Could not stop DAG stream"); 638 736 return -1; 639 737 } 640 738 if (dag_detach_stream(FORMAT_DATA_OUT->device->fd, 641 FORMAT_DATA_OUT->dagstream) < 0) { 642 trace_set_err_out(libtrace, errno, "Could not detach DAG stream"); 739 FORMAT_DATA_OUT->dagstream) < 0) { 740 trace_set_err_out(libtrace, errno, 741 "Could not detach DAG stream"); 643 742 return -1; 644 743 } … … 648 747 649 748 /* Pauses a DAG input trace */ 650 static int dag_pause_input(libtrace_t *libtrace) { 651 652 /* Stop and detach the stream */ 653 if (dag_stop_stream(FORMAT_DATA->device->fd, 654 FORMAT_DATA->dagstream) < 0) { 655 trace_set_err(libtrace, errno, "Could not stop DAG stream"); 656 return -1; 657 } 658 if (dag_detach_stream(FORMAT_DATA->device->fd, 659 FORMAT_DATA->dagstream) < 0) { 660 trace_set_err(libtrace, errno, "Could not detach DAG stream"); 661 return -1; 662 } 749 static int dag_pause_input(libtrace_t *libtrace) 750 { 751 libtrace_list_node_t *tmp = FORMAT_DATA_HEAD; 752 753 /* Stop and detach each stream */ 754 while (tmp != NULL) { 755 if (dag_stop_stream(STREAM_DATA(tmp)->device->fd, 756 STREAM_DATA(tmp)->dagstream) < 0) { 757 trace_set_err(libtrace, errno, 758 "Could not stop DAG stream"); 759 printf("Count not stop DAG stream\n"); 760 return -1; 761 } 762 if (dag_detach_stream(STREAM_DATA(tmp)->device->fd, 763 STREAM_DATA(tmp)->dagstream) < 0) { 764 trace_set_err(libtrace, errno, 765 "Could not detach DAG stream"); 766 printf("Count not detach DAG stream\n"); 767 return -1; 768 } 769 770 tmp = tmp->next; 771 } 772 663 773 FORMAT_DATA->stream_attached = 0; 664 774 return 0; 665 775 } 666 776 777 778 667 779 /* Closes a DAG input trace */ 668 static int dag_fin_input(libtrace_t *libtrace) { 780 static int dag_fin_input(libtrace_t *libtrace) 781 { 782 libtrace_list_node_t *tmp = FORMAT_DATA_HEAD; 783 669 784 /* Need the lock, since we're going to be handling the device list */ 670 785 pthread_mutex_lock(&open_dag_mutex); 671 786 672 787 /* Detach the stream if we are not paused */ 673 788 if 
(FORMAT_DATA->stream_attached) 674 789 dag_pause_input(libtrace); 675 FORMAT_DATA->device->ref_count --; 676 677 /* Close the DAG device if there are no more references to it */ 678 if (FORMAT_DATA->device->ref_count == 0) 679 dag_close_device(FORMAT_DATA->device); 790 791 /* Close any dag devices that have no more references */ 792 while (tmp != NULL) { 793 STREAM_DATA(tmp)->device->ref_count--; 794 if (STREAM_DATA(tmp)->device->ref_count == 0) 795 dag_close_device(STREAM_DATA(tmp)->device); 796 797 tmp = tmp->next; 798 } 799 680 800 if (DUCK.dummy_duck) 681 801 trace_destroy_dead(DUCK.dummy_duck); 802 803 /* Clear the list */ 804 libtrace_list_deinit(FORMAT_DATA->per_stream); 805 682 806 if (FORMAT_DATA->device_name) 683 807 free(FORMAT_DATA->device_name); 684 808 free(libtrace->format_data); 685 809 pthread_mutex_unlock(&open_dag_mutex); 686 810 return 0; /* success */ 687 811 } 688 812 689 813 /* Closes a DAG output trace */ 690 static int dag_fin_output(libtrace_out_t *libtrace) { 691 814 static int dag_fin_output(libtrace_out_t *libtrace) 815 { 816 692 817 /* Commit any outstanding traffic in the txbuffer */ 693 818 if (FORMAT_DATA_OUT->waiting) { 694 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream, 695 FORMAT_DATA_OUT->waiting ); 696 } 697 698 /* Wait until the buffer is nearly clear before exiting the program, 819 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, 820 FORMAT_DATA_OUT->dagstream, 821 FORMAT_DATA_OUT->waiting ); 822 } 823 824 /* Wait until the buffer is nearly clear before exiting the program, 699 825 * as we will lose packets otherwise */ 700 dag_tx_get_stream_space (FORMAT_DATA_OUT->device->fd,701 FORMAT_DATA_OUT->dagstream,702 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,703 FORMAT_DATA_OUT->dagstream) - 8704 );826 dag_tx_get_stream_space 827 (FORMAT_DATA_OUT->device->fd, 828 FORMAT_DATA_OUT->dagstream, 829 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd, 830 FORMAT_DATA_OUT->dagstream) - 8); 705 831 706 832 /* Need the lock, since we're going to be handling the device list */ … … 752 878 753 879 /* Allocate memory for the DUCK data */ 754 755 756 757 758 759 760 761 762 763 880 if (packet->buf_control == TRACE_CTRL_EXTERNAL || 881 !packet->buffer) { 882 packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE); 883 packet->buf_control = TRACE_CTRL_PACKET; 884 if (!packet->buffer) { 885 trace_set_err(libtrace, errno, 886 "Cannot allocate packet buffer"); 887 return -1; 888 } 889 } 764 890 765 891 /* DUCK doesn't have a format header */ … … 789 915 790 916 /* Determines the amount of data available to read from the DAG card */ 791 static int dag_available(libtrace_t *libtrace) { 792 uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom; 917 static int dag_available(libtrace_t *libtrace, 918 struct dag_per_stream_t *stream_data) 919 { 920 uint32_t diff = stream_data->top - stream_data->bottom; 793 921 794 922 /* If we've processed more than 4MB of data since we last called 795 923 * dag_advance_stream, then we should call it again to allow the 796 924 * space occupied by that 4MB to be released */ 797 if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)925 if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024) 798 926 return diff; 799 927 800 928 /* Update the top and bottom pointers */ 801 FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,802 FORMAT_DATA->dagstream,803 &(FORMAT_DATA->bottom));804 805 if ( FORMAT_DATA->top == NULL) {929 stream_data->top = 
dag_advance_stream(stream_data->device->fd, 930 stream_data->dagstream, 931 &(stream_data->bottom)); 932 933 if (stream_data->top == NULL) { 806 934 trace_set_err(libtrace, errno, "dag_advance_stream failed!"); 807 935 return -1; 808 936 } 809 FORMAT_DATA->processed = 0;810 diff = FORMAT_DATA->top - FORMAT_DATA->bottom;937 stream_data->processed = 0; 938 diff = stream_data->top - stream_data->bottom; 811 939 return diff; 812 940 } 813 941 814 942 /* Returns a pointer to the start of the next complete ERF record */ 815 static dag_record_t *dag_get_record(libtrace_t *libtrace) { 816 dag_record_t *erfptr = NULL; 817 uint16_t size; 818 erfptr = (dag_record_t *)FORMAT_DATA->bottom; 943 static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data) 944 { 945 dag_record_t *erfptr = NULL; 946 uint16_t size; 947 948 erfptr = (dag_record_t *)stream_data->bottom; 819 949 if (!erfptr) 820 return NULL; 821 size = ntohs(erfptr->rlen); 822 assert( size >= dag_record_size ); 950 return NULL; 951 952 size = ntohs(erfptr->rlen); 953 assert( size >= dag_record_size ); 954 823 955 /* Make certain we have the full packet available */ 824 if (size > ( FORMAT_DATA->top - FORMAT_DATA->bottom))956 if (size > (stream_data->top - stream_data->bottom)) 825 957 return NULL; 826 FORMAT_DATA->bottom += size; 827 FORMAT_DATA->processed += size; 958 959 stream_data->bottom += size; 960 stream_data->processed += size; 828 961 return erfptr; 829 962 } … … 831 964 /* Converts a buffer containing a recently read DAG packet record into a 832 965 * libtrace packet */ 833 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet, 834 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) { 835 966 static int dag_prepare_packet_real(libtrace_t *libtrace, 967 struct dag_per_stream_t *stream_data, 968 libtrace_packet_t *packet, 969 void *buffer, libtrace_rt_types_t rt_type, 970 uint32_t flags) 971 { 836 972 dag_record_t *erfptr; 837 973 838 974 /* If the packet previously owned a buffer that is not the buffer 839 840 975 * that contains the new packet data, we're going to need to free the 976 * old one to avoid memory leaks */ 841 977 if (packet->buffer != buffer && 842 978 packet->buf_control == TRACE_CTRL_PACKET) { 843 979 free(packet->buffer); 844 980 } … … 853 989 erfptr = (dag_record_t *)buffer; 854 990 packet->buffer = erfptr; 855 856 991 packet->header = erfptr; 992 packet->type = rt_type; 857 993 858 994 if (erfptr->flags.rxerror == 1) { 859 860 861 862 863 864 865 866 995 /* rxerror means the payload is corrupt - drop the payload 996 * by tweaking rlen */ 997 packet->payload = NULL; 998 erfptr->rlen = htons(erf_get_framing_length(packet)); 999 } else { 1000 packet->payload = (char*)packet->buffer 1001 + erf_get_framing_length(packet); 1002 } 867 1003 868 1004 if (libtrace->format_data == NULL) { … … 871 1007 872 1008 /* Update the dropped packets counter */ 873 874 /* No loss counter for DSM coloured records - have to use 875 * some other API */ 1009 /* No loss counter for DSM coloured records - have to use some 1010 * other API */ 876 1011 if (erfptr->type == TYPE_DSM_COLOR_ETH) { 877 1012 /* TODO */ 878 1013 } else { 879 1014 /* Use the ERF loss counter */ 880 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) { 881 FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1; 1015 if (stream_data->seeninterface[erfptr->flags.iface] 1016 == 0) { 1017 stream_data->seeninterface[erfptr->flags.iface] 1018 = 1; 882 1019 } else { 883 FORMAT_DATA->drops += ntohs(erfptr->lctr); 884 } 885 } 1020 
stream_data->drops += ntohs(erfptr->lctr); 1021 } 1022 } 1023 1024 packet->error = 1; 886 1025 887 1026 return 0; 1027 } 1028 1029 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet, 1030 void *buffer, libtrace_rt_types_t rt_type, 1031 uint32_t flags) 1032 { 1033 return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet, 1034 buffer, rt_type, flags); 888 1035 } 889 1036 … … 898 1045 /* Pushes an ERF record onto the transmit stream */ 899 1046 static int dag_dump_packet(libtrace_out_t *libtrace, 900 dag_record_t *erfptr, unsigned int pad, void *buffer) { 1047 dag_record_t *erfptr, unsigned int pad, 1048 void *buffer) 1049 { 901 1050 int size; 902 1051 903 1052 /* 904 * If we've got 0 bytes waiting in the txqueue, assume that we haven't905 * requested any space yet, and request some, storing the pointer at906 * FORMAT_DATA_OUT->txbuffer.1053 * If we've got 0 bytes waiting in the txqueue, assume that we 1054 * haven't requested any space yet, and request some, storing 1055 * the pointer at FORMAT_DATA_OUT->txbuffer. 907 1056 * 908 1057 * The amount to request is slightly magical at the moment - it's … … 911 1060 */ 912 1061 if (FORMAT_DATA_OUT->waiting == 0) { 913 FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd, 914 FORMAT_DATA_OUT->dagstream, 16908288); 1062 FORMAT_DATA_OUT->txbuffer = 1063 dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd, 1064 FORMAT_DATA_OUT->dagstream, 1065 16908288); 915 1066 } 916 1067 … … 919 1070 * are in contiguous memory 920 1071 */ 921 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad)); 1072 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr, 1073 (dag_record_size + pad)); 922 1074 FORMAT_DATA_OUT->waiting += (dag_record_size + pad); 923 924 925 1075 926 1076 /* … … 929 1079 */ 930 1080 size = ntohs(erfptr->rlen)-(dag_record_size + pad); 931 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size); 1081 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer, 1082 size); 932 1083 FORMAT_DATA_OUT->waiting += size; 933 1084 … … 938 1089 * case there is still data in the buffer at program exit. 
939 1090 */ 940 941 1091 if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) { 942 FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream, 943 FORMAT_DATA_OUT->waiting ); 1092 FORMAT_DATA_OUT->txbuffer = 1093 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, 1094 FORMAT_DATA_OUT->dagstream, 1095 FORMAT_DATA_OUT->waiting); 944 1096 FORMAT_DATA_OUT->waiting = 0; 945 1097 } 946 1098 947 1099 return size + pad + dag_record_size; 948 949 1100 } 950 1101 … … 952 1103 * if one is found, false otherwise */ 953 1104 static bool find_compatible_linktype(libtrace_out_t *libtrace, 954 libtrace_packet_t *packet, char *type)955 { 956 //Keep trying to simplify the packet until we can find957 //something we can do with it1105 libtrace_packet_t *packet, char *type) 1106 { 1107 /* Keep trying to simplify the packet until we can find 1108 * something we can do with it */ 958 1109 959 1110 do { 960 *type =libtrace_to_erf_type(trace_get_link_type(packet));961 962 / / Success1111 *type = libtrace_to_erf_type(trace_get_link_type(packet)); 1112 1113 /* Success */ 963 1114 if (*type != (char)-1) 964 1115 return true; … … 966 1117 if (!demote_packet(packet)) { 967 1118 trace_set_err_out(libtrace, 968 TRACE_ERR_NO_CONVERSION,969 "No erf type for packet (%i)",970 trace_get_link_type(packet));1119 TRACE_ERR_NO_CONVERSION, 1120 "No erf type for packet (%i)", 1121 trace_get_link_type(packet)); 971 1122 return false; 972 1123 } … … 978 1129 979 1130 /* Writes a packet to the provided DAG output trace */ 980 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {981 /* 982 * This is heavily borrowed from erf_write_packet(). Yes, CnP coding983 * sucks, sorry about that.1131 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) 1132 { 1133 /* This is heavily borrowed from erf_write_packet(). Yes, CnP 1134 * coding sucks, sorry about that. 984 1135 */ 985 1136 unsigned int pad = 0; … … 990 1141 991 1142 if(!packet->header) { 992 /* No header, probably an RT packet. Lifted from 1143 /* No header, probably an RT packet. Lifted from 993 1144 * erf_write_packet(). 
*/ 994 1145 return -1; … … 1009 1160 1010 1161 if (packet->type == TRACE_RT_DATA_ERF) { 1011 numbytes = dag_dump_packet(libtrace, 1012 header, 1013 pad, 1014 payload 1015 ); 1016 1162 numbytes = dag_dump_packet(libtrace, header, pad, payload); 1017 1163 } else { 1018 1164 /* Build up a new packet header from the existing header */ 1019 1165 1020 /* Simplify the packet first - if we can't do this, break 1166 /* Simplify the packet first - if we can't do this, break 1021 1167 * early */ 1022 1168 if (!find_compatible_linktype(libtrace,packet,&erf_type)) … … 1037 1183 1038 1184 /* Packet length (rlen includes format overhead) */ 1039 assert(trace_get_capture_length(packet)>0 1040 && trace_get_capture_length(packet)<=65536); 1041 assert(erf_get_framing_length(packet)>0 1042 && trace_get_framing_length(packet)<=65536); 1043 assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0 1044 &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536); 1185 assert(trace_get_capture_length(packet) > 0 1186 && trace_get_capture_length(packet) <= 65536); 1187 assert(erf_get_framing_length(packet) > 0 1188 && trace_get_framing_length(packet) <= 65536); 1189 assert(trace_get_capture_length(packet) + 1190 erf_get_framing_length(packet) > 0 1191 && trace_get_capture_length(packet) + 1192 erf_get_framing_length(packet) <= 65536); 1045 1193 1046 1194 erfhdr.rlen = htons(trace_get_capture_length(packet) 1047 + erf_get_framing_length(packet));1195 + erf_get_framing_length(packet)); 1048 1196 1049 1197 … … 1054 1202 1055 1203 /* Write it out */ 1056 numbytes = dag_dump_packet(libtrace, 1057 &erfhdr, 1058 pad, 1059 payload); 1060 1204 numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload); 1061 1205 } 1062 1206 … … 1068 1212 * If DUCK reporting is enabled, the packet returned may be a DUCK update 1069 1213 */ 1070 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { 1071 int size = 0; 1072 struct timeval tv; 1073 dag_record_t *erfptr = NULL; 1214 static int dag_read_packet_real(libtrace_t *libtrace, 1215 struct dag_per_stream_t *stream_data, 1216 libtrace_thread_t *t, /* Optional */ 1217 libtrace_packet_t *packet) 1218 { 1219 dag_record_t *erfptr = NULL; 1074 1220 int numbytes = 0; 1075 1221 uint32_t flags = 0; 1076 struct timeval maxwait; 1077 struct timeval pollwait; 1222 struct timeval maxwait, pollwait; 1078 1223 1079 1224 pollwait.tv_sec = 0; … … 1088 1233 return size; 1089 1234 1090 1091 1235 /* Don't let anyone try to free our DAG memory hole! 
*/ 1092 1236 flags |= TRACE_PREP_DO_NOT_OWN_BUFFER; … … 1094 1238 /* If the packet buffer is currently owned by libtrace, free it so 1095 1239 * that we can set the packet to point into the DAG memory hole */ 1096 if (packet->buf_control == TRACE_CTRL_PACKET) { 1097 free(packet->buffer); 1098 packet->buffer = 0; 1099 } 1100 1101 if (dag_set_stream_poll(FORMAT_DATA->device->fd, 1102 FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 1103 &pollwait) == -1) 1104 { 1240 if (packet->buf_control == TRACE_CTRL_PACKET) { 1241 free(packet->buffer); 1242 packet->buffer = 0; 1243 } 1244 1245 if (dag_set_stream_poll(stream_data->device->fd, stream_data->dagstream, 1246 sizeof(dag_record_t), &maxwait, 1247 &pollwait) == -1) { 1105 1248 trace_set_err(libtrace, errno, "dag_set_stream_poll"); 1106 1249 return -1; 1107 1250 } 1108 1251 1109 1110 1252 /* Grab a full ERF record */ 1111 1253 do { 1112 numbytes = dag_available(libtrace );1254 numbytes = dag_available(libtrace, stream_data); 1113 1255 if (numbytes < 0) 1114 1256 return numbytes; 1115 1257 if (numbytes < dag_record_size) { 1258 /* Check the message queue if we have one to check */ 1259 if (t != NULL && 1260 libtrace_message_queue_count(&t->messages) > 0) 1261 return -2; 1262 1116 1263 if (libtrace_halt) 1117 1264 return 0; … … 1119 1266 continue; 1120 1267 } 1121 erfptr = dag_get_record( libtrace);1268 erfptr = dag_get_record(stream_data); 1122 1269 } while (erfptr == NULL); 1123 1270 1124 1271 /* Prepare the libtrace packet */ 1125 if (dag_prepare_packet (libtrace, packet, erfptr, TRACE_RT_DATA_ERF,1126 flags))1272 if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr, 1273 TRACE_RT_DATA_ERF, flags)) 1127 1274 return -1; 1128 1275 1129 /* Update the DUCK timer */1130 tv = trace_get_timeval(packet);1131 DUCK.last_pkt = tv.tv_sec;1132 1133 1276 return packet->payload ? htons(erfptr->rlen) : 1134 erf_get_framing_length(packet); 1277 erf_get_framing_length(packet); 1278 } 1279 1280 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 1281 { 1282 return dag_read_packet_real(libtrace, FORMAT_DATA_FIRST, NULL, packet); 1283 } 1284 1285 static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, 1286 libtrace_packet_t **packets, size_t nb_packets) 1287 { 1288 int ret; 1289 size_t read_packets = 0; 1290 int numbytes = 0; 1291 1292 struct dag_per_stream_t *stream_data = 1293 (struct dag_per_stream_t *)t->format_data; 1294 1295 /* Read as many packets as we can, but read atleast one packet */ 1296 do { 1297 ret = dag_read_packet_real(libtrace, stream_data, t, 1298 packets[read_packets]); 1299 if (ret < 0) 1300 return ret; 1301 1302 read_packets++; 1303 1304 /* Make sure we don't read too many packets..! 
*/ 1305 if (read_packets >= nb_packets) 1306 break; 1307 1308 numbytes = dag_available(libtrace, stream_data); 1309 } while (numbytes >= dag_record_size); 1310 1311 return read_packets; 1135 1312 } 1136 1313 … … 1140 1317 */ 1141 1318 static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace, 1142 libtrace_packet_t *packet) { 1143 libtrace_eventobj_t event = {0,0,0.0,0}; 1319 libtrace_packet_t *packet) 1320 { 1321 libtrace_eventobj_t event = {0,0,0.0,0}; 1144 1322 dag_record_t *erfptr = NULL; 1145 1323 int numbytes; … … 1160 1338 } 1161 1339 1162 if (dag_set_stream_poll(FORMAT_DATA->device->fd, 1163 FORMAT_DATA->dagstream, 0, &minwait, 1164 &minwait) == -1) 1165 { 1340 if (dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd, 1341 FORMAT_DATA_FIRST->dagstream, 0, &minwait, 1342 &minwait) == -1) { 1166 1343 trace_set_err(libtrace, errno, "dag_set_stream_poll"); 1167 1344 event.type = TRACE_EVENT_TERMINATE; … … 1172 1349 erfptr = NULL; 1173 1350 numbytes = 0; 1174 1351 1175 1352 /* Need to call dag_available so that the top pointer will get 1176 1353 * updated, otherwise we'll never see any data! */ 1177 numbytes = dag_available(libtrace );1178 1179 /* May as well not bother calling dag_get_record if 1354 numbytes = dag_available(libtrace, FORMAT_DATA_FIRST); 1355 1356 /* May as well not bother calling dag_get_record if 1180 1357 * dag_available suggests that there's no data */ 1181 1358 if (numbytes != 0) 1182 erfptr = dag_get_record( libtrace);1359 erfptr = dag_get_record(FORMAT_DATA_FIRST); 1183 1360 if (erfptr == NULL) { 1184 1361 /* No packet available - sleep for a very short time */ 1185 1362 if (libtrace_halt) { 1186 1363 event.type = TRACE_EVENT_TERMINATE; 1187 } else { 1364 } else { 1188 1365 event.type = TRACE_EVENT_SLEEP; 1189 1366 event.seconds = 0.0001; … … 1191 1368 break; 1192 1369 } 1193 if (dag_prepare_packet (libtrace, packet, erfptr,1194 TRACE_RT_DATA_ERF, flags)) {1370 if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet, 1371 erfptr, TRACE_RT_DATA_ERF, flags)) { 1195 1372 event.type = TRACE_EVENT_TERMINATE; 1196 1373 break; … … 1198 1375 1199 1376 1200 event.size = trace_get_capture_length(packet) + 1201 1202 1377 event.size = trace_get_capture_length(packet) + 1378 trace_get_framing_length(packet); 1379 1203 1380 /* XXX trace_read_packet() normally applies the following 1204 1381 * config options for us, but this function is called via … … 1206 1383 1207 1384 if (libtrace->filter) { 1208 int filtret = trace_apply_filter(libtrace->filter, 1209 packet);1385 int filtret = trace_apply_filter(libtrace->filter, 1386 packet); 1210 1387 if (filtret == -1) { 1211 1388 trace_set_err(libtrace, TRACE_ERR_BAD_FILTER, 1212 1389 "Bad BPF Filter"); 1213 1390 event.type = TRACE_EVENT_TERMINATE; 1214 1391 break; … … 1221 1398 * a sleep event in this case, like we used to 1222 1399 * do! 
*/ 1223 1400 libtrace->filtered_packets ++; 1224 1401 trace_clear_cache(packet); 1225 1402 continue; 1226 1403 } 1227 1404 1228 1405 event.type = TRACE_EVENT_PACKET; 1229 1406 } else { … … 1238 1415 trace_set_capture_length(packet, libtrace->snaplen); 1239 1416 } 1240 1417 libtrace->accepted_packets ++; 1241 1418 break; 1242 } while 1419 } while(1); 1243 1420 1244 1421 return event; … … 1246 1423 1247 1424 /* Gets the number of dropped packets */ 1248 static uint64_t dag_get_dropped_packets(libtrace_t *trace) { 1249 if (trace->format_data == NULL) 1250 return (uint64_t)-1; 1251 return DATA(trace)->drops; 1425 static uint64_t dag_get_dropped_packets(libtrace_t *libtrace) 1426 { 1427 uint64_t sum = 0; 1428 libtrace_list_node_t *tmp = FORMAT_DATA_HEAD; 1429 1430 /* Sum the drop counter for all the packets */ 1431 while (tmp != NULL) { 1432 sum += STREAM_DATA(tmp)->drops; 1433 tmp = tmp->next; 1434 } 1435 1436 return sum; 1252 1437 } 1253 1438 1254 1439 /* Prints some semi-useful help text about the DAG format module */ 1255 1440 static void dag_help(void) { 1256 printf("dag format module: $Revision: 1755 $\n"); 1257 printf("Supported input URIs:\n"); 1258 printf("\tdag:/dev/dagn\n"); 1259 printf("\n"); 1260 printf("\te.g.: dag:/dev/dag0\n"); 1261 printf("\n"); 1262 printf("Supported output URIs:\n"); 1263 printf("\tnone\n"); 1264 printf("\n"); 1441 printf("dag format module: $Revision: 1755 $\n"); 1442 printf("Supported input URIs:\n"); 1443 printf("\tdag:/dev/dagn\n"); 1444 printf("\n"); 1445 printf("\te.g.: dag:/dev/dag0\n"); 1446 printf("\n"); 1447 printf("Supported output URIs:\n"); 1448 printf("\tnone\n"); 1449 printf("\n"); 1450 } 1451 1452 static int dag_pconfig_input(UNUSED libtrace_t *libtrace, 1453 trace_parallel_option_t option, UNUSED void *value) 1454 { 1455 /* We don't support any of these! Normally you configure the DAG card 1456 * externally. */ 1457 switch(option) { 1458 case TRACE_OPTION_SET_HASHER: 1459 case TRACE_OPTION_SET_PERPKT_THREAD_COUNT: 1460 case TRACE_OPTION_TRACETIME: 1461 case TRACE_OPTION_TICK_INTERVAL: 1462 case TRACE_OPTION_GET_CONFIG: 1463 case TRACE_OPTION_SET_CONFIG: 1464 return -1; 1465 } 1466 /* We don't provide a default option to ensure that future options will 1467 * generate a compiler warning. 
*/ 1468 1469 return -1; 1470 } 1471 1472 /* TODO: Should possibly make a more generic dag_start_input, as there's a 1473 * fair bit of code duplication between that and this */ 1474 static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, 1475 bool reader) 1476 { 1477 struct timeval zero, nopoll; 1478 uint8_t *top, *bottom, *starttop; 1479 struct dag_per_stream_t *stream_data; 1480 top = bottom = NULL; 1481 1482 /* Minimum delay is 10mS */ 1483 zero.tv_sec = 0; 1484 zero.tv_usec = 10000; 1485 nopoll = zero; 1486 1487 if (reader) { 1488 if (t->type == THREAD_PERPKT) { 1489 stream_data = 1490 (struct dag_per_stream_t *) 1491 libtrace_list_get_index(FORMAT_DATA->per_stream, 1492 t->perpkt_num)->data; 1493 1494 /* Pass the per thread data to the thread */ 1495 t->format_data = stream_data; 1496 1497 /* Attach and start the DAG stream */ 1498 printf("t%u: starting and attaching stream #%u\n", 1499 t->perpkt_num, stream_data->dagstream); 1500 if (dag_attach_stream(stream_data->device->fd, 1501 stream_data->dagstream, 0, 1502 0) < 0) { 1503 printf("can't attach DAG stream #%u\n", 1504 stream_data->dagstream); 1505 trace_set_err(libtrace, errno, 1506 "can't attach DAG stream #%u", 1507 stream_data->dagstream); 1508 return -1; 1509 } 1510 if (dag_start_stream(stream_data->device->fd, 1511 stream_data->dagstream) < 0) { 1512 trace_set_err(libtrace, errno, 1513 "can't start DAG stream #%u", 1514 stream_data->dagstream); 1515 printf("can't start DAG stream #%u\n", 1516 stream_data->dagstream); 1517 return -1; 1518 } 1519 1520 /* Ensure that dag_advance_stream will return without 1521 * blocking */ 1522 if(dag_set_stream_poll(stream_data->device->fd, 1523 stream_data->dagstream, 0, &zero, 1524 &nopoll) < 0) { 1525 trace_set_err(libtrace, errno, 1526 "dag_set_stream_poll failed!"); 1527 return -1; 1528 } 1529 1530 /* Clear all the data from the memory hole */ 1531 starttop = dag_advance_stream(stream_data-> 1532 device->fd, 1533 stream_data->dagstream, 1534 &bottom); 1535 1536 top = starttop; 1537 while (starttop - bottom > 0) { 1538 bottom += (starttop - bottom); 1539 top = dag_advance_stream(stream_data-> 1540 device->fd, 1541 stream_data->dagstream, 1542 &bottom); 1543 } 1544 stream_data->top = top; 1545 stream_data->bottom = bottom; 1546 stream_data->pkt_count = 0; 1547 stream_data->drops = 0; 1548 } else { 1549 /* TODO: Figure out why t->type != THREAD_PERPKT in 1550 * order to figure out what this line does */ 1551 t->format_data = FORMAT_DATA_FIRST; 1552 } 1553 } 1554 1555 fprintf(stderr, "t%u: registered thread\n", t->perpkt_num); 1556 1557 return 0; 1265 1558 } 1266 1559 1267 1560 static struct libtrace_format_t dag = { 1268 1269 1270 1561 "dag", 1562 "$Id$", 1563 TRACE_FORMAT_ERF, 1271 1564 dag_probe_filename, /* probe filename */ 1272 1565 NULL, /* probe magic */ 1273 1274 1275 1276 1566 dag_init_input, /* init_input */ 1567 dag_config_input, /* config_input */ 1568 dag_start_input, /* start_input */ 1569 dag_pause_input, /* pause_input */ 1277 1570 dag_init_output, /* init_output */ 1278 1571 NULL, /* config_output */ 1279 1572 dag_start_output, /* start_output */ 1280 1573 dag_fin_input, /* fin_input */ 1281 1574 dag_fin_output, /* fin_output */ 1282 1283 1575 dag_read_packet, /* read_packet */ 1576 dag_prepare_packet, /* prepare_packet */ 1284 1577 NULL, /* fin_packet */ 1285 1578 dag_write_packet, /* write_packet */ 1286 1287 1288 1289 1290 1291 1579 erf_get_link_type, /* get_link_type */ 1580 erf_get_direction, /* get_direction */ 1581 erf_set_direction, /* 
set_direction */ 1582 erf_get_erf_timestamp, /* get_erf_timestamp */ 1583 NULL, /* get_timeval */ 1584 NULL, /* get_seconds */ 1292 1585 NULL, /* get_timespec */ 1293 1294 1295 1296 1297 1298 1299 1586 NULL, /* seek_erf */ 1587 NULL, /* seek_timeval */ 1588 NULL, /* seek_seconds */ 1589 erf_get_capture_length, /* get_capture_length */ 1590 erf_get_wire_length, /* get_wire_length */ 1591 erf_get_framing_length, /* get_framing_length */ 1592 erf_set_capture_length, /* set_capture_length */ 1300 1593 NULL, /* get_received_packets */ 1301 1594 NULL, /* get_filtered_packets */ 1302 1595 dag_get_dropped_packets, /* get_dropped_packets */ 1303 1596 NULL, /* get_captured_packets */ 1304 NULL, /* get_fd */ 1305 trace_event_dag, /* trace_event */ 1306 dag_help, /* help */ 1307 NULL /* next pointer */ 1597 NULL, /* get_fd */ 1598 trace_event_dag, /* trace_event */ 1599 dag_help, /* help */ 1600 NULL, /* next pointer */ 1601 {true, 0}, /* live packet capture, thread limit TBD */ 1602 dag_pstart_input, 1603 dag_pread_packets, 1604 dag_pause_input, 1605 NULL, 1606 dag_pconfig_input, 1607 dag_pregister_thread, 1608 NULL 1308 1609 }; 1309 1610 1310 void dag_constructor(void) { 1611 void dag_constructor(void) 1612 { 1311 1613 register_format(&dag); 1312 1614 } -
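With dag_pstart_input() above, the URI now carries one DAG stream number per per-packet thread, e.g. dag:/dev/dag0,0,2,4 opens streams 0, 2 and 4 on dag0 with three threads. A small standalone illustration of that URI convention — not libtrace code, it just mirrors the strchr/strtok logic above:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* Illustrative only: split "dag:/dev/dag0,0,2,4" into the device path
     * and the list of DAG stream numbers, one per capture thread. */
    int main(void)
    {
            char uri[] = "dag:/dev/dag0,0,2,4";   /* example URI */
            char *uridata = uri + strlen("dag:"); /* libtrace strips the format prefix */
            char *comma = strchr(uridata, ',');
            char *tok;

            if (comma == NULL) {
                    /* no streams listed: default to stream 0, one thread */
                    printf("device=%s, single stream 0\n", uridata);
                    return 0;
            }

            *comma = '\0';                        /* terminate the device name */
            printf("device=%s\n", uridata);

            /* each remaining token is one stream, handled by one thread */
            for (tok = strtok(comma + 1, ","); tok != NULL;
                 tok = strtok(NULL, ",")) {
                    printf("  stream %u -> one per-packet thread\n",
                           (unsigned)atoi(tok));
            }
            return 0;
    }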
lib/format_dpdk.c
rb585975 r12ae766 3 3 * This file is part of libtrace 4 4 * 5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 6 6 * New Zealand. 7 7 * 8 * Author: Richard Sanger 9 * 8 * Author: Richard Sanger 9 * 10 10 * All rights reserved. 11 11 * 12 * This code has been developed by the University of Waikato WAND 12 * This code has been developed by the University of Waikato WAND 13 13 * research group. For further information please see http://www.wand.net.nz/ 14 14 * … … 36 36 * Intel Data Plane Development Kit is a LIVE capture format. 37 37 * 38 * This format also supports writing which will write packets out to the 39 * network as a form of packet replay. This should not be confused with the 40 * RT protocol which is intended to transfer captured packet records between 38 * This format also supports writing which will write packets out to the 39 * network as a form of packet replay. This should not be confused with the 40 * RT protocol which is intended to transfer captured packet records between 41 41 * RT-speaking programs. 42 42 */ 43 44 #define _GNU_SOURCE 43 45 44 46 #include "config.h" … … 47 49 #include "format_helper.h" 48 50 #include "libtrace_arphrd.h" 51 #include "hash_toeplitz.h" 49 52 50 53 #ifdef HAVE_INTTYPES_H … … 59 62 #include <endian.h> 60 63 #include <string.h> 64 65 #if HAVE_LIBNUMA 66 #include <numa.h> 67 #endif 61 68 62 69 /* We can deal with any minor differences by checking the RTE VERSION … … 151 158 #include <rte_mempool.h> 152 159 #include <rte_mbuf.h> 153 154 /* The default size of memory buffers to use - This is the max size of standard 160 #include <rte_launch.h> 161 #include <rte_lcore.h> 162 #include <rte_per_lcore.h> 163 #include <rte_cycles.h> 164 #include <pthread.h> 165 166 /* The default size of memory buffers to use - This is the max size of standard 155 167 * ethernet packet less the size of the MAC CHECKSUM */ 156 168 #define RX_MBUF_SIZE 1514 157 169 158 /* The minimum number of memory buffers per queue tx or rx. Search for 170 /* The minimum number of memory buffers per queue tx or rx. Search for 159 171 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards. 160 172 */ … … 174 186 #define NB_TX_MBUF 1024 175 187 176 /* The size of the PCI blacklist needs to be big enough to contain 188 /* The size of the PCI blacklist needs to be big enough to contain 177 189 * every PCI device address (listed by lspci every bus:device.function tuple). 
178 190 */ … … 181 193 /* The maximum number of characters the mempool name can be */ 182 194 #define MEMPOOL_NAME_LEN 20 195 196 /* For single threaded libtrace we read packets as a batch/burst 197 * this is the maximum size of said burst */ 198 #define BURST_SIZE 50 183 199 184 200 #define MBUF(x) ((struct rte_mbuf *) x) … … 186 202 #define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM) 187 203 #define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data)) 204 #define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data)) 205 188 206 #define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \ 189 207 (uint64_t) tv.tv_usec*1000ull) 190 208 #define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \ 191 209 (uint64_t) ts.tv_nsec) 192 210 193 211 #if RTE_PKTMBUF_HEADROOM != 128 194 212 #warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \ 195 196 213 "any libtrace instance processing these packet must be have the" \ 214 "same RTE_PKTMBUF_HEADROOM set" 197 215 #endif 198 216 199 217 /* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 200 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 201 * 218 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 219 * 202 220 * Make sure you understand what these are doing before enabling them. 203 221 * They might make traces incompatable with other builds etc. 204 * 222 * 205 223 * These are also included to show how to do somethings which aren't 206 224 * obvious in the DPDK documentation. 207 225 */ 208 226 209 /* Print verbose messages to std out*/227 /* Print verbose messages to stderr */ 210 228 #define DEBUG 0 211 229 212 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 213 * only turn on if you know clock_gettime is a vsyscall on your system 230 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 231 * only turn on if you know clock_gettime is a vsyscall on your system 214 232 * overwise could be a large overhead. Again gettimeofday() should be 215 233 * vsyscall also if it's not you should seriously consider updating your … … 218 236 #ifdef HAVE_LIBRT 219 237 /* You can turn this on (set to 1) to prefer clock_gettime */ 220 #define USE_CLOCK_GETTIME 0238 #define USE_CLOCK_GETTIME 1 221 239 #else 222 240 /* DONT CHANGE THIS !!! 
*/ 223 #define USE_CLOCK_GETTIME 0241 #define USE_CLOCK_GETTIME 1 224 242 #endif 225 243 … … 229 247 * hence writing out a port such as int: ring: and dpdk: assumes there 230 248 * is no checksum and will attempt to write the checksum as part of the 231 * packet 249 * packet 232 250 */ 233 251 #define GET_MAC_CRC_CHECKSUM 0 234 252 235 253 /* This requires a modification of the pmd drivers (inside Intel DPDK) 254 * TODO this requires updating (packet sizes are wrong TS most likely also) 236 255 */ 237 256 #define HAS_HW_TIMESTAMPS_82580 0 … … 257 276 }; 258 277 278 struct dpdk_per_lcore_t 279 { 280 uint16_t queue_id; 281 uint8_t port; 282 uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */ 283 #if HAS_HW_TIMESTAMPS_82580 284 /* Timestamping only relevent to RX */ 285 uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */ 286 uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */ 287 #endif 288 }; 289 259 290 /* Used by both input and output however some fields are not used 260 291 * for output */ … … 263 294 uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */ 264 295 uint8_t nb_ports; /* Total number of usable ports on system should be 1 */ 265 uint8_t paused; /* See paused_state */ 296 uint8_t paused; /* See paused_state */ 266 297 uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */ 298 uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */ 267 299 int snaplen; /* The snap length for the capture - RX only */ 268 300 /* We always have to setup both rx and tx queues even if we don't want them */ 269 301 int nb_rx_buf; /* The number of packet buffers in the rx ring */ 270 302 int nb_tx_buf; /* The number of packet buffers in the tx ring */ 303 int nic_numa_node; /* The NUMA node that the NIC is attached to */ 271 304 struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */ 272 305 #if DPDK_USE_BLACKLIST … … 275 308 #endif 276 309 char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */ 277 #if HAS_HW_TIMESTAMPS_82580 278 /* Timestamping only relevent to RX */ 279 uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */ 280 uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */ 281 uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */ 282 #endif 310 uint8_t rss_key[40]; // This is the RSS KEY 311 /* To improve performance we always batch reading packets, in a burst */ 312 struct rte_mbuf* burst_pkts[BURST_SIZE]; 313 int burst_size; /* The total number read in the burst */ 314 int burst_offset; /* The offset we are into the burst */ 315 // DPDK normally seems to have a limit of 8 queues for a given card 316 struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE]; 283 317 }; 284 318 … … 288 322 }; 289 323 290 /** 324 /** 291 325 * A structure placed in front of the packet where we can store 292 326 * additional information about the given packet. 
… … 294 328 * | rte_mbuf (pkt) | sizeof(rte_mbuf) 295 329 * +--------------------------+ 296 * | padding | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)297 * +--------------------------+298 330 * | dpdk_addt_hdr | sizeof(dpdk_addt_hdr) 299 331 * +--------------------------+ 300 * | sizeof(dpdk_addt_hdr) | 1 byte301 * +--------------------------+ 332 * | padding | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr) 333 * +--------------------------+ 302 334 * * hw_timestamp_82580 * 16 bytes Optional 303 335 * +--------------------------+ … … 317 349 * We want to blacklist all devices except those on the whitelist 318 350 * (I say list, but yes it is only the one). 319 * 351 * 320 352 * The default behaviour of rte_pci_probe() will map every possible device 321 353 * to its DPDK driver. The DPDK driver will take the ethernet device 322 354 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used. 323 * 324 * So blacklist all devices except the one that we wish to use so that 355 * 356 * So blacklist all devices except the one that we wish to use so that 325 357 * the others can still be used as standard ethernet ports. 326 358 * … … 336 368 337 369 TAILQ_FOREACH(dev, &device_list, next) { 338 if (whitelist != NULL && whitelist->domain == dev->addr.domain 339 340 341 342 370 if (whitelist != NULL && whitelist->domain == dev->addr.domain 371 && whitelist->bus == dev->addr.bus 372 && whitelist->devid == dev->addr.devid 373 && whitelist->function == dev->addr.function) 374 continue; 343 375 if (format_data->nb_blacklist >= sizeof (format_data->blacklist) 344 345 printf("Warning: too many devices to blacklist consider"346 376 / sizeof (format_data->blacklist[0])) { 377 fprintf(stderr, "Warning: too many devices to blacklist consider" 378 " increasing BLACK_LIST_SIZE"); 347 379 break; 348 380 } … … 360 392 char pci_str[20] = {0}; 361 393 snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT, 362 363 364 365 394 whitelist->domain, 395 whitelist->bus, 396 whitelist->devid, 397 whitelist->function); 366 398 if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) { 367 399 return -1; … … 375 407 * Fills in addr, note core is optional and is unchanged if 376 408 * a value for it is not provided. 377 * 409 * 378 410 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0 379 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 411 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 380 412 */ 381 413 static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) { … … 391 423 } 392 424 425 /** 426 * Convert a pci address to the numa node it is 427 * connected to. 428 * 429 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node 430 * so we can call it before DPDK 431 * 432 * @return -1 if unknown otherwise a number 0 or higher of the numa node 433 */ 434 static int pci_to_numa(struct rte_pci_addr * dev_addr) { 435 char path[50] = {0}; 436 FILE *file; 437 438 /* Read from the system */ 439 snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node", 440 dev_addr->domain, 441 dev_addr->bus, 442 dev_addr->devid, 443 dev_addr->function); 444 445 if((file = fopen(path, "r")) != NULL) { 446 int numa_node = -1; 447 fscanf(file, "%d", &numa_node); 448 fclose(file); 449 return numa_node; 450 } 451 return -1; 452 } 453 393 454 #if DEBUG 394 455 /* For debugging */ … … 399 460 400 461 if (nb_cpu <= 0) { 401 402 462 perror("sysconf(_SC_NPROCESSORS_ONLN) failed. 
Falling back to the first core."); 463 nb_cpu = 1; /* fallback to just 1 core */ 403 464 } 404 465 if (nb_cpu > RTE_MAX_LCORE) 405 466 nb_cpu = RTE_MAX_LCORE; 406 467 407 468 global_config = rte_eal_get_configuration(); 408 469 409 470 if (global_config != NULL) { 410 int i; 411 fprintf(stderr, "Intel DPDK setup\n" 412 "---Version : %s\n" 413 "---Master LCore : %"PRIu32"\n" 414 "---LCore Count : %"PRIu32"\n", 415 rte_version(), 416 global_config->master_lcore, global_config->lcore_count); 417 418 for (i = 0 ; i < nb_cpu; i++) { 419 fprintf(stderr, " ---Core %d : %s\n", i, 420 global_config->lcore_role[i] == ROLE_RTE ? "on" : "off"); 421 } 422 423 const char * proc_type; 424 switch (global_config->process_type) { 425 case RTE_PROC_AUTO: 426 proc_type = "auto"; 427 break; 428 case RTE_PROC_PRIMARY: 429 proc_type = "primary"; 430 break; 431 case RTE_PROC_SECONDARY: 432 proc_type = "secondary"; 433 break; 434 case RTE_PROC_INVALID: 435 proc_type = "invalid"; 436 break; 437 default: 438 proc_type = "something worse than invalid!!"; 439 } 440 fprintf(stderr, "---Process Type : %s\n", proc_type); 441 } 442 443 } 444 #endif 471 int i; 472 fprintf(stderr, "Intel DPDK setup\n" 473 "---Version : %s\n" 474 "---Master LCore : %"PRIu32"\n" 475 "---LCore Count : %"PRIu32"\n", 476 rte_version(), 477 global_config->master_lcore, global_config->lcore_count); 478 479 for (i = 0 ; i < nb_cpu; i++) { 480 fprintf(stderr, " ---Core %d : %s\n", i, 481 global_config->lcore_role[i] == ROLE_RTE ? "on" : "off"); 482 } 483 484 const char * proc_type; 485 switch (global_config->process_type) { 486 case RTE_PROC_AUTO: 487 proc_type = "auto"; 488 break; 489 case RTE_PROC_PRIMARY: 490 proc_type = "primary"; 491 break; 492 case RTE_PROC_SECONDARY: 493 proc_type = "secondary"; 494 break; 495 case RTE_PROC_INVALID: 496 proc_type = "invalid"; 497 break; 498 default: 499 proc_type = "something worse than invalid!!"; 500 } 501 fprintf(stderr, "---Process Type : %s\n", proc_type); 502 } 503 504 } 505 #endif 506 507 /** 508 * Expects to be called from the master lcore and moves it to the given dpdk id 509 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise 510 * affinity is set to all cores. Must be less than RTE_MAX_LCORE 511 * and not already in use. 512 * @return 0 is successful otherwise -1 on error. 513 */ 514 static inline int dpdk_move_master_lcore(size_t core) { 515 struct rte_config *cfg = rte_eal_get_configuration(); 516 cpu_set_t cpuset; 517 int i; 518 519 assert (core < RTE_MAX_LCORE); 520 assert (rte_get_master_lcore() == rte_lcore_id()); 521 522 if (core == rte_lcore_id()) 523 return 0; 524 525 // Make sure we are not overwriting someone else 526 assert(!rte_lcore_is_enabled(core)); 527 528 // Move the core 529 cfg->lcore_role[rte_lcore_id()] = ROLE_OFF; 530 cfg->lcore_role[core] = ROLE_RTE; 531 lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id; 532 rte_eal_get_configuration()->master_lcore = core; 533 RTE_PER_LCORE(_lcore_id) = core; 534 535 // Now change the affinity 536 CPU_ZERO(&cpuset); 537 538 if (lcore_config[core].detected) { 539 CPU_SET(core, &cpuset); 540 } else { 541 for (i = 0; i < RTE_MAX_LCORE; ++i) { 542 if (lcore_config[i].detected) 543 CPU_SET(i, &cpuset); 544 } 545 } 546 547 i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); 548 if (i != 0) { 549 // TODO proper libtrace style error here!! 
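// (pthread_setaffinity_np() returns an errno-style value rather than setting errno; note that
// the lcore bookkeeping above has already been switched to the new core by the time this is reported)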
550 fprintf(stderr, "pthread_setaffinity_np failed\n"); 551 return -1; 552 } 553 return 0; 554 } 555 445 556 446 557 /** … … 473 584 474 585 static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data, 475 586 char * err, int errlen) { 476 587 int ret; /* Returned error codes */ 477 588 struct rte_pci_addr use_addr; /* The only address that we don't blacklist */ … … 480 591 long nb_cpu; /* The number of CPUs in the system */ 481 592 long my_cpu; /* The CPU number we want to bind to */ 593 int i; 594 struct rte_config *cfg = rte_eal_get_configuration(); 482 595 struct saved_getopts save_opts; 483 596 484 597 #if DEBUG 485 598 rte_set_log_level(RTE_LOG_DEBUG); … … 489 602 /* 490 603 * Using unique file prefixes mean separate memory is used, unlinking 491 * the two processes. However be careful we still cannot access a 604 * the two processes. However be careful we still cannot access a 605 * port that already in use. 606 * 607 * Using unique file prefixes mean separate memory is used, unlinking 608 * the two processes. However be careful we still cannot access a 492 609 * port that already in use. 493 610 */ … … 510 627 /* This initialises the Environment Abstraction Layer (EAL) 511 628 * If we had slave workers these are put into WAITING state 512 * 629 * 513 630 * Basically binds this thread to a fixed core, which we choose as 514 631 * the last core on the machine (assuming fewer interrupts mapped here). … … 521 638 */ 522 639 523 /* Get the number of cpu cores in the system and use the last core */ 640 /* Get the number of cpu cores in the system and use the last core 641 * on the correct numa node */ 524 642 nb_cpu = sysconf(_SC_NPROCESSORS_ONLN); 525 643 if (nb_cpu <= 0) { 526 527 644 perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core."); 645 nb_cpu = 1; /* fallback to the first core */ 528 646 } 529 647 if (nb_cpu > RTE_MAX_LCORE) 530 531 532 my_cpu = nb_cpu;533 /* This allows the user to specify the core - we would try to do this 648 nb_cpu = RTE_MAX_LCORE; 649 650 my_cpu = -1; 651 /* This allows the user to specify the core - we would try to do this 534 652 * automatically but it's hard to tell that this is secondary 535 * before running rte_eal_init(...). Currently we are limited to 1 653 * before running rte_eal_init(...). Currently we are limited to 1 536 654 * instance per core due to the way memory is allocated. 
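 * e.g. dpdk:0:1:0.0-4 requests CPU core #4, using the same URI syntax documented above at parse_pciaddr().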
*/ 537 655 if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) { 538 snprintf(err, errlen, "Failed to parse URI"); 539 return -1; 540 } 656 snprintf(err, errlen, "Failed to parse URI"); 657 return -1; 658 } 659 660 #if HAVE_LIBNUMA 661 format_data->nic_numa_node = pci_to_numa(&use_addr); 662 if (my_cpu < 0) { 663 /* If we can assign to a core on the same numa node */ 664 fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node); 665 if(format_data->nic_numa_node >= 0) { 666 int max_node_cpu = -1; 667 struct bitmask *mask = numa_allocate_cpumask(); 668 assert(mask); 669 numa_node_to_cpus(format_data->nic_numa_node, mask); 670 for (i = 0 ; i < nb_cpu; ++i) { 671 if (numa_bitmask_isbitset(mask,i)) 672 max_node_cpu = i+1; 673 } 674 my_cpu = max_node_cpu; 675 } 676 } 677 #endif 678 if (my_cpu < 0) { 679 my_cpu = nb_cpu; 680 } 681 541 682 542 683 snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN, 543 684 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu); 544 685 545 686 if (!(my_cpu > 0 && my_cpu <= nb_cpu)) { 546 snprintf(err, errlen, 547 "Intel DPDK - User defined a bad CPU number %"PRIu32" must be" 548 " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu); 549 return -1; 550 } 551 552 /* Make our mask */ 553 snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1)); 687 snprintf(err, errlen, 688 "Intel DPDK - User defined a bad CPU number %"PRIu32" must be" 689 " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu); 690 return -1; 691 } 692 693 /* Make our mask with all cores turned on this is so that DPDK to gets CPU 694 info older versions */ 695 snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu))); 696 //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1)); 554 697 555 698 #if !DPDK_USE_BLACKLIST 556 699 /* Black list all ports besides the one that we want to use */ 557 558 559 560 561 700 if ((ret = whitelist_device(format_data, &use_addr)) < 0) { 701 snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed," 702 " are you sure the address is correct?: %s", strerror(-ret)); 703 return -1; 704 } 562 705 #endif 563 706 564 707 /* Give the memory map a unique name */ 565 708 snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid()); 566 /* rte_eal_init it makes a call to getopt so we need to reset the 709 /* rte_eal_init it makes a call to getopt so we need to reset the 567 710 * global optind variable of getopt otherwise this fails */ 568 711 save_getopts(&save_opts); 569 712 optind = 1; 570 713 if ((ret = rte_eal_init(argc, argv)) < 0) { 571 snprintf(err, errlen, 572 573 714 snprintf(err, errlen, 715 "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret)); 716 return -1; 574 717 } 575 718 restore_getopts(&save_opts); 719 // These are still running but will never do anything with DPDK v1.7 we 720 // should remove this XXX in the future 721 for(i = 0; i < RTE_MAX_LCORE; ++i) { 722 if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) { 723 cfg->lcore_role[i] = ROLE_OFF; 724 cfg->lcore_count--; 725 } 726 } 727 // Only the master should be running 728 assert(cfg->lcore_count == 1); 729 730 dpdk_move_master_lcore(my_cpu-1); 576 731 577 732 #if DEBUG … … 584 739 */ 585 740 if ((ret = rte_pmd_init_all()) < 0) { 586 snprintf(err, errlen, 587 588 741 snprintf(err, errlen, 742 "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret)); 743 return -1; 589 744 } 590 745 #endif … … 594 749 if ((ret = blacklist_devices(format_data, &use_addr)) < 0) { 595 750 snprintf(err, 
errlen, "Intel DPDK - Whitelisting PCI device failed," 596 751 " are you sure the address is correct?: %s", strerror(-ret)); 597 752 return -1; 598 753 } … … 602 757 /* This loads DPDK drivers against all ports that are not blacklisted */ 603 758 if ((ret = rte_eal_pci_probe()) < 0) { 604 snprintf(err, errlen, 605 606 759 snprintf(err, errlen, 760 "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret)); 761 return -1; 607 762 } 608 763 #endif … … 611 766 612 767 if (format_data->nb_ports != 1) { 613 snprintf(err, errlen, 614 "Intel DPDK - rte_eth_dev_count returned %d but it should be 1", 615 format_data->nb_ports); 616 return -1; 617 } 768 snprintf(err, errlen, 769 "Intel DPDK - rte_eth_dev_count returned %d but it should be 1", 770 format_data->nb_ports); 771 return -1; 772 } 773 774 struct rte_eth_dev_info dev_info; 775 rte_eth_dev_info_get(0, &dev_info); 776 fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 777 (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues); 618 778 619 779 return 0; … … 623 783 char err[500]; 624 784 err[0] = 0; 625 785 626 786 libtrace->format_data = (struct dpdk_format_data_t *) 627 787 malloc(sizeof(struct dpdk_format_data_t)); 628 788 FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */ 629 789 FORMAT(libtrace)->queue_id = 0; /* Single queue per port */ … … 632 792 FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF; 633 793 FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF; 794 FORMAT(libtrace)->nic_numa_node = -1; 634 795 FORMAT(libtrace)->promisc = -1; 635 796 FORMAT(libtrace)->pktmbuf_pool = NULL; … … 639 800 FORMAT(libtrace)->paused = DPDK_NEVER_STARTED; 640 801 FORMAT(libtrace)->mempool_name[0] = 0; 641 #if HAS_HW_TIMESTAMPS_82580 642 FORMAT(libtrace)->ts_first_sys = 0; 643 FORMAT(libtrace)->ts_last_sys = 0; 644 FORMAT(libtrace)->wrap_count = 0; 645 #endif 802 memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE); 803 FORMAT(libtrace)->burst_size = 0; 804 FORMAT(libtrace)->burst_offset = 0; 646 805 647 806 if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) { 648 649 650 651 807 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 808 free(libtrace->format_data); 809 libtrace->format_data = NULL; 810 return -1; 652 811 } 653 812 return 0; … … 658 817 char err[500]; 659 818 err[0] = 0; 660 819 661 820 libtrace->format_data = (struct dpdk_format_data_t *) 662 821 malloc(sizeof(struct dpdk_format_data_t)); 663 822 FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */ 664 823 FORMAT(libtrace)->queue_id = 0; /* Single queue per port */ … … 667 826 FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF; 668 827 FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF; 828 FORMAT(libtrace)->nic_numa_node = -1; 669 829 FORMAT(libtrace)->promisc = -1; 670 830 FORMAT(libtrace)->pktmbuf_pool = NULL; … … 674 834 FORMAT(libtrace)->paused = DPDK_NEVER_STARTED; 675 835 FORMAT(libtrace)->mempool_name[0] = 0; 676 #if HAS_HW_TIMESTAMPS_82580 677 FORMAT(libtrace)->ts_first_sys = 0; 678 FORMAT(libtrace)->ts_last_sys = 0; 679 FORMAT(libtrace)->wrap_count = 0; 680 #endif 836 memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE); 837 FORMAT(libtrace)->burst_size = 0; 838 FORMAT(libtrace)->burst_offset = 0; 681 839 682 840 if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) { 683 684 685 686 841 
trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 842 free(libtrace->format_data); 843 libtrace->format_data = NULL; 844 return -1; 687 845 } 688 846 return 0; 689 847 }; 690 848 849 static int dpdk_pconfig_input (libtrace_t *libtrace, 850 trace_parallel_option_t option, 851 void *data) { 852 switch (option) { 853 case TRACE_OPTION_SET_HASHER: 854 switch (*((enum hasher_types *) data)) 855 { 856 case HASHER_BALANCE: 857 case HASHER_UNIDIRECTIONAL: 858 toeplitz_create_unikey(FORMAT(libtrace)->rss_key); 859 return 0; 860 case HASHER_BIDIRECTIONAL: 861 toeplitz_create_bikey(FORMAT(libtrace)->rss_key); 862 return 0; 863 case HASHER_HARDWARE: 864 case HASHER_CUSTOM: 865 // We don't support these 866 return -1; 867 } 868 break; 869 } 870 return -1; 871 } 691 872 /** 692 873 * Note here snaplen excludes the MAC checksum. Packets over 693 874 * the requested snaplen will be dropped. (Excluding MAC checksum) 694 * 875 * 695 876 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum) 696 877 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM 697 878 * is set the maximum size of the returned packet would be 1518 otherwise 698 879 * 1514 would be the largest size possibly returned. 699 * 880 * 700 881 */ 701 882 static int dpdk_config_input (libtrace_t *libtrace, 702 703 883 trace_option_t option, 884 void *data) { 704 885 switch (option) { 705 706 /* Only support changing snaplen before a call to start is 707 708 709 710 711 712 886 case TRACE_OPTION_SNAPLEN: 887 /* Only support changing snaplen before a call to start is 888 * made */ 889 if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED) 890 FORMAT(libtrace)->snaplen=*(int*)data; 891 else 892 return -1; 893 return 0; 713 894 case TRACE_OPTION_PROMISC: 714 895 FORMAT(libtrace)->promisc=*(int*)data; 715 716 717 718 719 720 721 722 723 724 725 726 896 return 0; 897 case TRACE_OPTION_FILTER: 898 /* TODO filtering */ 899 break; 900 case TRACE_OPTION_META_FREQ: 901 break; 902 case TRACE_OPTION_EVENT_REALTIME: 903 break; 904 /* Avoid default: so that future options will cause a warning 905 * here to remind us to implement it, or flag it as 906 * unimplementable 907 */ 727 908 } 728 909 … … 734 915 /* Can set jumbo frames/ or limit the size of a frame by setting both 735 916 * max_rx_pkt_len and jumbo_frame. 
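 * (both are fields of port_conf.rxmode and are filled in later by dpdk_start_port() / dpdk_start_port_queues())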
This can be limited to less than 736 * 917 * 737 918 */ 738 919 static struct rte_eth_conf port_conf = { 739 920 .rxmode = { 921 .mq_mode = ETH_RSS, 740 922 .split_hdr_size = 0, 741 923 .header_split = 0, /**< Header Split disabled */ … … 743 925 .hw_vlan_filter = 0, /**< VLAN filtering disabled */ 744 926 .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ 745 927 .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */ 746 928 #if GET_MAC_CRC_CHECKSUM 747 929 /* So it appears that if hw_strip_crc is turned off the driver will still … … 756 938 * always cut off the checksum in the future 757 939 */ 758 940 .hw_strip_crc = 1, /**< CRC stripped by hardware */ 759 941 #endif 760 942 }, … … 762 944 .mq_mode = ETH_DCB_NONE, 763 945 }, 946 .rx_adv_conf = { 947 .rss_conf = { 948 // .rss_key = &rss_key, // We set this per format 949 .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP, 950 }, 951 }, 952 .intr_conf = { 953 .lsc = 1 954 } 764 955 }; 765 956 … … 776 967 static const struct rte_eth_txconf tx_conf = { 777 968 .tx_thresh = { 778 779 780 781 782 783 969 /** 970 * TX_PTHRESH prefetch 971 * Set on the NIC, if the number of unprocessed descriptors to queued on 972 * the card fall below this try grab at least hthresh more unprocessed 973 * descriptors. 974 */ 784 975 .pthresh = 36, 785 976 786 787 788 977 /* TX_HTHRESH host 978 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors. 979 */ 789 980 .hthresh = 0, 790 791 792 793 794 795 796 797 798 799 981 982 /* TX_WTHRESH writeback 983 * Set on the NIC, the number of sent descriptors before writing back 984 * status to confirm the transmission. This is done more efficiently as 985 * a bulk DMA-transfer rather than writing one at a time. 986 * Similar to tx_free_thresh however this is applied to the NIC, where 987 * as tx_free_thresh is when DPDK will check these. This is extended 988 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all 989 * descriptors rather only every n'th item, reducing DMA memory bandwidth. 990 */ 800 991 .wthresh = 4, 801 992 }, … … 808 999 809 1000 /* This is the Report Status threshold, used by 10Gbit cards, 810 * This signals the card to only write back status (such as 1001 * This signals the card to only write back status (such as 811 1002 * transmission successful) after this minimum number of transmit 812 1003 * descriptors are seen. The default is 32 (if set to 0) however if set … … 817 1008 }; 818 1009 1010 /** 1011 * A callback for a link state change (LSC). 1012 * 1013 * Packets may be received before this notification. In fact the DPDK IGXBE 1014 * driver likes to put a delay upto 5sec before sending this. 1015 * 1016 * We use this to ensure the link speed is correct for our timestamp 1017 * calculations. Because packets might be received before the link up we still 1018 * update this when the packet is received. 
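 * (link_speed is in Mbps as reported by rte_eth_link_get_nowait(); the wire-time arithmetic
 * in calculate_wire_time() below relies on that unit)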
1019 * 1020 * @param port The DPDK port 1021 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC) 1022 * @param cb_arg The dpdk_format_data_t structure associated with the format 1023 */ 1024 static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event, 1025 void *cb_arg) { 1026 struct dpdk_format_data_t * format_data = cb_arg; 1027 struct rte_eth_link link_info; 1028 assert(event == RTE_ETH_EVENT_INTR_LSC); 1029 assert(port == format_data->port); 1030 1031 rte_eth_link_get_nowait(port, &link_info); 1032 1033 if (link_info.link_status) 1034 format_data->link_speed = link_info.link_speed; 1035 else 1036 format_data->link_speed = 0; 1037 1038 #if DEBUG 1039 fprintf(stderr, "LSC - link status is %s %s speed=%d\n", 1040 link_info.link_status ? "up" : "down", 1041 (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ? 1042 "full-duplex" : "half-duplex", 1043 (int) link_info.link_speed); 1044 #endif 1045 1046 /* Turns out DPDK drivers might not come back up if the link speed 1047 * changes. So we reset the autoneg procedure. This is very unsafe 1048 * we have have threads reading packets and we stop the port. */ 1049 #if 0 1050 if (!link_info.link_status) { 1051 int ret; 1052 rte_eth_dev_stop(port); 1053 ret = rte_eth_dev_start(port); 1054 if (ret < 0) { 1055 fprintf(stderr, "Resetting the DPDK port failed : %s\n", 1056 strerror(-ret)); 1057 } 1058 } 1059 #endif 1060 } 1061 819 1062 /* Attach memory to the port and start the port or restart the port. 820 1063 */ … … 822 1065 int ret; /* Check return values for errors */ 823 1066 struct rte_eth_link link_info; /* Wait for link */ 824 1067 unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id()); 1068 825 1069 /* Already started */ 826 1070 if (format_data->paused == DPDK_RUNNING) 827 828 829 /* First time started we need to alloc our memory, doing this here 1071 return 0; 1072 1073 /* First time started we need to alloc our memory, doing this here 830 1074 * rather than in environment setup because we don't have snaplen then */ 831 1075 if (format_data->paused == DPDK_NEVER_STARTED) { 832 833 834 835 836 837 838 839 840 841 842 1076 if (format_data->snaplen == 0) { 1077 format_data->snaplen = RX_MBUF_SIZE; 1078 port_conf.rxmode.jumbo_frame = 0; 1079 port_conf.rxmode.max_rx_pkt_len = 0; 1080 } else { 1081 /* Use jumbo frames */ 1082 port_conf.rxmode.jumbo_frame = 1; 1083 port_conf.rxmode.max_rx_pkt_len = format_data->snaplen; 1084 } 1085 1086 /* This is additional overhead so make sure we allow space for this */ 843 1087 #if GET_MAC_CRC_CHECKSUM 844 1088 format_data->snaplen += ETHER_CRC_LEN; 845 1089 #endif 846 1090 #if HAS_HW_TIMESTAMPS_82580 847 848 #endif 849 850 851 * from - TODO figure out if there is is a free function (I cannot see one) 852 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to 853 * allocate however that extra 1 packet is not used. 854 855 * TX requires nb_tx_buffers + 1 in the case the queue is full 856 857 858 1091 format_data->snaplen += sizeof(struct hw_timestamp_82580); 1092 #endif 1093 1094 /* Create the mbuf pool, which is the place our packets are allocated 1095 * from - TODO figure out if there is is a free function (I cannot see one) 1096 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to 1097 * allocate however that extra 1 packet is not used. 
1098 * (I assume <= vs < error some where in DPDK code) 1099 * TX requires nb_tx_buffers + 1 in the case the queue is full 1100 * so that will fill the new buffer and wait until slots in the 1101 * ring become available. 1102 */ 859 1103 #if DEBUG 860 1104 fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name); 861 1105 #endif 862 863 864 format_data->nb_rx_buf + format_data->nb_tx_buf + 1,865 format_data->snaplen + sizeof(struct rte_mbuf) 866 867 868 869 rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);870 871 872 snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"873 "pool failed: %s", strerror(rte_errno));874 875 876 } 877 1106 format_data->pktmbuf_pool = 1107 rte_mempool_create(format_data->mempool_name, 1108 (format_data->nb_rx_buf + format_data->nb_tx_buf + 1), 1109 format_data->snaplen + sizeof(struct rte_mbuf) 1110 + RTE_PKTMBUF_HEADROOM, 1111 128, sizeof(struct rte_pktmbuf_pool_private), 1112 rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL, 1113 cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/); 1114 1115 if (format_data->pktmbuf_pool == NULL) { 1116 snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf" 1117 "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node); 1118 return -1; 1119 } 1120 } 1121 878 1122 /* ----------- Now do the setup for the port mapping ------------ */ 879 /* Order of calls must be 1123 /* Order of calls must be 880 1124 * rte_eth_dev_configure() 881 1125 * rte_eth_tx_queue_setup() … … 884 1128 * other rte_eth calls 885 1129 */ 886 1130 1131 1132 port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key; 1133 887 1134 /* This must be called first before another *eth* function 888 1135 * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */ 889 1136 ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf); 890 1137 if (ret < 0) { 891 892 893 894 1138 snprintf(err, errlen, "Intel DPDK - Cannot configure device port" 1139 " %"PRIu8" : %s", format_data->port, 1140 strerror(-ret)); 1141 return -1; 895 1142 } 896 1143 /* Initialise the TX queue a minimum value if using this port for … … 898 1145 */ 899 1146 ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id, 900 format_data->nb_tx_buf, rte_socket_id(), &tx_conf);1147 format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf); 901 1148 if (ret < 0) { 902 903 904 905 1149 snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port" 1150 " %"PRIu8" : %s", format_data->port, 1151 strerror(-ret)); 1152 return -1; 906 1153 } 907 1154 /* Initialise the RX queue with some packets from memory */ 908 1155 ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id, 909 format_data->nb_rx_buf, rte_socket_id(),910 &rx_conf, format_data->pktmbuf_pool);1156 format_data->nb_rx_buf, cpu_numa_node, 1157 &rx_conf, format_data->pktmbuf_pool); 911 1158 if (ret < 0) { 912 913 914 915 916 } 917 1159 snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port" 1160 " %"PRIu8" : %s", format_data->port, 1161 strerror(-ret)); 1162 return -1; 1163 } 1164 918 1165 /* Start device */ 919 1166 ret = rte_eth_dev_start(format_data->port); 920 1167 if (ret < 0) { 921 922 923 1168 snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s", 1169 strerror(-ret)); 1170 return -1; 924 1171 } 925 1172 926 1173 /* Default promiscuous to on */ 927 1174 if (format_data->promisc == -1) 928 929 1175 format_data->promisc = 1; 1176 930 1177 if (format_data->promisc == 1) 931 1178 rte_eth_promiscuous_enable(format_data->port); 932 1179 else 
933 rte_eth_promiscuous_disable(format_data->port); 934 935 /* Wait for the link to come up */ 936 rte_eth_link_get(format_data->port, &link_info); 1180 rte_eth_promiscuous_disable(format_data->port); 1181 1182 /* Register a callback for link state changes */ 1183 ret = rte_eth_dev_callback_register(format_data->port, 1184 RTE_ETH_EVENT_INTR_LSC, 1185 dpdk_lsc_callback, 1186 format_data); 1187 /* If this fails it is not a show stopper */ 1188 #if DEBUG 1189 fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n", 1190 ret, strerror(-ret)); 1191 #endif 1192 1193 /* Get the current link status */ 1194 rte_eth_link_get_nowait(format_data->port, &link_info); 1195 format_data->link_speed = link_info.link_speed; 937 1196 #if DEBUG 938 1197 fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status, 939 (int) link_info.link_duplex, (int) link_info.link_speed); 940 #endif 941 1198 (int) link_info.link_duplex, (int) link_info.link_speed); 1199 #endif 942 1200 /* We have now successfully started/unpaused */ 943 1201 format_data->paused = DPDK_RUNNING; 944 1202 1203 return 0; 1204 } 1205 1206 /* Attach memory to the port and start (or restart) the port/s. 1207 */ 1208 static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) { 1209 int ret, i; /* Check return values for errors */ 1210 struct rte_eth_link link_info; /* Wait for link */ 1211 1212 /* Already started */ 1213 if (format_data->paused == DPDK_RUNNING) 1214 return 0; 1215 1216 /* First time started we need to alloc our memory, doing this here 1217 * rather than in environment setup because we don't have snaplen then */ 1218 if (format_data->paused == DPDK_NEVER_STARTED) { 1219 if (format_data->snaplen == 0) { 1220 format_data->snaplen = RX_MBUF_SIZE; 1221 port_conf.rxmode.jumbo_frame = 0; 1222 port_conf.rxmode.max_rx_pkt_len = 0; 1223 } else { 1224 /* Use jumbo frames */ 1225 port_conf.rxmode.jumbo_frame = 1; 1226 port_conf.rxmode.max_rx_pkt_len = format_data->snaplen; 1227 } 1228 1229 /* This is additional overhead so make sure we allow space for this */ 1230 #if GET_MAC_CRC_CHECKSUM 1231 format_data->snaplen += ETHER_CRC_LEN; 1232 #endif 1233 #if HAS_HW_TIMESTAMPS_82580 1234 format_data->snaplen += sizeof(struct hw_timestamp_82580); 1235 #endif 1236 1237 /* Create the mbuf pool, which is the place our packets are allocated 1238 * from - TODO figure out if there is a free function (I cannot see one) 1239 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to 1240 * allocate however that extra 1 packet is not used. 1241 * (I assume <= vs < error some where in DPDK code) 1242 * TX requires nb_tx_buffers + 1 in the case the queue is full 1243 * so that will fill the new buffer and wait until slots in the 1244 * ring become available. 
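 * (for this multi-queue path the pool below is sized (nb_rx_buf * rx_queues + nb_tx_buf + 1) * 2
 * elements, versus nb_rx_buf + nb_tx_buf + 1 in the single-queue dpdk_start_port() above)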
1245 */ 1246 #if DEBUG 1247 fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name); 1248 #endif 1249 format_data->pktmbuf_pool = 1250 rte_mempool_create(format_data->mempool_name, 1251 (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2, 1252 format_data->snaplen + sizeof(struct rte_mbuf) 1253 + RTE_PKTMBUF_HEADROOM, 1254 128, sizeof(struct rte_pktmbuf_pool_private), 1255 rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL, 1256 format_data->nic_numa_node, 0); 1257 1258 if (format_data->pktmbuf_pool == NULL) { 1259 snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf " 1260 "pool failed: %s", strerror(rte_errno)); 1261 return -1; 1262 } 1263 } 1264 1265 /* ----------- Now do the setup for the port mapping ------------ */ 1266 /* Order of calls must be 1267 * rte_eth_dev_configure() 1268 * rte_eth_tx_queue_setup() 1269 * rte_eth_rx_queue_setup() 1270 * rte_eth_dev_start() 1271 * other rte_eth calls 1272 */ 1273 1274 /* This must be called first before another *eth* function 1275 * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */ 1276 ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf); 1277 if (ret < 0) { 1278 snprintf(err, errlen, "Intel DPDK - Cannot configure device port" 1279 " %"PRIu8" : %s", format_data->port, 1280 strerror(-ret)); 1281 return -1; 1282 } 1283 #if DEBUG 1284 fprintf(stderr, "Doing dev configure\n"); 1285 #endif 1286 /* Initialise the TX queue a minimum value if using this port for 1287 * receiving. Otherwise a larger size if writing packets. 1288 */ 1289 ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id, 1290 format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf); 1291 if (ret < 0) { 1292 snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port" 1293 " %"PRIu8" : %s", format_data->port, 1294 strerror(-ret)); 1295 return -1; 1296 } 1297 1298 for (i=0; i < rx_queues; i++) { 1299 #if DEBUG 1300 fprintf(stderr, "Doing queue configure\n"); 1301 #endif 1302 1303 /* Initialise the RX queue with some packets from memory */ 1304 ret = rte_eth_rx_queue_setup(format_data->port, i, 1305 format_data->nb_rx_buf, format_data->nic_numa_node, 1306 &rx_conf, format_data->pktmbuf_pool); 1307 /* Init per_thread data structures */ 1308 format_data->per_lcore[i].port = format_data->port; 1309 format_data->per_lcore[i].queue_id = i; 1310 1311 if (ret < 0) { 1312 snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port" 1313 " %"PRIu8" : %s", format_data->port, 1314 strerror(-ret)); 1315 return -1; 1316 } 1317 } 1318 1319 #if DEBUG 1320 fprintf(stderr, "Doing start device\n"); 1321 #endif 1322 /* Start device */ 1323 ret = rte_eth_dev_start(format_data->port); 1324 #if DEBUG 1325 fprintf(stderr, "Done start device\n"); 1326 #endif 1327 if (ret < 0) { 1328 snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s", 1329 strerror(-ret)); 1330 return -1; 1331 } 1332 1333 1334 /* Default promiscuous to on */ 1335 if (format_data->promisc == -1) 1336 format_data->promisc = 1; 1337 1338 if (format_data->promisc == 1) 1339 rte_eth_promiscuous_enable(format_data->port); 1340 else 1341 rte_eth_promiscuous_disable(format_data->port); 1342 1343 1344 /* We have now successfully started/unpased */ 1345 format_data->paused = DPDK_RUNNING; 1346 1347 // Can use remote launch for all 1348 /*RTE_LCORE_FOREACH_SLAVE(i) { 1349 rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i); 1350 }*/ 1351 1352 /* Register a callback for link state changes */ 1353 ret = 
rte_eth_dev_callback_register(format_data->port, 1354 RTE_ETH_EVENT_INTR_LSC, 1355 dpdk_lsc_callback, 1356 format_data); 1357 /* If this fails it is not a show stopper */ 1358 #if DEBUG 1359 fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n", 1360 ret, strerror(-ret)); 1361 #endif 1362 1363 /* Get the current link status */ 1364 rte_eth_link_get_nowait(format_data->port, &link_info); 1365 format_data->link_speed = link_info.link_speed; 1366 #if DEBUG 1367 fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status, 1368 (int) link_info.link_duplex, (int) link_info.link_speed); 1369 struct rte_eth_rss_reta reta_conf = {0}; 1370 reta_conf.mask_lo = ~reta_conf.mask_lo; 1371 reta_conf.mask_hi = ~reta_conf.mask_hi; 1372 int qew = rte_eth_dev_rss_reta_query(format_data->port, &reta_conf); 1373 fprintf(stderr, "err=%d", qew); 1374 for (i = 0; i < ETH_RSS_RETA_NUM_ENTRIES; i++) { 1375 fprintf(stderr, "[%d] = %d\n", i, (int)reta_conf.reta[i]); 1376 } 1377 1378 #endif 1379 945 1380 return 0; 946 1381 } … … 951 1386 952 1387 if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) { 953 954 955 956 1388 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 1389 free(libtrace->format_data); 1390 libtrace->format_data = NULL; 1391 return -1; 957 1392 } 958 1393 return 0; 1394 } 1395 1396 static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) { 1397 struct rte_eth_dev_info dev_info; 1398 rte_eth_dev_info_get(port_id, &dev_info); 1399 return dev_info.max_rx_queues; 1400 } 1401 1402 static inline size_t dpdk_processor_count () { 1403 long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN); 1404 if (nb_cpu <= 0) 1405 return 1; 1406 else 1407 return (size_t) nb_cpu; 1408 } 1409 1410 static int dpdk_pstart_input (libtrace_t *libtrace) { 1411 char err[500]; 1412 int i=0, phys_cores=0; 1413 int tot = libtrace->perpkt_thread_count; 1414 err[0] = 0; 1415 1416 if (rte_lcore_id() != rte_get_master_lcore()) 1417 fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n"); 1418 1419 // If the master is not on the last thread we move it there 1420 if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) { 1421 // Consider error handling here 1422 dpdk_move_master_lcore(RTE_MAX_LCORE - 1); 1423 } 1424 1425 // Don't exceed the number of cores in the system/detected by dpdk 1426 // We don't have to force this but performance wont be good if we don't 1427 for (i = 0; i < RTE_MAX_LCORE; ++i) { 1428 if (lcore_config[i].detected) { 1429 if (rte_lcore_is_enabled(i)) 1430 fprintf(stderr, "Found core %d already in use!\n", i); 1431 else 1432 phys_cores++; 1433 } 1434 } 1435 1436 tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port)); 1437 tot = MIN(tot, phys_cores); 1438 1439 fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores); 1440 1441 if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) { 1442 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 1443 free(libtrace->format_data); 1444 libtrace->format_data = NULL; 1445 return -1; 1446 } 1447 1448 // Make sure we only start the number that we should 1449 libtrace->perpkt_thread_count = tot; 1450 return 0; 1451 } 1452 1453 1454 /** 1455 * Register a thread with the DPDK system, 1456 * When we start DPDK in parallel libtrace we move the 'main thread' to the 1457 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK 1458 * gives it. 
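 * (in code this is RTE_MAX_LCORE - 1; see dpdk_move_master_lcore() and dpdk_pstart_input() above)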
1459 * 1460 * We then allow a mapper thread to be started on every real core as DPDK would, 1461 * we also bind these to the corresponding CPU cores. 1462 * 1463 * @param libtrace A pointer to the trace 1464 * @param reading True if the thread will be used to read packets, i.e. will 1465 * call pread_packet(), false if thread used to process packet 1466 * in any other manner including statistics functions. 1467 */ 1468 static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading) 1469 { 1470 struct rte_config *cfg = rte_eal_get_configuration(); 1471 int i; 1472 int new_id = -1; 1473 1474 // If 'reading packets' fill in cores from 0 up and bind affinity 1475 // otherwise start from the MAX core (which is also the master) and work backwards 1476 // in this case physical cores on the system will not exist so we don't bind 1477 // these to any particular physical core 1478 pthread_mutex_lock(&libtrace->libtrace_lock); 1479 if (reading) { 1480 #if HAVE_LIBNUMA 1481 for (i = 0; i < RTE_MAX_LCORE; ++i) { 1482 if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) { 1483 new_id = i; 1484 if (!lcore_config[i].detected) 1485 new_id = -1; 1486 break; 1487 } 1488 } 1489 #endif 1490 /* Retry without the the numa restriction */ 1491 if (new_id == -1) { 1492 for (i = 0; i < RTE_MAX_LCORE; ++i) { 1493 if (!rte_lcore_is_enabled(i)) { 1494 new_id = i; 1495 if (!lcore_config[i].detected) 1496 fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n"); 1497 break; 1498 } 1499 } 1500 } 1501 } else { 1502 for (i = RTE_MAX_LCORE-1; i >= 0; --i) { 1503 if (!rte_lcore_is_enabled(i)) { 1504 new_id = i; 1505 break; 1506 } 1507 } 1508 } 1509 1510 if (new_id == -1) { 1511 assert(cfg->lcore_count == RTE_MAX_LCORE); 1512 // TODO proper libtrace style error here!! 1513 fprintf(stderr, "Too many threads for DPDK!!\n"); 1514 pthread_mutex_unlock(&libtrace->libtrace_lock); 1515 return -1; 1516 } 1517 1518 // Enable the core in global DPDK structs 1519 cfg->lcore_role[new_id] = ROLE_RTE; 1520 cfg->lcore_count++; 1521 // Set TLS to reflect our new number 1522 assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0 1523 fprintf(stderr, "original id%d", rte_lcore_id()); 1524 RTE_PER_LCORE(_lcore_id) = new_id; 1525 char name[99]; 1526 pthread_getname_np(pthread_self(), 1527 name, sizeof(name)); 1528 1529 fprintf(stderr, "%s new id%d\n", name, rte_lcore_id()); 1530 1531 if (reading) { 1532 // Set affinity bind to corresponding core 1533 cpu_set_t cpuset; 1534 CPU_ZERO(&cpuset); 1535 CPU_SET(rte_lcore_id(), &cpuset); 1536 i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); 1537 if (i != 0) { 1538 fprintf(stderr, "Warning pthread_setaffinity_np failed\n"); 1539 pthread_mutex_unlock(&libtrace->libtrace_lock); 1540 return -1; 1541 } 1542 } 1543 1544 // Map our TLS to the thread data 1545 if (reading) { 1546 if(t->type == THREAD_PERPKT) { 1547 t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num]; 1548 } else { 1549 t->format_data = &FORMAT(libtrace)->per_lcore[0]; 1550 } 1551 } 1552 pthread_mutex_unlock(&libtrace->libtrace_lock); 1553 return 0; 1554 } 1555 1556 1557 /** 1558 * Unregister a thread with the DPDK system. 1559 * 1560 * Only previously registered threads should be calling this just before 1561 * they are destroyed. 
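 *
 * A sketch of the expected pairing (illustrative only, not a fixed contract):
 *   dpdk_pregister_thread(libtrace, t, true);   // as the per-packet thread starts
 *   ... thread reads and processes packets ...
 *   dpdk_punregister_thread(libtrace, t);       // just before the thread exits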
1562 */ 1563 static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED) 1564 { 1565 struct rte_config *cfg = rte_eal_get_configuration(); 1566 1567 assert(rte_lcore_id() < RTE_MAX_LCORE); 1568 pthread_mutex_lock(&libtrace->libtrace_lock); 1569 // Skip if master!! 1570 if (rte_lcore_id() == rte_get_master_lcore()) { 1571 fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n"); 1572 pthread_mutex_unlock(&libtrace->libtrace_lock); 1573 return; 1574 } 1575 1576 // Disable this core in global DPDK structs 1577 cfg->lcore_role[rte_lcore_id()] = ROLE_OFF; 1578 cfg->lcore_count--; 1579 RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again 1580 assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!! 1581 pthread_mutex_unlock(&libtrace->libtrace_lock); 1582 return; 959 1583 } 960 1584 … … 963 1587 char err[500]; 964 1588 err[0] = 0; 965 1589 966 1590 if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) { 967 968 969 970 1591 trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 1592 free(libtrace->format_data); 1593 libtrace->format_data = NULL; 1594 return -1; 971 1595 } 972 1596 return 0; 973 1597 } 974 1598 975 static int dpdk_pause_input(libtrace_t * libtrace) {1599 static int dpdk_pause_input(libtrace_t * libtrace) { 976 1600 /* This stops the device, but can be restarted using rte_eth_dev_start() */ 977 1601 if (FORMAT(libtrace)->paused == DPDK_RUNNING) { 978 #if DEBUG 979 fprintf(stderr, "Pausing port\n"); 980 #endif 981 rte_eth_dev_stop(FORMAT(libtrace)->port); 982 FORMAT(libtrace)->paused = DPDK_PAUSED; 983 /* If we pause it the driver will be reset and likely our counter */ 1602 #if DEBUG 1603 fprintf(stderr, "Pausing DPDK port\n"); 1604 #endif 1605 rte_eth_dev_stop(FORMAT(libtrace)->port); 1606 FORMAT(libtrace)->paused = DPDK_PAUSED; 1607 /* Empty the queue of packets */ 1608 for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) { 1609 rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]); 1610 } 1611 FORMAT(libtrace)->burst_offset = 0; 1612 FORMAT(libtrace)->burst_size = 0; 1613 /* If we pause it the driver will be reset and likely our counter */ 1614 1615 FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0; 984 1616 #if HAS_HW_TIMESTAMPS_82580 985 FORMAT(libtrace)->ts_first_sys = 0; 986 FORMAT(libtrace)->ts_last_sys = 0; 1617 FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0; 987 1618 #endif 988 1619 } … … 990 1621 } 991 1622 992 static int dpdk_write_packet(libtrace_out_t *trace, 1623 static int dpdk_write_packet(libtrace_out_t *trace, 993 1624 libtrace_packet_t *packet){ 994 1625 struct rte_mbuf* m_buff[1]; 995 1626 996 1627 int wirelen = trace_get_wire_length(packet); 997 1628 int caplen = trace_get_capture_length(packet); 998 1629 999 1630 /* Check for a checksum and remove it */ 1000 1631 if (trace_get_link_type(packet) == TRACE_TYPE_ETH && 1001 1002 1632 wirelen == caplen) 1633 caplen -= ETHER_CRC_LEN; 1003 1634 1004 1635 m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool); 1005 1636 if (m_buff[0] == NULL) { 1006 1007 1637 trace_set_err_out(trace, errno, "Cannot get an empty packet buffer"); 1638 return -1; 1008 1639 } else { 1009 1010 1011 1012 1013 1640 int ret; 1641 memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen); 1642 do { 1643 ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1); 1644 } while (ret != 1); 1014 1645 } 1015 1646 … … 1020 1651 /* 
Free our memory structures */ 1021 1652 if (libtrace->format_data != NULL) { 1022 /* Close the device completely, device cannot be restarted */ 1023 if (FORMAT(libtrace)->port != 0xFF) 1024 rte_eth_dev_close(FORMAT(libtrace)->port); 1025 /* filter here if we used it */ 1653 /* Close the device completely, device cannot be restarted */ 1654 if (FORMAT(libtrace)->port != 0xFF) 1655 rte_eth_dev_callback_unregister(FORMAT(libtrace)->port, 1656 RTE_ETH_EVENT_INTR_LSC, 1657 dpdk_lsc_callback, 1658 FORMAT(libtrace)); 1659 rte_eth_dev_close(FORMAT(libtrace)->port); 1660 /* filter here if we used it */ 1026 1661 free(libtrace->format_data); 1027 1662 } … … 1037 1672 /* Free our memory structures */ 1038 1673 if (libtrace->format_data != NULL) { 1039 1040 1041 1042 1674 /* Close the device completely, device cannot be restarted */ 1675 if (FORMAT(libtrace)->port != 0xFF) 1676 rte_eth_dev_close(FORMAT(libtrace)->port); 1677 /* filter here if we used it */ 1043 1678 free(libtrace->format_data); 1044 1679 } … … 1050 1685 } 1051 1686 1052 /** 1053 * Get the start of additional header that we added to a packet.1687 /** 1688 * Get the start of the additional header that we added to a packet. 1054 1689 */ 1055 1690 static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) { 1056 uint8_t *hdrsize;1057 1691 assert(packet); 1058 1692 assert(packet->buffer); 1059 hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer); 1060 /* The byte before the original packet data denotes the size in bytes 1061 * of our additional header that we added sits before the 'size byte' */ 1062 hdrsize--; 1063 return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize); 1693 /* Our header sits straight after the mbuf header */ 1694 return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1); 1064 1695 } 1065 1696 … … 1072 1703 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1073 1704 if (size > hdr->cap_len) { 1074 1705 /* Cannot make a packet bigger */ 1075 1706 return trace_get_capture_length(packet); 1076 1707 } … … 1086 1717 int org_cap_size; /* The original capture size */ 1087 1718 if (hdr->flags & INCLUDES_HW_TIMESTAMP) { 1088 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) - 1089 (MBUF_PKTDATA(packet->buffer) - (char *) hdr) - 1090 sizeof(struct hw_timestamp_82580); 1719 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) - 1720 sizeof(struct hw_timestamp_82580); 1091 1721 } else { 1092 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) - 1093 (MBUF_PKTDATA(packet->buffer) - (char *) hdr); 1722 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)); 1094 1723 } 1095 1724 if (hdr->flags & INCLUDES_CHECKSUM) { 1096 1725 return org_cap_size; 1097 1726 } else { 1098 1099 1727 /* DPDK packets are always TRACE_TYPE_ETH packets */ 1728 return org_cap_size + ETHER_CRC_LEN; 1100 1729 } 1101 1730 } … … 1103 1732 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1104 1733 if (hdr->flags & INCLUDES_HW_TIMESTAMP) 1105 1106 1734 return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM + 1735 sizeof(struct hw_timestamp_82580); 1107 1736 else 1108 1737 return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM; 1109 1738 } 1110 1739 … … 1114 1743 assert(packet); 1115 1744 if (packet->buffer != buffer && 1116 1117 1745 packet->buf_control == TRACE_CTRL_PACKET) { 1746 free(packet->buffer); 1118 1747 } 1119 1748 1120 1749 if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) { 1121 1750 packet->buf_control = TRACE_CTRL_PACKET; 1122 1751 } else 1123 1752 
packet->buf_control = TRACE_CTRL_EXTERNAL; 1124 1753 1125 1754 packet->buffer = buffer; … … 1132 1761 } 1133 1762 1763 1764 /** 1765 * Given a packet size and a link speed, computes the 1766 * time to transmit in nanoseconds. 1767 * 1768 * @param format_data The dpdk format data from which we get the link speed 1769 * and if unset updates it in a thread safe manner 1770 * @param pkt_size The size of the packet in bytes 1771 * @return The wire time in nanoseconds 1772 */ 1773 static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) { 1774 uint32_t wire_time; 1775 /* 20 extra bytes of interframe gap and preamble */ 1776 # if GET_MAC_CRC_CHECKSUM 1777 wire_time = ((pkt_size + 20) * 8000); 1778 # else 1779 wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000); 1780 # endif 1781 1782 /* Division is really slow and introduces a pipeline stall 1783 * The compiler will optimise this into magical multiplication and shifting 1784 * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html 1785 */ 1786 retry_calc_wiretime: 1787 switch (format_data->link_speed) { 1788 case ETH_LINK_SPEED_40G: 1789 wire_time /= ETH_LINK_SPEED_40G; 1790 break; 1791 case ETH_LINK_SPEED_20G: 1792 wire_time /= ETH_LINK_SPEED_20G; 1793 break; 1794 case ETH_LINK_SPEED_10G: 1795 wire_time /= ETH_LINK_SPEED_10G; 1796 break; 1797 case ETH_LINK_SPEED_1000: 1798 wire_time /= ETH_LINK_SPEED_1000; 1799 break; 1800 case 0: 1801 { 1802 /* Maybe the link was down originally, but now it should be up */ 1803 struct rte_eth_link link = {0}; 1804 rte_eth_link_get_nowait(format_data->port, &link); 1805 if (link.link_status && link.link_speed) { 1806 format_data->link_speed = link.link_speed; 1807 #ifdef DEBUG 1808 fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed); 1809 #endif 1810 goto retry_calc_wiretime; 1811 } 1812 /* We don't know the link speed, make sure numbers are counting up */ 1813 wire_time = 1; 1814 break; 1815 } 1816 default: 1817 wire_time /= format_data->link_speed; 1818 } 1819 return wire_time; 1820 } 1821 1822 1823 1134 1824 /* 1135 * Does any extra preperation to a captured packet. 
1136 * This includes adding our extra header to it with the timestamp 1137 */ 1138 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet, 1139 struct rte_mbuf* pkt){ 1140 uint8_t * hdr_size; 1141 struct dpdk_addt_hdr *hdr; 1825 * Does any extra preperation to all captured packets 1826 * This includes adding our extra header to it with the timestamp, 1827 * and any snapping 1828 * 1829 * @param format_data The DPDK format data 1830 * @param plc The DPDK per lcore format data 1831 * @param pkts An array of size nb_pkts of DPDK packets 1832 * @param nb_pkts The number of packets in pkts and optionally packets 1833 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared 1834 */ 1835 static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc, 1836 struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) { 1837 struct dpdk_addt_hdr *hdr; 1838 size_t i; 1839 uint64_t cur_sys_time_ns; 1142 1840 #if HAS_HW_TIMESTAMPS_82580 1143 struct hw_timestamp_82580 *hw_ts; 1144 struct timeval cur_sys_time; 1145 uint64_t cur_sys_time_ns; 1146 uint64_t estimated_wraps; 1147 1148 /* Using gettimeofday because it's most likely to be a vsyscall 1149 * We don't want to slow down anything with systemcalls we dont need 1150 * accauracy */ 1151 gettimeofday(&cur_sys_time, NULL); 1841 struct hw_timestamp_82580 *hw_ts; 1842 uint64_t estimated_wraps; 1152 1843 #else 1153 # if USE_CLOCK_GETTIME 1154 struct timespec cur_sys_time; 1155 1156 /* This looks terrible and I feel bad doing it. But it's OK 1157 * on new kernels, because this is a vsyscall */ 1158 clock_gettime(CLOCK_REALTIME, &cur_sys_time); 1159 # else 1160 struct timeval cur_sys_time; 1161 /* Should be a vsyscall */ 1162 gettimeofday(&cur_sys_time, NULL); 1163 # endif 1164 #endif 1165 1166 /* Record the size of our header */ 1167 hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t)); 1168 *hdr_size = sizeof(struct dpdk_addt_hdr); 1169 /* Now put our header in front of that size */ 1170 hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr)); 1171 memset(hdr, 0, sizeof(struct dpdk_addt_hdr)); 1172 1844 1845 #endif 1846 1847 #if USE_CLOCK_GETTIME 1848 struct timespec cur_sys_time = {0}; 1849 /* This looks terrible and I feel bad doing it. But it's OK 1850 * on new kernels, because this is a fast vsyscall */ 1851 clock_gettime(CLOCK_REALTIME, &cur_sys_time); 1852 cur_sys_time_ns = TS_TO_NS(cur_sys_time); 1853 #else 1854 struct timeval cur_sys_time = {0}; 1855 /* Also a fast vsyscall */ 1856 gettimeofday(&cur_sys_time, NULL); 1857 cur_sys_time_ns = TV_TO_NS(cur_sys_time); 1858 #endif 1859 1860 /* The system clock is not perfect so when running 1861 * at linerate we could timestamp a packet in the past. 1862 * To avoid this we munge the timestamp to appear 1ns 1863 * after the previous packet. We should eventually catch up 1864 * to system time since a 64byte packet on a 10G link takes 67ns. 1865 * 1866 * Note with parallel readers timestamping packets 1867 * with duplicate stamps or out of order is unavoidable without 1868 * hardware timestamping from the NIC. 1869 */ 1870 #if !HAS_HW_TIMESTAMPS_82580 1871 if (plc->ts_last_sys >= cur_sys_time_ns) { 1872 cur_sys_time_ns = plc->ts_last_sys + 1; 1873 } 1874 #endif 1875 1876 assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime?? 
1877 for (i = 0 ; i < nb_pkts ; ++i) {
1878
1879 /* We put our header straight after the dpdk header */
1880 hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1881 memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1882
1173 1883 #if GET_MAC_CRC_CHECKSUM
1174 /* Add back in the CRC sum */
1175 rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1176 rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1177 hdr->flags |= INCLUDES_CHECKSUM;
1885 /* Add back in the CRC sum */
1886 pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
1887 pkts[i]->pkt.data_len += ETHER_CRC_LEN;
1888 hdr->flags |= INCLUDES_CHECKSUM;
1178 1895 #endif
1896
1897 hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1179 1898
1180 1899 #if HAS_HW_TIMESTAMPS_82580
1181 /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1182 *
1183 *        +----------+---+   +--------------+
1184 *  82580 |    24    | 8 |   |      32      |
1185 *        +----------+---+   +--------------+
1186 *          reserved  \______ 40 bits _____/
1187 *
1188 * The 40 bit 82580 SYSTIM overflows every
1189 *   2^40 * 10^-9 / 60 = 18.3 minutes.
1190 *
1191 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1192 * Endian (for the full 64 bits) i.e. picture is mirrored
1193 */
1194
1195 /* The timestamp is sitting before our packet and is included in pkt_len */
1196 hdr->flags |= INCLUDES_HW_TIMESTAMP;
1197 hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1198
1199 /* Despite what the documentation says this is in Little
1200 * Endian byteorder. Mask the reserved section out.
1201 */
1202 hdr->timestamp = le64toh(hw_ts->timestamp) &
1203 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1204
1205 cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1206 if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1207 FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1208 FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1209 }
1210
1211 /* This will have serious problems if packets aren't read quickly
1212 * that is within a couple of seconds because our clock cycles every
1213 * 18 seconds */
1214 estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1215 / (1ull<<TS_NBITS_82580);
1216
1217 /* Estimated_wraps gives the number of times the counter should have
1218 * wrapped (however depending on value last time it could have wrapped
1219 * twice more (if hw clock is close to its max value) or once less (allowing
1220 * for a bit of variance between hw and sys clock). But if the clock
1221 * shouldn't have wrapped once then don't allow it to go backwards in time */
1222 if (unlikely(estimated_wraps >= 2)) {
1223 /* 2 or more wrap arounds add all but the very last wrap */
1224 FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1225 }
1226
1227 /* Set the timestamp to the lowest possible value we're considering */
1228 hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1229 FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1230
1231 /* In most runs only the first if() will need evaluating - i.e our
1232 * estimate is correct.
*/ 1233 if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns, 1234 hdr->timestamp, MAXSKEW_82580))) { 1235 /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */ 1236 FORMAT(libtrace)->wrap_count++; 1237 hdr->timestamp += (1ull<<TS_NBITS_82580); 1238 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1239 hdr->timestamp, MAXSKEW_82580)) { 1240 /* Failed to match estimated_wraps */ 1241 FORMAT(libtrace)->wrap_count++; 1242 hdr->timestamp += (1ull<<TS_NBITS_82580); 1243 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1244 hdr->timestamp, MAXSKEW_82580)) { 1245 if (estimated_wraps == 0) { 1246 /* 0 case Failed to match estimated_wraps+2 */ 1247 printf("WARNING - Hardware Timestamp failed to" 1248 " match using systemtime!\n"); 1249 hdr->timestamp = cur_sys_time_ns; 1250 } else { 1251 /* Failed to match estimated_wraps+1 */ 1252 FORMAT(libtrace)->wrap_count++; 1253 hdr->timestamp += (1ull<<TS_NBITS_82580); 1254 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1255 hdr->timestamp, MAXSKEW_82580)) { 1256 /* Failed to match estimated_wraps+2 */ 1257 printf("WARNING - Hardware Timestamp failed to" 1258 " match using systemtime!!\n"); 1259 } 1260 } 1261 } 1262 } 1263 } 1264 1265 /* Log our previous for the next loop */ 1266 FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time); 1267 1900 /* The timestamp is sitting before our packet and is included in pkt_len */ 1901 hdr->flags |= INCLUDES_HW_TIMESTAMP; 1902 hdr->cap_len -= sizeof(struct hw_timestamp_82580); 1903 hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]); 1904 1905 /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code) 1906 * 1907 * +----------+---+ +--------------+ 1908 * 82580 | 24 | 8 | | 32 | 1909 * +----------+---+ +--------------+ 1910 * reserved \______ 40 bits _____/ 1911 * 1912 * The 40 bit 82580 SYSTIM overflows every 1913 * 2^40 * 10^-9 / 60 = 18.3 minutes. 1914 * 1915 * NOTE picture is in Big Endian order, in memory it's acutally in Little 1916 * Endian (for the full 64 bits) i.e. picture is mirrored 1917 */ 1918 1919 /* Despite what the documentation says this is in Little 1920 * Endian byteorder. Mask the reserved section out. 1921 */ 1922 hdr->timestamp = le64toh(hw_ts->timestamp) & 1923 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580); 1924 1925 if (unlikely(plc->ts_first_sys == 0)) { 1926 plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp; 1927 plc->ts_last_sys = plc->ts_first_sys; 1928 } 1929 1930 /* This will have serious problems if packets aren't read quickly 1931 * that is within a couple of seconds because our clock cycles every 1932 * 18 seconds */ 1933 estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys) 1934 / (1ull<<TS_NBITS_82580); 1935 1936 /* Estimated_wraps gives the number of times the counter should have 1937 * wrapped (however depending on value last time it could have wrapped 1938 * twice more (if hw clock is close to its max value) or once less (allowing 1939 * for a bit of variance between hw and sys clock). But if the clock 1940 * shouldn't have wrapped once then don't allow it to go backwards in time */ 1941 if (unlikely(estimated_wraps >= 2)) { 1942 /* 2 or more wrap arounds add all but the very last wrap */ 1943 plc->wrap_count += estimated_wraps - 1; 1944 } 1945 1946 /* Set the timestamp to the lowest possible value we're considering */ 1947 hdr->timestamp += plc->ts_first_sys + 1948 plc->wrap_count * (1ull<<TS_NBITS_82580); 1949 1950 /* In most runs only the first if() will need evaluating - i.e our 1951 * estimate is correct. 
*/ 1952 if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns, 1953 hdr->timestamp, MAXSKEW_82580))) { 1954 /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */ 1955 plc->wrap_count++; 1956 hdr->timestamp += (1ull<<TS_NBITS_82580); 1957 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1958 hdr->timestamp, MAXSKEW_82580)) { 1959 /* Failed to match estimated_wraps */ 1960 plc->wrap_count++; 1961 hdr->timestamp += (1ull<<TS_NBITS_82580); 1962 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1963 hdr->timestamp, MAXSKEW_82580)) { 1964 if (estimated_wraps == 0) { 1965 /* 0 case Failed to match estimated_wraps+2 */ 1966 printf("WARNING - Hardware Timestamp failed to" 1967 " match using systemtime!\n"); 1968 hdr->timestamp = cur_sys_time_ns; 1969 } else { 1970 /* Failed to match estimated_wraps+1 */ 1971 plc->wrap_count++; 1972 hdr->timestamp += (1ull<<TS_NBITS_82580); 1973 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1974 hdr->timestamp, MAXSKEW_82580)) { 1975 /* Failed to match estimated_wraps+2 */ 1976 printf("WARNING - Hardware Timestamp failed to" 1977 " match using systemtime!!\n"); 1978 } 1979 } 1980 } 1981 } 1982 } 1268 1983 #else 1269 # if USE_CLOCK_GETTIME 1270 hdr->timestamp = TS_TO_NS(cur_sys_time); 1271 # else 1272 hdr->timestamp = TV_TO_NS(cur_sys_time); 1273 # endif 1274 #endif 1275 1276 /* Intels samples prefetch into level 0 cache lets assume it is a good 1277 * idea and do the same */ 1278 rte_prefetch0(rte_pktmbuf_mtod(pkt, void *)); 1279 packet->buffer = pkt; 1280 dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0); 1281 1282 /* Set our capture length for the first time */ 1283 hdr->cap_len = dpdk_get_wire_length(packet); 1284 if (!(hdr->flags & INCLUDES_CHECKSUM)) { 1285 hdr->cap_len -= ETHER_CRC_LEN; 1286 } 1287 1288 1289 return dpdk_get_framing_length(packet) + 1290 dpdk_get_capture_length(packet); 1291 } 1984 1985 hdr->timestamp = cur_sys_time_ns; 1986 /* Offset the next packet by the wire time of previous */ 1987 calculate_wire_time(format_data, hdr->cap_len); 1988 1989 #endif 1990 if(packets) { 1991 packets[i]->buffer = pkts[i]; 1992 packets[i]->header = pkts[i]; 1993 #if HAS_HW_TIMESTAMPS_82580 1994 packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) + 1995 RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580); 1996 #else 1997 packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) + 1998 RTE_PKTMBUF_HEADROOM; 1999 #endif 2000 packets[i]->error = 1; 2001 } 2002 } 2003 2004 plc->ts_last_sys = cur_sys_time_ns; 2005 2006 return; 2007 } 2008 2009 2010 static void dpdk_fin_packet(libtrace_packet_t *packet) 2011 { 2012 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 2013 rte_pktmbuf_free(packet->buffer); 2014 packet->buffer = NULL; 2015 } 2016 } 2017 1292 2018 1293 2019 static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) { 1294 int nb_rx; /* Number of rx packets we've recevied */ 1295 struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */ 2020 int nb_rx; /* Number of rx packets we've received */ 1296 2021 1297 2022 /* Free the last packet buffer */ 1298 2023 if (packet->buffer != NULL) { 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 } 1310 2024 /* Buffer is owned by DPDK */ 2025 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 2026 rte_pktmbuf_free(packet->buffer); 2027 packet->buffer = NULL; 2028 } else 2029 /* Buffer is owned by packet i.e. 
has been malloc'd */ 2030 if (packet->buf_control == TRACE_CTRL_PACKET) { 2031 free(packet->buffer); 2032 packet->buffer = NULL; 2033 } 2034 } 2035 1311 2036 packet->buf_control = TRACE_CTRL_EXTERNAL; 1312 2037 packet->type = TRACE_RT_DATA_DPDK; 1313 2038 2039 /* Check if we already have some packets buffered */ 2040 if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) { 2041 packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++]; 2042 dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0); 2043 return 1; // TODO should be bytes read, which essentially useless anyway 2044 } 1314 2045 /* Wait for a packet */ 1315 2046 while (1) { 1316 /* Poll for a single packet */ 1317 nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port, 1318 FORMAT(libtrace)->queue_id, pkts_burst, 1); 1319 if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */ 1320 return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]); 1321 } 1322 if (libtrace_halt) { 1323 return 0; 1324 } 1325 } 1326 2047 /* Poll for a single packet */ 2048 nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port, 2049 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE); 2050 if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */ 2051 FORMAT(libtrace)->burst_size = nb_rx; 2052 FORMAT(libtrace)->burst_offset = 1; 2053 dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL); 2054 packet->buffer = FORMAT(libtrace)->burst_pkts[0]; 2055 dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0); 2056 return 1; // TODO should be bytes read, which essentially useless anyway 2057 } 2058 if (libtrace_halt) { 2059 return 0; 2060 } 2061 /* Wait a while, polling on memory degrades performance 2062 * This relieves the pressure on memory allowing the NIC to DMA */ 2063 rte_delay_us(10); 2064 } 2065 2066 /* We'll never get here - but if we did it would be bad */ 2067 return -1; 2068 } 2069 2070 static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) { 2071 size_t nb_rx; /* Number of rx packets we've recevied */ 2072 struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */ 2073 size_t i; 2074 2075 for (i = 0 ; i < nb_packets ; ++i) { 2076 /* Free the last packet buffer */ 2077 if (packets[i]->buffer != NULL) { 2078 /* Buffer is owned by DPDK */ 2079 if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) { 2080 rte_pktmbuf_free(packets[i]->buffer); 2081 packets[i]->buffer = NULL; 2082 } else 2083 /* Buffer is owned by packet i.e. 
has been malloc'd */ 2084 if (packets[i]->buf_control == TRACE_CTRL_PACKET) { 2085 free(packets[i]->buffer); 2086 packets[i]->buffer = NULL; 2087 } 2088 } 2089 packets[i]->buf_control = TRACE_CTRL_EXTERNAL; 2090 packets[i]->type = TRACE_RT_DATA_DPDK; 2091 } 2092 2093 /* Wait for a packet */ 2094 while (1) { 2095 /* Poll for up to nb_packets packets */ 2096 nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port, 2097 PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets); 2098 if (nb_rx > 0) { 2099 /* Got some packets - otherwise we keep spinning */ 2100 //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace)); 2101 dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets); 2102 return nb_rx; 2103 } 2104 // Check the message queue; the count could (though it never should) be negative, hence the > 0 test 2105 if (libtrace_message_queue_count(&t->messages) > 0) { 2106 /* A message is waiting - return so the thread can process it */ 2107 return -2; 2108 } 2109 if (libtrace_halt) { 2110 return 0; 2111 } 2112 /* Wait a while, polling on memory degrades performance 2113 * This relieves the pressure on memory allowing the NIC to DMA */ 2114 rte_delay_us(10); 2115 } 2116 1327 2117 /* We'll never get here - but if we did it would be bad */ 1328 2118 return -1; … … 1332 2122 struct timeval tv; 1333 2123 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1334 2124 1335 2125 tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000; 1336 2126 tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000; … … 1341 2131 struct timespec ts; 1342 2132 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1343 2133 1344 2134 ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000; 1345 2135 ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000; … … 1368 2158 static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) { 1369 2159 struct rte_eth_stats stats = {0}; 1370 2160 1371 2161 if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF) 1372 2162 return UINT64_MAX; 1373 2163 /* Grab the current stats */ 1374 2164 rte_eth_stats_get(FORMAT(trace)->port, &stats); 1375 2165 1376 2166 /* Get the drop counter */ 1377 2167 return (uint64_t) stats.ierrors; … … 1380 2170 static uint64_t dpdk_get_captured_packets (libtrace_t *trace) { 1381 2171 struct rte_eth_stats stats = {0}; 1382 2172 1383 2173 if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF) 1384 2174 return UINT64_MAX; 1385 2175 /* Grab the current stats */ 1386 2176 rte_eth_stats_get(FORMAT(trace)->port, &stats); 1387 2177 1388 2178 /* Get the received packet counter */ 1389 2179 return (uint64_t) stats.ipackets; … … 1393 2183 * This is the number of packets filtered by the NIC 1394 2184 * and may be ahead of the number read using libtrace. 1395 * 2185 * 1396 2186 * XXX we are yet to implement any filtering, but once we have, this is where 1397 2187 * the result should come from. So this will just return 0 for now. 
… … 1399 2189 static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) { 1400 2190 struct rte_eth_stats stats = {0}; 1401 2191 1402 2192 if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF) 1403 2193 return UINT64_MAX; 1404 2194 /* Grab the current stats */ 1405 2195 rte_eth_stats_get(FORMAT(trace)->port, &stats); 1406 2196 1407 2197 /* Get the drop counter */ 1408 2198 return (uint64_t) stats.fdirmiss; … … 1414 2204 */ 1415 2205 static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace, 1416 2206 libtrace_packet_t *packet) { 1417 2207 libtrace_eventobj_t event = {0,0,0.0,0}; 1418 2208 int nb_rx; /* Number of receive packets we've read */ 1419 2209 struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */ 1420 2210 1421 2211 do { 1422 1423 /* See if we already have a packet waiting */ 1424 nb_rx = rte_eth_rx_burst(FORMAT(trace)->port, 1425 FORMAT(trace)->queue_id, pkts_burst, 1); 1426 1427 if (nb_rx > 0) { 1428 /* Free the last packet buffer */ 1429 if (packet->buffer != NULL) { 1430 /* Buffer is owned by DPDK */ 1431 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 1432 rte_pktmbuf_free(packet->buffer); 1433 packet->buffer = NULL; 1434 } else 1435 /* Buffer is owned by packet i.e. has been malloc'd */ 1436 if (packet->buf_control == TRACE_CTRL_PACKET) { 1437 free(packet->buffer); 1438 packet->buffer = NULL; 1439 } 1440 } 1441 1442 packet->buf_control = TRACE_CTRL_EXTERNAL; 1443 packet->type = TRACE_RT_DATA_DPDK; 1444 event.type = TRACE_EVENT_PACKET; 1445 event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]); 1446 1447 /* XXX - Check this passes the filter trace_read_packet normally 1448 * does this for us but this wont */ 1449 if (trace->filter) { 1450 if (!trace_apply_filter(trace->filter, packet)) { 1451 /* Failed the filter so we loop for another packet */ 1452 trace->filtered_packets ++; 1453 continue; 1454 } 1455 } 1456 trace->accepted_packets ++; 1457 } else { 1458 /* We only want to sleep for a very short time - we are non-blocking */ 1459 event.type = TRACE_EVENT_SLEEP; 1460 event.seconds = 0.0001; 1461 event.size = 0; 1462 } 1463 1464 /* If we get here we have our event */ 1465 break; 2212 2213 /* See if we already have a packet waiting */ 2214 nb_rx = rte_eth_rx_burst(FORMAT(trace)->port, 2215 FORMAT(trace)->queue_id, pkts_burst, 1); 2216 2217 if (nb_rx > 0) { 2218 /* Free the last packet buffer */ 2219 if (packet->buffer != NULL) { 2220 /* Buffer is owned by DPDK */ 2221 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 2222 rte_pktmbuf_free(packet->buffer); 2223 packet->buffer = NULL; 2224 } else 2225 /* Buffer is owned by packet i.e. 
has been malloc'd */ 2226 if (packet->buf_control == TRACE_CTRL_PACKET) { 2227 free(packet->buffer); 2228 packet->buffer = NULL; 2229 } 2230 } 2231 2232 packet->buf_control = TRACE_CTRL_EXTERNAL; 2233 packet->type = TRACE_RT_DATA_DPDK; 2234 event.type = TRACE_EVENT_PACKET; 2235 dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet); 2236 event.size = 1; // TODO should be bytes read, which essentially useless anyway 2237 2238 /* XXX - Check this passes the filter trace_read_packet normally 2239 * does this for us but this wont */ 2240 if (trace->filter) { 2241 if (!trace_apply_filter(trace->filter, packet)) { 2242 /* Failed the filter so we loop for another packet */ 2243 trace->filtered_packets ++; 2244 continue; 2245 } 2246 } 2247 trace->accepted_packets ++; 2248 } else { 2249 /* We only want to sleep for a very short time - we are non-blocking */ 2250 event.type = TRACE_EVENT_SLEEP; 2251 event.seconds = 0.0001; 2252 event.size = 0; 2253 } 2254 2255 /* If we get here we have our event */ 2256 break; 1466 2257 } while (1); 1467 2258 … … 1488 2279 } 1489 2280 1490 2281 static struct libtrace_format_t dpdk = { 1491 2282 "dpdk", 1492 2283 "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $", … … 1505 2296 dpdk_read_packet, /* read_packet */ 1506 2297 dpdk_prepare_packet, /* prepare_packet */ 1507 NULL, /* fin_packet */2298 dpdk_fin_packet, /* fin_packet */ 1508 2299 dpdk_write_packet, /* write_packet */ 1509 2300 dpdk_get_link_type, /* get_link_type */ … … 1528 2319 dpdk_trace_event, /* trace_event */ 1529 2320 dpdk_help, /* help */ 1530 NULL 2321 NULL, /* next pointer */ 2322 {true, 8}, /* Live, NICs typically have 8 threads */ 2323 dpdk_pstart_input, /* pstart_input */ 2324 dpdk_pread_packets, /* pread_packets */ 2325 dpdk_pause_input, /* ppause */ 2326 dpdk_fin_input, /* p_fin */ 2327 dpdk_pconfig_input, /* pconfig_input */ 2328 dpdk_pregister_thread, /* pregister_thread */ 2329 dpdk_punregister_thread /* unpregister_thread */ 1531 2330 }; 1532 2331 -
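The 82580 handling above reconstructs a full nanosecond timestamp from the NIC's 40-bit SYSTIM counter by anchoring it to the system clock seen for the first packet and then adding whole wrap periods until the value lands within an allowed skew of the current system time. The following standalone sketch (not part of format_dpdk.c) illustrates that idea in a simplified form; the 10 ms skew tolerance and the helper name are assumptions, and the real code additionally tracks a running wrap_count per stream rather than re-deriving it for every packet.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define TS_NBITS   40                       /* 82580 SYSTIM width, as above */
#define WRAP_NS    (1ULL << TS_NBITS)       /* counter wraps every ~18.3 min */
#define MAXSKEW_NS 10000000ULL              /* assumed 10ms hw/sys clock skew */

/* Expand a wrapped 40-bit hardware timestamp into absolute nanoseconds,
 * given the system time recorded for the first packet and the current
 * system time. */
static uint64_t expand_hw_ts(uint64_t hw40, uint64_t first_sys_ns, uint64_t now_ns)
{
        uint64_t ts = first_sys_ns + hw40;
        /* Add wrap periods until we are within half a wrap (plus skew) of now */
        while (now_ns > ts && now_ns - ts > MAXSKEW_NS + WRAP_NS / 2)
                ts += WRAP_NS;
        return ts;
}

int main(void)
{
        uint64_t first = 1000000000000ULL;              /* arbitrary anchor time */
        uint64_t now   = first + 3 * WRAP_NS + 12345;   /* three wraps later */
        uint64_t hw40  = 12000;                         /* low 40 bits only */
        printf("expanded timestamp = %" PRIu64 " ns\n",
               expand_hw_ts(hw40, first, now));
        return 0;
}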
lib/format_duck.c
rc5ac872 rc5ac872 366 366 NULL, /* trace_event */ 367 367 duck_help, /* help */ 368 NULL /* next pointer */ 368 NULL, /* next pointer */ 369 NON_PARALLEL(false) 369 370 }; 370 371 -
lib/format_erf.c
rc69aecb rc69aecb 837 837 erf_event, /* trace_event */ 838 838 erf_help, /* help */ 839 NULL /* next pointer */ 839 NULL, /* next pointer */ 840 NON_PARALLEL(false) 840 841 }; 841 842 … … 880 881 erf_event, /* trace_event */ 881 882 erf_help, /* help */ 882 NULL /* next pointer */ 883 NULL, /* next pointer */ 884 NON_PARALLEL(false) 883 885 }; 884 886 -
lib/format_legacy.c
r1ca603b rb13b939 552 552 trace_event_trace, /* trace_event */ 553 553 legacyatm_help, /* help */ 554 NULL /* next pointer */ 554 NULL, /* next pointer */ 555 NON_PARALLEL(false) 555 556 }; 556 557 … … 595 596 trace_event_trace, /* trace_event */ 596 597 legacyeth_help, /* help */ 597 NULL /* next pointer */ 598 NULL, /* next pointer */ 599 NON_PARALLEL(false) 598 600 }; 599 601 … … 639 641 legacypos_help, /* help */ 640 642 NULL, /* next pointer */ 643 NON_PARALLEL(false) 641 644 }; 642 645 … … 682 685 legacynzix_help, /* help */ 683 686 NULL, /* next pointer */ 687 NON_PARALLEL(false) 684 688 }; 685 689 -
lib/format_pcap.c
r4649fea r4649fea 834 834 trace_event_trace, /* trace_event */ 835 835 pcap_help, /* help */ 836 NULL /* next pointer */ 836 NULL, /* next pointer */ 837 NON_PARALLEL(false) 837 838 }; 838 839 … … 877 878 trace_event_device, /* trace_event */ 878 879 pcapint_help, /* help */ 879 NULL /* next pointer */ 880 NULL, /* next pointer */ 881 NON_PARALLEL(true) 880 882 }; 881 883 -
lib/format_pcapfile.c
rc69aecb rc69aecb 786 786 pcapfile_event, /* trace_event */ 787 787 pcapfile_help, /* help */ 788 NULL /* next pointer */ 788 NULL, /* next pointer */ 789 NON_PARALLEL(false) 789 790 }; 790 791 -
lib/format_rt.c
rc5ac872 rc5ac872 458 458 /* This may fail on a non-Linux machine */ 459 459 if (trace_is_err(RT_INFO->dummy_ring)) { 460 trace_perror(RT_INFO->dummy_ring, "Creating dead inttrace");460 trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace"); 461 461 return -1; 462 462 } … … 863 863 trace_event_rt, /* trace_event */ 864 864 rt_help, /* help */ 865 NULL /* next pointer */ 865 NULL, /* next pointer */ 866 NON_PARALLEL(true) /* This is normally live */ 866 867 }; 867 868 -
lib/format_tsh.c
rc69aecb rc69aecb 274 274 trace_event_trace, /* trace_event */ 275 275 tsh_help, /* help */ 276 NULL /* next pointer */ 276 NULL, /* next pointer */ 277 NON_PARALLEL(false) 277 278 }; 278 279 … … 322 323 trace_event_trace, /* trace_event */ 323 324 tsh_help, /* help */ 324 NULL /* next pointer */ 325 NULL, /* next pointer */ 326 NON_PARALLEL(false) 325 327 }; 326 328 -
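Each of the format modules above only gains a NON_PARALLEL(...) tail; the boolean argument records whether the format is a live capture and is exposed to applications through the libtrace_info_t returned by trace_get_information(), declared in the libtrace.h.in changes below. A minimal sketch of querying it follows - the ring: URI is just a placeholder and it is assumed the call is valid on a freshly created (unstarted) trace.

#include <stdio.h>
#include "libtrace.h"

int main(int argc, char *argv[])
{
        libtrace_t *trace = trace_create(argc > 1 ? argv[1] : "ring:eth0");
        libtrace_info_t *info;

        if (trace_is_err(trace)) {
                trace_perror(trace, "trace_create");
                return 1;
        }
        /* live reflects the value the format passed to NON_PARALLEL() */
        info = trace_get_information(trace);
        printf("live capture: %s, native parallel threads: %d\n",
               info->live ? "yes" : "no", info->max_threads);
        trace_destroy(trace);
        return 0;
}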
lib/libtrace.h.in
rc5ac872 rc5ac872 117 117 /** DAG driver version installed on the current system */ 118 118 #define DAG_DRIVER_V "@DAG_VERSION_NUM@" 119 120 /** 121 * A version of assert that always runs the first argument even 122 * when not debugging, however only asserts the condition if debugging 123 * Intended for use mainly with pthread locks etc. which have error 124 * returns but *should* never actually fail. 125 */ 126 #ifdef NDEBUG 127 #define ASSERT_RET(run, cond) run 128 #else 129 #define ASSERT_RET(run, cond) assert(run cond) 130 //#define ASSERT_RET(run, cond) run 131 #endif 119 132 120 133 #ifdef __cplusplus … … 197 210 #endif 198 211 212 // Used to fight against false sharing 213 #define CACHE_LINE_SIZE 64 214 #define ALIGN_STRUCT(x) __attribute__((aligned(x))) 215 199 216 #ifdef _MSC_VER 200 217 #ifdef LT_BUILDING_DLL … … 225 242 /** Opaque structure holding information about a bpf filter */ 226 243 typedef struct libtrace_filter_t libtrace_filter_t; 244 245 typedef struct libtrace_thread_t libtrace_thread_t; 227 246 228 247 /** If the packet has allocated its own memory the buffer_control should be … … 512 531 uint8_t transport_proto; /**< Cached transport protocol */ 513 532 uint32_t l4_remaining; /**< Cached transport remaining */ 533 uint64_t order; /**< Notes the order of this packet in relation to the input */ 534 uint64_t hash; /**< A hash of the packet as supplied by the user */ 535 int error; /**< The error status of pread_packet */ 514 536 } libtrace_packet_t; 515 537 … … 3231 3253 /*@}*/ 3232 3254 3255 /** 3256 * A collection of types for convenience used in place of a 3257 * simple void* to allow a any type of data to be stored. 3258 * 3259 * This is expected to be 8 bytes in length. 3260 */ 3261 typedef union { 3262 /* Pointers */ 3263 void *ptr; 3264 libtrace_packet_t *pkt; 3265 3266 /* C99 Integer types */ 3267 /* NOTE: Standard doesn't require 64-bit 3268 * but x32 and x64 gcc does */ 3269 int64_t sint64; 3270 uint64_t uint64; 3271 3272 uint32_t uint32s[2]; 3273 int32_t sint32s[2]; 3274 uint32_t uint32; 3275 int32_t sint32; 3276 3277 uint16_t uint16s[4]; 3278 int16_t sint16s[4]; 3279 uint16_t uint16; 3280 int16_t sint16; 3281 3282 uint8_t uint8s[8]; 3283 int8_t sint8s[8]; 3284 uint8_t uint8; 3285 int8_t sint8; 3286 3287 size_t size; 3288 3289 /* C basic types - we cannot be certian of the size */ 3290 int sint; 3291 unsigned int uint; 3292 3293 signed char schars[8]; 3294 unsigned char uchars[8]; 3295 signed char schar; 3296 unsigned char uchar; 3297 3298 /* Real numbers */ 3299 float rfloat; 3300 double rdouble; 3301 } libtrace_generic_types_t; 3302 3303 typedef struct libtrace_message_t { 3304 int code; 3305 libtrace_generic_types_t additional; 3306 libtrace_thread_t *sender; 3307 } libtrace_message_t; 3308 3309 /** Structure holding information about a result */ 3310 typedef struct libtrace_result_t { 3311 uint64_t key; 3312 libtrace_generic_types_t value; 3313 int type; 3314 } libtrace_result_t; 3315 #define RESULT_NORMAL 0 3316 #define RESULT_PACKET 1 3317 #define RESULT_TICK 2 3318 3319 3320 typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread); 3321 typedef void (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m); 3322 typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data); 3323 3324 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter); 3325 DLLEXPORT int trace_ppause(libtrace_t *libtrace); 3326 
DLLEXPORT int trace_pstop(libtrace_t *libtrace); 3327 DLLEXPORT void trace_join(libtrace_t * trace); 3328 DLLEXPORT void print_contention_stats (libtrace_t *libtrace); 3329 3330 DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key); 3331 DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result); 3332 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value); 3333 DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result); 3334 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value); 3335 DLLEXPORT void trace_destroy_result(libtrace_result_t ** result); 3336 3337 // Ways to access Global and TLS storage that we provide the user 3338 DLLEXPORT void * trace_get_global(libtrace_t *trace); 3339 DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data); 3340 DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data); 3341 DLLEXPORT void * trace_get_tls(libtrace_thread_t *t); 3342 3343 3344 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type); 3345 typedef struct libtrace_vector libtrace_vector_t; 3346 3347 DLLEXPORT int trace_post_reporter(libtrace_t *libtrace); 3348 DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace); 3349 DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message); 3350 DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message); 3351 DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message); 3352 DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message); 3353 DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message); 3354 DLLEXPORT int trace_finished(libtrace_t * libtrace); 3355 DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet); 3356 DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet); 3357 DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order); 3358 DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash); 3359 DLLEXPORT uint64_t tv_to_usec(struct timeval *tv); 3360 3361 DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv); 3362 3363 DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt); 3364 DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res); 3365 3366 typedef enum { 3367 /** 3368 * Sets the hasher function, if NULL(default) no hashing is used a 3369 * cores will get packets on a first in first served basis 3370 */ 3371 TRACE_OPTION_SET_HASHER, 3372 3373 /** 3374 * Libtrace set perpkt thread count 3375 */ 3376 TRACE_OPTION_SET_PERPKT_THREAD_COUNT, 3377 3378 /** 3379 * Delays packets so they are played back in trace-time rather than as fast 3380 * as possible. 3381 */ 3382 TRACE_OPTION_TRACETIME, 3383 3384 /** 3385 * Specifies the interval between tick packets in milliseconds, if 0 3386 * or less this is ignored. 
3387 */ 3388 TRACE_OPTION_TICK_INTERVAL, 3389 TRACE_OPTION_GET_CONFIG, 3390 TRACE_OPTION_SET_CONFIG 3391 } trace_parallel_option_t; 3392 3393 enum libtrace_messages { 3394 MESSAGE_STARTING, 3395 MESSAGE_RESUMING, 3396 MESSAGE_STOPPING, 3397 MESSAGE_PAUSING, 3398 MESSAGE_DO_PAUSE, 3399 MESSAGE_DO_STOP, 3400 MESSAGE_FIRST_PACKET, 3401 MESSAGE_PERPKT_ENDED, 3402 MESSAGE_PERPKT_RESUMED, 3403 MESSAGE_PERPKT_PAUSED, 3404 MESSAGE_PERPKT_EOF, 3405 MESSAGE_POST_REPORTER, 3406 MESSAGE_POST_RANGE, 3407 MESSAGE_TICK, 3408 MESSAGE_USER 3409 }; 3410 3411 enum hasher_types { 3412 /** 3413 * Balance load across CPUs best as possible, this is basically to say do 3414 * not care about hash. This might still might be implemented 3415 * using a hash or round robin etc. under the hood depending on the format 3416 */ 3417 HASHER_BALANCE, 3418 3419 /** Use a hash which is bi-directional for TCP flows, that is packets with 3420 * the same hash are sent to the same thread. All non TCP packets will be 3421 * sent to the same thread. UDP may or may not be sent to separate 3422 * threads like TCP, this depends on the format support. 3423 */ 3424 HASHER_BIDIRECTIONAL, 3425 3426 /** 3427 * Use a hash which is uni-directional across TCP flows, that means the 3428 * opposite directions of the same 5 tuple might end up on separate cores. 3429 * Otherwise is identical to HASHER_BIDIRECTIONAL 3430 */ 3431 HASHER_UNIDIRECTIONAL, 3432 3433 /** 3434 * Always use the user supplied hasher, this currently disables native 3435 * support and is likely significantly slower. 3436 */ 3437 HASHER_CUSTOM, 3438 3439 /** 3440 * This is not a valid option, used internally only!!! TODO remove 3441 * Set by the format if the hashing is going to be done in hardware 3442 */ 3443 HASHER_HARDWARE 3444 }; 3445 3446 typedef struct libtrace_info_t { 3447 /** 3448 * True if a live format (i.e. packets have to be tracetime). 3449 * Otherwise false, indicating packets can be read as fast 3450 * as possible from the format. 3451 */ 3452 bool live; 3453 3454 /** 3455 * The maximum number of threads supported by a parallel trace. 1 3456 * if parallel support is not native (in this case libtrace will simulate 3457 * an unlimited number of threads), -1 means unlimited and 0 unknown. 3458 */ 3459 int max_threads; 3460 3461 /* TODO hash fn supported list */ 3462 3463 /* TODO consider time/clock details?? */ 3464 } libtrace_info_t; 3465 3466 DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value); 3467 DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet); 3468 DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet); 3469 DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data); 3470 DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace); 3471 3472 /** 3473 * Tuning the parallel sizes 3474 */ 3475 struct user_configuration { 3476 // Packet memory cache settings (ocache_init) total 3477 /** 3478 * See diagrams, this sets the maximum size of freelist used to 3479 * maintain packets and their memory buffers. 3480 * NOTE setting this to less than recommend could cause deadlock a 3481 * trace that manages its own packets. 3482 * A unblockable error message will be printed. 
3483 */ 3484 size_t packet_cache_size; 3485 /** 3486 * Per thread local cache size for the packet freelist 3487 */ 3488 size_t packet_thread_cache_size; 3489 /** 3490 * If true the total number of packets that can be created by a trace is limited 3491 * to the packet_cache_size, otherwise once packet_cache_size is exceeded alloc 3492 * and free will be used to create and free packets, this will be slower than 3493 * using the freelist and could run a machine out of memory. 3494 * 3495 * However this does make it easier to ensure that deadlocks will not occur 3496 * due to running out of packets 3497 */ 3498 bool fixed_packet_count; 3499 /** 3500 * When reading from a single threaded input source to reduce 3501 * lock contention a 'burst' of packets is read per pkt thread 3502 * this determines the bursts size. 3503 */ 3504 size_t burst_size; 3505 // Each perpkt thread has a queue leading into the reporter 3506 //size_t reporter_queue_size; 3507 3508 /** 3509 * The tick interval - in milliseconds 3510 * When a live trace is used messages are sent at the tick 3511 * interval to ensure that all perpkt threads receive data 3512 * this allows results to be printed in cases flows are 3513 * not being directed to a certian thread, while still 3514 * maintaining order. 3515 */ 3516 size_t tick_interval; 3517 3518 /** 3519 * Like the tick interval but used in the case of file format 3520 * This specifies the number of packets before inserting a tick to 3521 * every thread. 3522 */ 3523 size_t tick_count; 3524 3525 /** 3526 * The number of per packet threads requested, 0 means use default. 3527 * Default typically be the number of processor threads detected less one or two. 3528 */ 3529 size_t perpkt_threads; 3530 3531 /** 3532 * See diagrams, this sets the maximum size of buffers used between 3533 * the single hasher thread and the buffer. 3534 * NOTE setting this to less than recommend could cause deadlock a 3535 * trace that manages its own packets. 3536 * A unblockable warning message will be printed to stderr in this case. 3537 */ 3538 /** The number of packets that can queue per thread from hasher thread */ 3539 size_t hasher_queue_size; 3540 3541 /** 3542 * If true use a polling hasher queue, that means that we will spin/or yeild 3543 * when rather than blocking on a lock. This applies to both the hasher thread 3544 * and perpkts reading the queues. 3545 */ 3546 bool hasher_polling; 3547 3548 /** 3549 * If true the reporter thread will continuously poll waiting for results 3550 * if false they are only checked when a message is received, this message 3551 * is controlled by reporter_thold. 3552 */ 3553 bool reporter_polling; 3554 3555 /** 3556 * Perpkt thread result queue size before triggering the reporter step to read results 3557 */ 3558 size_t reporter_thold; 3559 3560 /** 3561 * Prints a line to standard error for every state change 3562 * for both the trace as a whole and for each thread. 3563 */ 3564 bool debug_state; 3565 }; 3566 #include <stdio.h> 3567 DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str); 3568 DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file); 3569 DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t); 3570 3571 /** 3572 * The methods we use to combine multiple outputs into a single output 3573 * This is not considered a stable API however is public. 
3574 * Where possible use built in combiners 3575 * 3576 * NOTE this structure is duplicated per trace and as such can 3577 * have functions rewritten, and in fact should if possible. 3578 */ 3579 typedef struct libtrace_combine libtrace_combine_t; 3580 struct libtrace_combine { 3581 3582 /** 3583 * Called at the start of the trace to allow datastructures 3584 * to be initilised and allow functions to be swapped if approriate. 3585 * 3586 * Also factors such as whether the trace is live or not can 3587 * be used to determine the functions used. 3588 * @return 0 if successful, -1 if an error occurs 3589 */ 3590 int (*initialise)(libtrace_t *,libtrace_combine_t *); 3591 3592 /** 3593 * Called when the trace ends, clean up any memory here 3594 * from libtrace_t * init. 3595 */ 3596 void (*destroy)(libtrace_t *, libtrace_combine_t *); 3597 3598 /** 3599 * Publish a result against it's a threads queue. 3600 * If null publish directly, expected to be used 3601 * as a single threaded optimisation and can be 3602 * set to NULL by init if this case is detected. 3603 */ 3604 void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *); 3605 3606 /** 3607 * Read as many results as possible from the trace. 3608 * Directy calls the users code to handle results from here. 3609 * 3610 * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE 3611 * If publish is NULL, this probably should be NULL also otherwise 3612 * it will not be called. 3613 */ 3614 void (*read)(libtrace_t *, libtrace_combine_t *); 3615 3616 /** 3617 * Called when the trace is finished to flush the final 3618 * results to the reporter thread. 3619 * 3620 * There may be no results, in which case this should 3621 * just return. 3622 * 3623 * Libtrace state: 3624 * Called from reporter thread 3625 * No perpkt threads will be running, i.e. publish will not be 3626 * called again. 3627 * 3628 * If publish is NULL, this probably should be NULL also otherwise 3629 * it will not be called. 3630 */ 3631 void (*read_final)(libtrace_t *, libtrace_combine_t *); 3632 3633 /** 3634 * Pause must make sure any results of the type packet are safe. 3635 * That means trace_copy_packet() and destroy the original. 3636 * This also should be NULL if publish is NULL. 3637 */ 3638 void (*pause)(libtrace_t *, libtrace_combine_t *); 3639 3640 /** 3641 * Data storage for all the combiner threads 3642 */ 3643 void *queues; 3644 3645 /** 3646 * Configuration options, what this does is upto the combiner 3647 * chosen. 3648 */ 3649 libtrace_generic_types_t configuration; 3650 }; 3651 3652 DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config); 3653 3654 #define READ_EOF 0 3655 #define READ_ERROR -1 3656 #define READ_MESSAGE -2 3657 // Used for inband tick message 3658 #define READ_TICK -3 3659 3660 #define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration)); 3661 3233 3662 #ifdef __cplusplus 3234 3663 } /* extern "C" */ -
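Taken together, the declarations above form the new parallel entry points: trace_pstart() launches the perpkt and reporter threads, the fn_per_pkt callback sees packets (or control messages) on each perpkt thread, trace_publish_result() pushes keyed results towards the combiner, and trace_join() waits for completion. The sketch below only illustrates how these pieces might be wired together; the exact return contract of the per-packet callback and whether the reporter receives results one at a time are assumptions, as this changeset does not document them.

#include <inttypes.h>
#include <stdio.h>
#include "libtrace.h"

static void *per_pkt(libtrace_t *trace, libtrace_packet_t *p,
                     libtrace_message_t *m, libtrace_thread_t *t)
{
        (void) m;
        if (p) {
                libtrace_generic_types_t v;
                v.uint64 = 1;
                /* Publish one result per packet, keyed by its order */
                trace_publish_result(trace, t, trace_packet_get_order(p),
                                     v, RESULT_NORMAL);
        }
        /* Assumed: returning the packet hands it back to libtrace for reuse */
        return p;
}

static void reporter(libtrace_t *trace, libtrace_result_t *r,
                     libtrace_message_t *m)
{
        (void) trace; (void) m;
        if (r)
                printf("key=%" PRIu64 " value=%" PRIu64 "\n",
                       libtrace_result_get_key(r),
                       libtrace_result_get_value(r).uint64);
}

int main(int argc, char *argv[])
{
        libtrace_t *trace = trace_create(argc > 1 ? argv[1] : "pcapfile:trace.pcap");
        if (trace_is_err(trace)) {
                trace_perror(trace, "trace_create");
                return 1;
        }
        if (trace_pstart(trace, NULL, per_pkt, reporter) != 0) {
                trace_perror(trace, "trace_pstart");
                return 1;
        }
        trace_join(trace);      /* wait for the perpkt and reporter threads */
        trace_destroy(trace);
        return 0;
}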
lib/libtrace_int.h
r6fc1ae7 r6cf3ca0 148 148 #endif 149 149 150 #include "data-struct/ring_buffer.h" 151 #include "data-struct/object_cache.h" 152 #include "data-struct/vector.h" 153 #include "data-struct/message_queue.h" 154 #include "data-struct/deque.h" 155 #include "data-struct/linked_list.h" 156 #include "data-struct/sliding_window.h" 150 157 151 158 //#define RP_BUFSIZE 65536U … … 166 173 bool waiting; 167 174 }; 175 176 enum thread_types { 177 THREAD_EMPTY, 178 THREAD_HASHER, 179 THREAD_PERPKT, 180 THREAD_REPORTER, 181 THREAD_KEEPALIVE 182 }; 183 184 enum thread_states { 185 THREAD_RUNNING, 186 THREAD_FINISHING, 187 THREAD_FINISHED, 188 THREAD_PAUSED, 189 THREAD_STATE_MAX 190 }; 191 192 /** 193 * Information of this thread 194 */ 195 struct libtrace_thread_t { 196 uint64_t accepted_packets; // The number of packets accepted only used if pread 197 uint64_t filtered_packets; 198 // is retreving packets 199 // Set to true once the first packet has been stored 200 bool recorded_first; 201 // For thread safety reason we actually must store this here 202 int64_t tracetime_offset_usec; 203 void* user_data; // TLS for the user to use 204 void* format_data; // TLS for the format to use 205 libtrace_message_queue_t messages; // Message handling 206 libtrace_ringbuffer_t rbuffer; // Input 207 libtrace_t * trace; 208 void* ret; 209 enum thread_types type; 210 enum thread_states state; 211 pthread_t tid; 212 int perpkt_num; // A number from 0-X that represents this perpkt threads number 213 // in the table, intended to quickly identify this thread 214 // -1 represents NA (such as the case this is not a perpkt thread) 215 }; 216 217 /** 218 * Storage to note time value against each. 219 * Used both internally to do trace time playback 220 * and can be used externally to assist applications which need 221 * a trace starting time such as tracertstats. 
222 */ 223 struct first_packets { 224 pthread_spinlock_t lock; 225 size_t count; // If == perpkt_thread_count threads we have all 226 size_t first; // Valid if count != 0 227 struct __packet_storage_magic_type { 228 libtrace_packet_t * packet; 229 struct timeval tv; 230 } * packets; 231 }; 232 233 #define TRACE_STATES \ 234 X(STATE_NEW) \ 235 X(STATE_RUNNING) \ 236 X(STATE_PAUSING) \ 237 X(STATE_PAUSED) \ 238 X(STATE_FINSHED) \ 239 X(STATE_DESTROYED) \ 240 X(STATE_JOINED) \ 241 X(STATE_ERROR) 242 243 #define X(a) a, 244 enum trace_state { 245 TRACE_STATES 246 }; 247 #undef X 248 249 #define X(a) case a: return #a; 250 static inline char *get_trace_state_name(enum trace_state ts){ 251 switch(ts) { 252 TRACE_STATES 253 default: 254 return "UNKNOWN"; 255 } 256 } 257 #undef X 168 258 169 259 /** A libtrace input trace … … 188 278 uint64_t filtered_packets; 189 279 /** The filename from the uri for the trace */ 190 char *uridata; 280 char *uridata; 191 281 /** The libtrace IO reader for this trace (if applicable) */ 192 io_t *io; 282 io_t *io; 193 283 /** Error information for the trace */ 194 libtrace_err_t err; 284 libtrace_err_t err; 195 285 /** Boolean flag indicating whether the trace has been started */ 196 bool started; 286 bool started; 287 /** Synchronise writes/reads across this format object and attached threads etc */ 288 pthread_mutex_t libtrace_lock; 289 /** State */ 290 enum trace_state state; 291 /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */ 292 pthread_cond_t perpkt_cond; 293 /* Keep track of counts of threads in any given state */ 294 int perpkt_thread_states[THREAD_STATE_MAX]; 295 296 /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */ 297 bool perpkt_queue_full; 298 /** Global storage for this trace, shared among all the threads */ 299 void* global_blob; 300 /** The actual freelist */ 301 libtrace_ocache_t packet_freelist; 302 /** User defined per_pkt function called when a pkt is ready */ 303 fn_per_pkt per_pkt; 304 /** User defined reporter function entry point XXX not hooked up */ 305 fn_reporter reporter; 306 /** The hasher function */ 307 enum hasher_types hasher_type; 308 /** The hasher function - NULL implies they don't care or balance */ 309 fn_hasher hasher; // If valid using a separate thread 310 void *hasher_data; 311 /** The pread_packet choosen path for the configuration */ 312 int (*pread)(libtrace_t *, libtrace_thread_t *, libtrace_packet_t **, size_t); 313 314 libtrace_thread_t hasher_thread; 315 libtrace_thread_t reporter_thread; 316 libtrace_thread_t keepalive_thread; 317 int perpkt_thread_count; 318 libtrace_thread_t * perpkt_threads; // All our perpkt threads 319 // Used to keep track of the first packet seen on each thread 320 struct first_packets first_packets; 321 int tracetime; 322 323 /* 324 * Caches statistic counters in the case that our trace is 325 * paused or stopped before this counter is taken 326 */ 327 uint64_t dropped_packets; 328 uint64_t received_packets; 329 struct user_configuration config; 330 libtrace_combine_t combiner; 197 331 }; 332 333 void trace_fin_packet(libtrace_packet_t *packet); 334 void libtrace_zero_thread(libtrace_thread_t * t); 335 void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t); 336 libtrace_thread_t * get_thread_table(libtrace_t *libtrace); 337 int get_thread_table_num(libtrace_t *libtrace); 338 198 339 199 340 /** A libtrace output trace … … 202 343 struct libtrace_out_t { 203 344 /** 
The capture format for the output trace */ 204 345 struct libtrace_format_t *format; 205 346 /** Pointer to the "global" data for the capture format module */ 206 347 void *format_data; … … 210 351 libtrace_err_t err; 211 352 /** Boolean flag indicating whether the trace has been started */ 212 bool started; 353 bool started; 213 354 }; 214 355 … … 303 444 } PACKED libtrace_pflog_header_t; 304 445 305 306 307 446 /** A libtrace capture format module */ 308 447 /* All functions should return -1, or NULL on failure */ … … 734 873 /** Prints some useful help information to standard output. */ 735 874 void (*help)(void); 736 875 737 876 /** Next pointer, should always be NULL - used by the format module 738 877 * manager. */ 739 878 struct libtrace_format_t *next; 879 880 /** Holds information about the trace format */ 881 struct libtrace_info_t info; 882 883 /** 884 * Starts or unpauses an input trace in parallel mode - note that 885 * this function is often the one that opens the file or device for 886 * reading. 887 * 888 * @param libtrace The input trace to be started or unpaused 889 * @return 0 upon success. 890 * Otherwise in event of an error -1 is returned. 891 * 892 */ 893 int (*pstart_input)(libtrace_t *trace); 894 895 /** 896 * Read a batch of packets from the input stream related to thread. 897 * At most read nb_packets, however should return with less if packets 898 * are not waiting. However still must return at least 1, 0 still indicates 899 * EOF. 900 * 901 * @param libtrace The input trace 902 * @param t The thread 903 * @param packets An array of packets 904 * @param nb_packets The number of packets in the array (the maximum to read) 905 * @return The number of packets read, or 0 in the case of EOF or -1 in error or -2 to represent 906 * interrupted due to message waiting before packets had been read. 907 */ 908 int (*pread_packets)(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets); 909 910 /** Pause a parallel trace 911 * 912 * @param libtrace The input trace to be paused 913 */ 914 int (*ppause_input)(libtrace_t *trace); 915 916 /** Called after all threads have been paused, Finish (close) a parallel trace 917 * 918 * @param libtrace The input trace to be stopped 919 */ 920 int (*pfin_input)(libtrace_t *trace); 921 922 /** Applies a configuration option to an input trace. 923 * 924 * @param libtrace The input trace to apply the option to 925 * @param option The option that is being configured 926 * @param value A pointer to the value that the option is to be 927 * set to 928 * @return 0 if successful, -1 if the option is unsupported or an error 929 * occurs 930 */ 931 int (*pconfig_input)(libtrace_t *libtrace,trace_parallel_option_t option,void *value); 932 933 /** 934 * Register a thread for use with the format or using the packets produced 935 * by it. This is NOT only used for threads reading packets in fact all 936 * threads use this. 937 * 938 * The libtrace lock is not held by this format but can be aquired 939 * by the format. 940 * 941 * Some use cases include setting up any thread local storage required for 942 * to read packets and free packets. For DPDK we require any thread that 943 * may release or read a packet to have have an internal number associated 944 * with it. 945 * 946 * The thread type can be used to see if this thread is going to be used 947 * to read packets or otherwise. 
948 * 949 * @return 0 if successful, -1 if the option is unsupported or an error 950 * occurs (such as a maximum of threads being reached) 951 */ 952 int (*pregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t, bool reader); 953 954 /** 955 * If needed any memory allocated with pregister_thread can be released 956 * in this function. The thread will be destroyed directly after this 957 * function is called. 958 */ 959 void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t); 740 960 }; 961 962 /** Macro to zero out a single thread format */ 963 #define NON_PARALLEL(live) \ 964 {live, 1}, /* trace info */ \ 965 NULL, /* pstart_input */ \ 966 NULL, /* pread_packet */ \ 967 NULL, /* ppause_input */ \ 968 NULL, /* pfin_input */ \ 969 NULL, /* pconfig_input */ \ 970 NULL, /* pregister_thread */ \ 971 NULL /* punregister_thread */ 741 972 742 973 /** The list of registered capture formats */ … … 943 1174 /** Constructor for the Linux Native format module */ 944 1175 void linuxnative_constructor(void); 1176 /** Constructor for the Linux Ring format module */ 1177 void linuxring_constructor(void); 945 1178 /** Constructor for the PCAP format module */ 946 1179 void pcap_constructor(void); -
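The pread_packets hook above is the heart of the per-format contract: fill as many of the supplied packets as are immediately available, returning at least 1 on success, 0 on EOF, -1 on error, or -2 when a message is waiting for the thread. A rough sketch of how a format might honour that contract is shown below; myformat_try_read() is a hypothetical non-blocking helper (1 = packet filled, 0 = nothing waiting, -1 = error), the READ_* constants are those added to libtrace.h.in, and because it relies on internal headers it would only build inside lib/.

static int myformat_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
                                  libtrace_packet_t **packets, size_t nb_packets)
{
        while (1) {
                size_t i;
                /* Grab whatever is immediately available, up to nb_packets */
                for (i = 0; i < nb_packets; i++) {
                        int ret = myformat_try_read(libtrace, t, packets[i]);
                        if (ret < 0)
                                return READ_ERROR;      /* -1: fatal error */
                        if (ret == 0)
                                break;                  /* nothing more waiting */
                }
                if (i > 0)
                        return (int) i;                 /* always >= 1 on success */
                /* Nothing available: give a pending message priority */
                if (libtrace_message_queue_count(&t->messages) > 0)
                        return READ_MESSAGE;            /* -2: message waiting */
                if (libtrace_halt)
                        return READ_EOF;                /* 0: treated as end of trace */
        }
}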
lib/trace.c
r6fc1ae7 r6fc1ae7 99 99 #include "rt_protocol.h" 100 100 101 #include <pthread.h> 102 #include <signal.h> 103 101 104 #define MAXOPTS 1024 102 105 … … 106 109 107 110 volatile int libtrace_halt = 0; 111 /* Set once pstart is called used for backwards compatibility reasons */ 112 int libtrace_parallel = 0; 108 113 109 114 /* strncpy is not assured to copy the final \0, so we … … 137 142 legacy_constructor(); 138 143 atmhdr_constructor(); 144 linuxring_constructor(); 139 145 linuxnative_constructor(); 140 146 #ifdef HAVE_LIBPCAP … … 253 259 libtrace->filtered_packets = 0; 254 260 libtrace->accepted_packets = 0; 261 262 /* Parallel inits */ 263 ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0); 264 ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0); 265 libtrace->state = STATE_NEW; 266 libtrace->perpkt_queue_full = false; 267 libtrace->global_blob = NULL; 268 libtrace->per_pkt = NULL; 269 libtrace->reporter = NULL; 270 libtrace->hasher = NULL; 271 libtrace_zero_ocache(&libtrace->packet_freelist); 272 libtrace_zero_thread(&libtrace->hasher_thread); 273 libtrace_zero_thread(&libtrace->reporter_thread); 274 libtrace_zero_thread(&libtrace->keepalive_thread); 275 libtrace->reporter_thread.type = THREAD_EMPTY; 276 libtrace->perpkt_thread_count = 0; 277 libtrace->perpkt_threads = NULL; 278 libtrace->tracetime = 0; 279 libtrace->first_packets.first = 0; 280 libtrace->first_packets.count = 0; 281 libtrace->first_packets.packets = NULL; 282 libtrace->dropped_packets = UINT64_MAX; 283 libtrace->received_packets = UINT64_MAX; 284 libtrace->pread = NULL; 285 ZERO_USER_CONFIG(libtrace->config); 255 286 256 287 /* Parse the URI to determine what sort of trace we are dealing with */ … … 348 379 libtrace->io = NULL; 349 380 libtrace->filtered_packets = 0; 381 382 /* Parallel inits */ 383 ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0); 384 ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0); 385 libtrace->state = STATE_NEW; // TODO MAYBE DEAD 386 libtrace->perpkt_queue_full = false; 387 libtrace->global_blob = NULL; 388 libtrace->per_pkt = NULL; 389 libtrace->reporter = NULL; 390 libtrace->hasher = NULL; 391 libtrace_zero_ocache(&libtrace->packet_freelist); 392 libtrace_zero_thread(&libtrace->hasher_thread); 393 libtrace_zero_thread(&libtrace->reporter_thread); 394 libtrace_zero_thread(&libtrace->keepalive_thread); 395 libtrace->reporter_thread.type = THREAD_EMPTY; 396 libtrace->perpkt_thread_count = 0; 397 libtrace->perpkt_threads = NULL; 398 libtrace->tracetime = 0; 399 libtrace->pread = NULL; 400 ZERO_USER_CONFIG(libtrace->config); 350 401 351 402 for(tmp=formats_list;tmp;tmp=tmp->next) { … … 583 634 */ 584 635 DLLEXPORT void trace_destroy(libtrace_t *libtrace) { 585 assert(libtrace); 636 int i; 637 assert(libtrace); 638 639 ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0); 640 ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0); 641 642 /* destroy any packets that are still around */ 643 if (libtrace->state != STATE_NEW && libtrace->first_packets.packets) {