Changeset d420777
- Timestamp:
- 08/19/15 13:37:05 (5 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 461582b
- Parents:
- c24de65 (diff), 6210f82 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
- Files:
-
- 48 added
- 1 deleted
- 29 edited
Legend:
- Unmodified
- Added
- Removed
-
README
r3e5518a r4631115 1 This fork of Libtrace aims to support parallel packet processing. 2 3 This is still work in progress and is full of bugs, some of the original 4 Libtrace functions might not function correctly breaking the supplied tools. 5 1 6 libtrace 3.0.22 2 7 -
configure.in
rc24de65 rd420777 39 39 tools/traceends/Makefile 40 40 examples/Makefile examples/skeleton/Makefile examples/rate/Makefile 41 examples/stats/Makefile examples/tutorial/Makefile 41 examples/stats/Makefile examples/tutorial/Makefile examples/parallel/Makefile 42 42 docs/libtrace.doxygen 43 43 lib/libtrace.h … … 401 401 fi 402 402 403 # If we use DPDK we might be able to use libnuma 404 AC_CHECK_LIB(numa, numa_node_to_cpus, have_numa=1, have_numa=0) 405 403 406 # Checks for various "optional" libraries 404 407 AC_CHECK_LIB(pthread, pthread_create, have_pthread=1, have_pthread=0) 408 409 AC_CHECK_LIB(pthread, pthread_setname_np, have_pthread_setname_np=1, have_pthread_setname_np=0) 405 410 406 411 # Check for ncurses … … 420 425 AC_CHECK_LIB(rt, clock_gettime, have_clock_gettime=1, have_clock_gettime=0) 421 426 LIBS= 427 428 if test "$have_numa" = 1; then 429 LIBTRACE_LIBS="$LIBTRACE_LIBS -lnuma" 430 AC_DEFINE(HAVE_LIBNUMA, 1, [Set to 1 if libnuma is supported]) 431 with_numa=yes 432 else 433 AC_DEFINE(HAVE_LIBNUMA, 0, [Set to 1 if libnuma is supported]) 434 with_numa=no 435 fi 422 436 423 437 if test "$dlfound" = 0; then … … 434 448 if test "$have_pthread" = 1; then 435 449 AC_DEFINE(HAVE_LIBPTHREAD, 1, [Set to 1 if pthreads are supported]) 450 fi 451 452 if test "$have_pthread_setname_np" = 1; then 453 AC_DEFINE(HAVE_PTHREAD_SETNAME_NP, 1, [Set to 1 if pthread_setname_np is found]) 436 454 fi 437 455 … … 453 471 454 472 if test "$have_clock_gettime" = 1; then 455 LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt" 473 LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt" 474 AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [Set to 1 if clock_gettime is supported]) 475 with_clock_gettime=yes 476 else 477 AC_DEFINE(HAVE_CLOCK_GETTIME, 0, [Set to 1 if clock_gettime is supported]) 478 with_clock_gettime=no 456 479 fi 457 480 … … 599 622 600 623 if test x"$libtrace_dpdk" = xtrue; then 601 AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes]) 624 AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes]) 625 
reportopt "Compiled with DPDK trace NUMA support" $with_numa 626 reportopt "Compiled with clock_gettime support" $with_clock_gettime 602 627 elif test x"$want_dpdk" != "xno"; then 603 628 # We don't officially support DPDK so only report failure if the user 604 629 # explicitly asked for DPDK. That way, we can hopefully keep it hidden 605 630 # from most users for now... 606 607 608 631 632 AC_MSG_NOTICE([Compiled with DPDK live capture support: No]) 633 AC_MSG_NOTICE([Note: Requires DPDK v1.5 or newer]) 609 634 fi 610 635 reportopt "Compiled with LLVM BPF JIT support" $JIT -
examples/Makefile.am
r8835f5a r16cb2a2 1 1 2 SUBDIRS=skeleton rate stats tutorial 2 SUBDIRS=skeleton rate stats tutorial parallel -
lib/Makefile.am
r3a333e2 rd420777 1 1 lib_LTLIBRARIES = libtrace.la 2 include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h 2 include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h libtrace_parallel.h 3 3 4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ 5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ 4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread 5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread 6 6 7 7 extra_DIST = format_template.c 8 NATIVEFORMATS=format_linux .c8 NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h 9 9 BPFFORMATS=format_bpf.c 10 10 … … 29 29 NATIVEFORMATS+= format_dpdk.c 30 30 # So we also make libtrace.mk in dpdk otherwise automake tries to expand 31 # it to early which I cannot seem to stop unless we use a path that31 # it too early which I cannot seem to stop unless we use a path that 32 32 # doesn't exist currently 33 33 export RTE_SDK=@RTE_SDK@ … … 44 44 endif 45 45 46 libtrace_la_SOURCES = trace.c common.h \46 libtrace_la_SOURCES = trace.c trace_parallel.c common.h \ 47 47 format_erf.c format_pcap.c format_legacy.c \ 48 48 format_rt.c format_helper.c format_helper.h format_pcapfile.c \ … … 57 57 $(DAGSOURCE) format_erf.h \ 58 58 $(BPFJITSOURCE) \ 59 libtrace_arphrd.h 59 libtrace_arphrd.h \ 60 data-struct/ring_buffer.c data-struct/vector.c \ 61 data-struct/message_queue.c data-struct/deque.c \ 62 data-struct/sliding_window.c data-struct/object_cache.c \ 63 data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \ 64 combiner_sorted.c combiner_unordered.c \ 65 pthread_spinlock.c pthread_spinlock.h 60 66 61 67 if DAG2_4 -
lib/format_atmhdr.c
r5952ff0 r5ab626a 227 227 NULL, /* get_filtered_packets */ 228 228 NULL, /* get_dropped_packets */ 229 NULL, /* get_ captured_packets */229 NULL, /* get_statistics */ 230 230 NULL, /* get_fd */ 231 231 trace_event_trace, /* trace_event */ 232 232 NULL, /* help */ 233 NULL /* next pointer */ 233 NULL, /* next pointer */ 234 NON_PARALLEL(false) 234 235 }; 235 236 -
lib/format_bpf.c
r08f5060 r5ab626a 610 610 NULL, /* get_filtered_packets */ 611 611 bpf_get_dropped_packets,/* get_dropped_packets */ 612 NULL, /* get_ captured_packets */612 NULL, /* get_statistics */ 613 613 bpf_get_fd, /* get_fd */ 614 614 trace_event_device, /* trace_event */ 615 615 bpf_help, /* help */ 616 NULL 616 NULL, /* next pointer */ 617 NON_PARALLEL(true) 617 618 }; 618 619 #else /* HAVE_DECL_BIOCSETIF */ … … 656 657 bpf_get_framing_length, /* get_framing_length */ 657 658 NULL, /* set_capture_length */ 658 NULL, /* get_received_packets */659 NULL, /* get_received_packets */ 659 660 NULL, /* get_filtered_packets */ 660 NULL, /* get_dropped_packets */661 NULL, /* get_ captured_packets */661 NULL, /* get_dropped_packets */ 662 NULL, /* get_statistics */ 662 663 NULL, /* get_fd */ 663 664 NULL, /* trace_event */ 664 665 bpf_help, /* help */ 665 NULL 666 NULL, /* next pointer */ 667 NON_PARALLEL(true) 666 668 }; 667 669 #endif /* HAVE_DECL_BIOCSETIF */ -
lib/format_dag24.c
rc70f59f r5ab626a 554 554 NULL, /* get_filtered_packets */ 555 555 dag_get_dropped_packets, /* get_dropped_packets */ 556 NULL, /* get_ captured_packets */556 NULL, /* get_statistics */ 557 557 NULL, /* get_fd */ 558 558 trace_event_dag, /* trace_event */ 559 559 dag_help, /* help */ 560 NULL /* next pointer */ 560 NULL, /* next pointer */ 561 NON_PARALLEL(true) 561 562 }; 562 563 -
lib/format_dag25.c
r0054c50 rd420777 79 79 */ 80 80 81 82 81 #define DATA(x) ((struct dag_format_data_t *)x->format_data) 83 82 #define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data) 83 #define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data) 84 84 85 85 #define FORMAT_DATA DATA(libtrace) … … 87 87 88 88 #define DUCK FORMAT_DATA->duck 89 90 #define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head 91 #define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data) 92 89 93 static struct libtrace_format_t dag; 90 94 … … 104 108 /* "Global" data that is stored for each DAG output trace */ 105 109 struct dag_format_data_out_t { 106 /* String containing the DAG device name */107 char *device_name;108 110 /* The DAG device being used for writing */ 109 111 struct dag_dev_t *device; … … 118 120 }; 119 121 122 /* Data that is stored against each input stream */ 123 struct dag_per_stream_t { 124 /* DAG stream number */ 125 uint16_t dagstream; 126 /* Pointer to the last unread byte in the DAG memory */ 127 uint8_t *top; 128 /* Pointer to the first unread byte in the DAG memory */ 129 uint8_t *bottom; 130 /* Amount of data processed from the bottom pointer */ 131 uint32_t processed; 132 /* Number of packets seen by the stream */ 133 uint64_t pkt_count; 134 /* Drop count for this particular stream */ 135 uint64_t drops; 136 /* Boolean values to indicate if a particular interface has been seen 137 * or not. 
This is limited to four interfaces, which is enough to 138 * support all current DAG cards */ 139 uint8_t seeninterface[4]; 140 }; 141 120 142 /* "Global" data that is stored for each DAG input trace */ 121 143 struct dag_format_data_t { 122 123 /* Data required for regular DUCK reporting */ 144 /* DAG device */ 145 struct dag_dev_t *device; 146 /* Boolean flag indicating whether the trace is currently attached */ 147 int stream_attached; 148 /* Data stored against each DAG input stream */ 149 libtrace_list_t *per_stream; 150 151 /* Data required for regular DUCK reporting. 152 * We put this on a new cache line otherwise we have a lot of false 153 * sharing caused by updating the last_pkt. 154 * This should only ever be accessed by the first thread stream, 155 * that includes both read and write operations. 156 */ 124 157 struct { 125 158 /* Timestamp of the last DUCK report */ 126 159 uint32_t last_duck; 127 160 /* The number of seconds between each DUCK report */ 128 161 uint32_t duck_freq; 129 162 /* Timestamp of the last packet read from the DAG card */ 130 163 uint32_t last_pkt; 131 164 /* Dummy trace to ensure DUCK packets are dealt with using the 132 165 * DUCK format functions */ 133 libtrace_t *dummy_duck; 134 } duck; 135 136 /* String containing the DAG device name */ 137 char *device_name; 138 /* The DAG device that we are reading from */ 139 struct dag_dev_t *device; 140 /* The DAG stream that we are reading from */ 141 unsigned int dagstream; 142 /* Boolean flag indicating whether the stream is currently attached */ 143 int stream_attached; 144 /* Pointer to the first unread byte in the DAG memory hole */ 145 uint8_t *bottom; 146 /* Pointer to the last unread byte in the DAG memory hole */ 147 uint8_t *top; 148 /* The amount of data processed thus far from the bottom pointer */ 149 uint32_t processed; 150 /* The number of packets that have been dropped */ 151 uint64_t drops; 152 153 uint8_t seeninterface[4]; 166 libtrace_t *dummy_duck; 167 } duck 
ALIGN_STRUCT(CACHE_LINE_SIZE); 154 168 }; 155 169 … … 207 221 208 222 /* Initialises the DAG output data structure */ 209 static void dag_init_format_out_data(libtrace_out_t *libtrace) { 210 libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t)); 223 static void dag_init_format_out_data(libtrace_out_t *libtrace) 224 { 225 libtrace->format_data = (struct dag_format_data_out_t *) 226 malloc(sizeof(struct dag_format_data_out_t)); 211 227 // no DUCK on output 212 228 FORMAT_DATA_OUT->stream_attached = 0; 213 229 FORMAT_DATA_OUT->device = NULL; 214 FORMAT_DATA_OUT->device_name = NULL;215 230 FORMAT_DATA_OUT->dagstream = 0; 216 231 FORMAT_DATA_OUT->waiting = 0; … … 219 234 220 235 /* Initialises the DAG input data structure */ 221 static void dag_init_format_data(libtrace_t *libtrace) { 236 static void dag_init_format_data(libtrace_t *libtrace) 237 { 238 struct dag_per_stream_t stream_data; 239 222 240 libtrace->format_data = (struct dag_format_data_t *) 223 241 malloc(sizeof(struct dag_format_data_t)); 224 242 DUCK.last_duck = 0; 225 226 227 228 FORMAT_DATA->stream_attached = 0; 229 FORMAT_DATA-> drops = 0;230 FORMAT_DATA->device_name = NULL;231 FORMAT_DATA->device = NULL;232 FORMAT_DATA->dagstream = 0; 233 FORMAT_DATA->processed = 0;234 FORMAT_DATA->bottom = NULL;235 FORMAT_DATA->top = NULL;236 memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));243 DUCK.duck_freq = 0; 244 DUCK.last_pkt = 0; 245 DUCK.dummy_duck = NULL; 246 247 FORMAT_DATA->per_stream = 248 libtrace_list_init(sizeof(stream_data)); 249 assert(FORMAT_DATA->per_stream != NULL); 250 251 /* We'll start with just one instance of stream_data, and we'll 252 * add more later if we need them */ 253 memset(&stream_data, 0, sizeof(stream_data)); 254 libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data); 237 255 } 238 256 … … 241 259 * 242 260 * NOTE: This function assumes the open_dag_mutex is held by the caller */ 243 static struct 
dag_dev_t *dag_find_open_device(char *dev_name) { 261 static struct dag_dev_t *dag_find_open_device(char *dev_name) 262 { 244 263 struct dag_dev_t *dag_dev; 245 264 … … 252 271 dag_dev->ref_count ++; 253 272 return dag_dev; 254 255 273 } 256 274 dag_dev = dag_dev->next; 257 275 } 258 276 return NULL; 259 260 261 277 } 262 278 … … 267 283 * 268 284 * NOTE: This function assumes the open_dag_mutex is held by the caller */ 269 static void dag_close_device(struct dag_dev_t *dev) { 285 static void dag_close_device(struct dag_dev_t *dev) 286 { 270 287 /* Need to remove from the device list */ 271 272 288 assert(dev->ref_count == 0); 273 289 … … 290 306 * 291 307 * NOTE: this function should only be called when opening a DAG device for 292 * writing - there is little practical difference between this and the 308 * writing - there is little practical difference between this and the 293 309 * function below that covers the reading case, but we need the output trace 294 * object to report errors properly so the two functions take slightly 310 * object to report errors properly so the two functions take slightly 295 311 * different arguments. This is really lame and there should be a much better 296 312 * way of doing this. 
297 313 * 298 * NOTE: This function assumes the open_dag_mutex is held by the caller 314 * NOTE: This function assumes the open_dag_mutex is held by the caller 299 315 */ 300 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) { 316 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, 317 char *dev_name) 318 { 301 319 struct stat buf; 302 320 int fd; … … 307 325 trace_set_err_out(libtrace,errno,"stat(%s)",dev_name); 308 326 return NULL; 309 }327 } 310 328 311 329 /* Make sure it is the appropriate type of device */ … … 344 362 * 345 363 * NOTE: this function should only be called when opening a DAG device for 346 * reading - there is little practical difference between this and the 364 * reading - there is little practical difference between this and the 347 365 * function above that covers the writing case, but we need the input trace 348 * object to report errors properly so the two functions take slightly 366 * object to report errors properly so the two functions take slightly 349 367 * different arguments. This is really lame and there should be a much better 350 368 * way of doing this. 
… … 357 375 358 376 /* Make sure the device exists */ 359 360 361 362 377 if (stat(dev_name, &buf) == -1) { 378 trace_set_err(libtrace,errno,"stat(%s)",dev_name); 379 return NULL; 380 } 363 381 364 382 /* Make sure it is the appropriate type of device */ … … 366 384 /* Try opening the DAG device */ 367 385 if((fd = dag_open(dev_name)) < 0) { 368 369 370 371 386 trace_set_err(libtrace,errno,"Cannot open DAG %s", 387 dev_name); 388 return NULL; 389 } 372 390 } else { 373 391 trace_set_err(libtrace,errno,"Not a valid dag device: %s", 374 375 376 392 dev_name); 393 return NULL; 394 } 377 395 378 396 /* Add the device to our device list - it is just a doubly linked … … 395 413 396 414 /* Creates and initialises a DAG output trace */ 397 static int dag_init_output(libtrace_out_t *libtrace) { 415 static int dag_init_output(libtrace_out_t *libtrace) 416 { 417 /* Upon successful creation, the device name is stored against the 418 * device and free when it is free()d */ 419 char *dag_dev_name = NULL; 398 420 char *scan = NULL; 399 421 struct dag_dev_t *dag_device = NULL; 400 422 int stream = 1; 401 423 402 424 /* XXX I don't know if this is important or not, but this function 403 425 * isn't present in all of the driver releases that this code is … … 409 431 410 432 dag_init_format_out_data(libtrace); 411 /* Grab the mutex while we're likely to be messing with the device 433 /* Grab the mutex while we're likely to be messing with the device 412 434 * list */ 413 435 pthread_mutex_lock(&open_dag_mutex); 414 436 415 437 /* Specific streams are signified using a comma in the libtrace URI, 416 438 * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device. 
… … 418 440 * If no stream is specified, we will write using stream 1 */ 419 441 if ((scan = strchr(libtrace->uridata,',')) == NULL) { 420 FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);442 dag_dev_name = strdup(libtrace->uridata); 421 443 } else { 422 FORMAT_DATA_OUT->device_name = 423 (char *)strndup(libtrace->uridata, 444 dag_dev_name = (char *)strndup(libtrace->uridata, 424 445 (size_t)(scan - libtrace->uridata)); 425 446 stream = atoi(++scan); … … 428 449 429 450 /* See if our DAG device is already open */ 430 dag_device = dag_find_open_device( FORMAT_DATA_OUT->device_name);451 dag_device = dag_find_open_device(dag_dev_name); 431 452 432 453 if (dag_device == NULL) { 433 454 /* Device not yet opened - open it ourselves */ 434 dag_device = dag_open_output_device(libtrace, 435 FORMAT_DATA_OUT->device_name); 455 dag_device = dag_open_output_device(libtrace, dag_dev_name); 456 } else { 457 /* Otherwise, just use the existing one */ 458 free(dag_dev_name); 459 dag_dev_name = NULL; 436 460 } 437 461 438 462 /* Make sure we have successfully opened a DAG device */ 439 463 if (dag_device == NULL) { 440 if (FORMAT_DATA_OUT->device_name) { 441 free(FORMAT_DATA_OUT->device_name); 442 FORMAT_DATA_OUT->device_name = NULL; 464 if (dag_dev_name) { 465 free(dag_dev_name); 443 466 } 444 467 pthread_mutex_unlock(&open_dag_mutex); … … 453 476 /* Creates and initialises a DAG input trace */ 454 477 static int dag_init_input(libtrace_t *libtrace) { 478 /* Upon successful creation, the device name is stored against the 479 * device and free when it is free()d */ 480 char *dag_dev_name = NULL; 455 481 char *scan = NULL; 456 482 int stream = 0; … … 458 484 459 485 dag_init_format_data(libtrace); 460 /* Grab the mutex while we're likely to be messing with the device 486 /* Grab the mutex while we're likely to be messing with the device 461 487 * list */ 462 488 pthread_mutex_lock(&open_dag_mutex); 463 464 465 /* Specific streams are signified using a comma in the libtrace 
URI, 489 490 491 /* DAG cards support multiple streams. In a single threaded capture, 492 * these are specified using a comma in the libtrace URI, 466 493 * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device. 467 494 * 468 * If no stream is specified, we will read from stream 0 */ 495 * If no stream is specified, we will read from stream 0 with 496 * one thread 497 */ 469 498 if ((scan = strchr(libtrace->uridata,',')) == NULL) { 470 FORMAT_DATA->device_name = strdup(libtrace->uridata);499 dag_dev_name = strdup(libtrace->uridata); 471 500 } else { 472 FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,501 dag_dev_name = (char *)strndup(libtrace->uridata, 473 502 (size_t)(scan - libtrace->uridata)); 474 503 stream = atoi(++scan); 475 504 } 476 505 477 FORMAT_DATA ->dagstream = stream;506 FORMAT_DATA_FIRST->dagstream = stream; 478 507 479 508 /* See if our DAG device is already open */ 480 dag_device = dag_find_open_device( FORMAT_DATA->device_name);509 dag_device = dag_find_open_device(dag_dev_name); 481 510 482 511 if (dag_device == NULL) { 483 512 /* Device not yet opened - open it ourselves */ 484 dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name); 513 dag_device = dag_open_device(libtrace, dag_dev_name); 514 } else { 515 /* Otherwise, just use the existing one */ 516 free(dag_dev_name); 517 dag_dev_name = NULL; 485 518 } 486 519 487 520 /* Make sure we have successfully opened a DAG device */ 488 521 if (dag_device == NULL) { 489 if ( FORMAT_DATA->device_name)490 free( FORMAT_DATA->device_name);491 FORMAT_DATA->device_name = NULL;522 if (dag_dev_name) 523 free(dag_dev_name); 524 dag_dev_name = NULL; 492 525 pthread_mutex_unlock(&open_dag_mutex); 493 526 return -1; … … 496 529 FORMAT_DATA->device = dag_device; 497 530 498 /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */ 499 /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled 500 501 * The symptom of the port 
being disabled is that libtrace will appear to hang. 502 */ 531 /* See Config_Status_API_Programming_Guide.pdf from the Endace 532 Dag Documentation */ 533 /* Check kBooleanAttributeActive is true -- no point capturing 534 * on an interface that's disabled 535 * 536 * The symptom of the port being disabled is that libtrace 537 * will appear to hang. */ 503 538 /* Check kBooleanAttributeFault is false */ 504 539 /* Check kBooleanAttributeLocalFault is false */ … … 506 541 /* Check kBooleanAttributePeerLink ? */ 507 542 508 /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/ 543 /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based 544 on libtrace promisc attribute?*/ 509 545 /* Set kUint32AttributeSnapLength to the snaplength */ 510 546 511 547 pthread_mutex_unlock(&open_dag_mutex); 512 548 return 0; 513 549 } 514 550 515 551 /* Configures a DAG input trace */ 516 552 static int dag_config_input(libtrace_t *libtrace, trace_option_t option, 517 void *data) { 518 char conf_str[4096]; 553 void *data) 554 { 555 char conf_str[4096]; 519 556 switch(option) { 520 case TRACE_OPTION_META_FREQ: 521 /* This option is used to specify the frequency of DUCK 522 * updates */ 523 DUCK.duck_freq = *(int *)data; 524 return 0; 525 case TRACE_OPTION_SNAPLEN: 526 /* Tell the card our new snap length */ 527 snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data); 528 if (dag_configure(FORMAT_DATA->device->fd, 529 conf_str) != 0) { 530 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata); 531 return -1; 532 } 533 return 0; 534 case TRACE_OPTION_PROMISC: 535 /* DAG already operates in a promisc fashion */ 536 return -1; 537 case TRACE_OPTION_FILTER: 538 /* We don't yet support pushing filters into DAG 539 * cards */ 540 return -1; 541 case TRACE_OPTION_EVENT_REALTIME: 542 /* Live capture is always going to be realtime */ 557 case TRACE_OPTION_META_FREQ: 558 /* This option is used to specify the 
frequency of DUCK 559 * updates */ 560 DUCK.duck_freq = *(int *)data; 561 return 0; 562 case TRACE_OPTION_SNAPLEN: 563 /* Tell the card our new snap length */ 564 snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data); 565 if (dag_configure(FORMAT_DATA->device->fd, 566 conf_str) != 0) { 567 trace_set_err(libtrace, errno, "Failed to configure " 568 "snaplen on DAG card: %s", 569 libtrace->uridata); 543 570 return -1; 544 } 571 } 572 return 0; 573 case TRACE_OPTION_PROMISC: 574 /* DAG already operates in a promisc fashion */ 575 return -1; 576 case TRACE_OPTION_FILTER: 577 /* We don't yet support pushing filters into DAG 578 * cards */ 579 return -1; 580 case TRACE_OPTION_EVENT_REALTIME: 581 /* Live capture is always going to be realtime */ 582 return -1; 583 case TRACE_OPTION_HASHER: 584 /* Lets just say we did this, it's currently still up to 585 * the user to configure this correctly. */ 586 return 0; 587 } 545 588 return -1; 546 589 } 547 590 548 591 /* Starts a DAG output trace */ 549 static int dag_start_output(libtrace_out_t *libtrace) { 592 static int dag_start_output(libtrace_out_t *libtrace) 593 { 550 594 struct timeval zero, nopoll; 551 595 … … 555 599 556 600 /* Attach and start the DAG stream */ 557 558 601 if (dag_attach_stream(FORMAT_DATA_OUT->device->fd, 559 602 FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) { … … 570 613 571 614 /* We don't want the dag card to do any sleeping */ 572 573 615 dag_set_stream_poll(FORMAT_DATA_OUT->device->fd, 574 616 FORMAT_DATA_OUT->dagstream, 0, &zero, … … 578 620 } 579 621 580 /* Starts a DAG input trace */ 581 static int dag_start_input(libtrace_t *libtrace) {582 583 622 static int dag_start_input_stream(libtrace_t *libtrace, 623 struct dag_per_stream_t * stream) { 624 struct timeval zero, nopoll; 625 uint8_t *top, *bottom, *starttop; 584 626 top = bottom = NULL; 585 627 586 628 zero.tv_sec = 0; 587 588 629 zero.tv_usec = 10000; 630 nopoll = zero; 589 631 590 632 /* Attach and start the DAG stream */ 591 633 
if (dag_attach_stream(FORMAT_DATA->device->fd, 592 FORMAT_DATA->dagstream, 0, 0) < 0) { 593 trace_set_err(libtrace, errno, "Cannot attach DAG stream"); 594 return -1; 595 } 634 stream->dagstream, 0, 0) < 0) { 635 trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u", 636 stream->dagstream); 637 return -1; 638 } 596 639 597 640 if (dag_start_stream(FORMAT_DATA->device->fd, 598 FORMAT_DATA->dagstream) < 0) { 599 trace_set_err(libtrace, errno, "Cannot start DAG stream"); 600 return -1; 601 } 641 stream->dagstream) < 0) { 642 trace_set_err(libtrace, errno, "Cannot start DAG stream #%u", 643 stream->dagstream); 644 return -1; 645 } 602 646 FORMAT_DATA->stream_attached = 1; 603 647 604 648 /* We don't want the dag card to do any sleeping */ 605 dag_set_stream_poll(FORMAT_DATA->device->fd, 606 FORMAT_DATA->dagstream, 0, &zero, 607 &nopoll); 649 if (dag_set_stream_poll(FORMAT_DATA->device->fd, 650 stream->dagstream, 0, &zero, 651 &nopoll) < 0) { 652 trace_set_err(libtrace, errno, 653 "dag_set_stream_poll failed!"); 654 return -1; 655 } 608 656 609 657 starttop = dag_advance_stream(FORMAT_DATA->device->fd, 610 FORMAT_DATA->dagstream,611 658 stream->dagstream, 659 &bottom); 612 660 613 661 /* Should probably flush the memory hole now */ … … 616 664 bottom += (starttop - bottom); 617 665 top = dag_advance_stream(FORMAT_DATA->device->fd, 618 FORMAT_DATA->dagstream,619 &bottom);620 } 621 FORMAT_DATA->top = top;622 FORMAT_DATA->bottom = bottom;623 FORMAT_DATA->processed = 0;624 FORMAT_DATA->drops = 0;666 stream->dagstream, 667 &bottom); 668 } 669 stream->top = top; 670 stream->bottom = bottom; 671 stream->processed = 0; 672 stream->drops = 0; 625 673 626 674 return 0; 675 676 } 677 678 /* Starts a DAG input trace */ 679 static int dag_start_input(libtrace_t *libtrace) 680 { 681 return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST); 682 } 683 684 static int dag_pstart_input(libtrace_t *libtrace) 685 { 686 char *scan, *tok; 687 uint16_t stream_count = 0, max_streams; 
688 int iserror = 0; 689 struct dag_per_stream_t stream_data; 690 691 /* Check we aren't trying to create more threads than the DAG card can 692 * handle */ 693 max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd); 694 if (libtrace->perpkt_thread_count > max_streams) { 695 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 696 "trying to create too many threads (max is %u)", 697 max_streams); 698 iserror = 1; 699 goto cleanup; 700 } 701 702 /* Get the stream names from the uri */ 703 if ((scan = strchr(libtrace->uridata, ',')) == NULL) { 704 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 705 "format uri doesn't specify the DAG streams"); 706 iserror = 1; 707 goto cleanup; 708 } 709 710 scan++; 711 712 tok = strtok(scan, ","); 713 while (tok != NULL) { 714 /* Ensure we haven't specified too many streams */ 715 if (stream_count >= libtrace->perpkt_thread_count) { 716 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 717 "format uri specifies too many streams. " 718 "Max is %u", max_streams); 719 iserror = 1; 720 goto cleanup; 721 } 722 723 /* Save the stream details */ 724 if (stream_count == 0) { 725 /* Special case where we update the existing stream 726 * data structure */ 727 FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok); 728 } else { 729 memset(&stream_data, 0, sizeof(stream_data)); 730 stream_data.dagstream = (uint16_t)atoi(tok); 731 libtrace_list_push_back(FORMAT_DATA->per_stream, 732 &stream_data); 733 } 734 735 stream_count++; 736 tok = strtok(NULL, ","); 737 } 738 739 FORMAT_DATA->stream_attached = 1; 740 741 cleanup: 742 if (iserror) { 743 return -1; 744 } else { 745 return 0; 746 } 627 747 } 628 748 629 749 /* Pauses a DAG output trace */ 630 static int dag_pause_output(libtrace_out_t *libtrace) {631 750 static int dag_pause_output(libtrace_out_t *libtrace) 751 { 632 752 /* Stop and detach the stream */ 633 753 if (dag_stop_stream(FORMAT_DATA_OUT->device->fd, 634 FORMAT_DATA_OUT->dagstream) < 0) {754 FORMAT_DATA_OUT->dagstream) < 0) { 635 755 
trace_set_err_out(libtrace, errno, "Could not stop DAG stream"); 636 756 return -1; 637 757 } 638 758 if (dag_detach_stream(FORMAT_DATA_OUT->device->fd, 639 FORMAT_DATA_OUT->dagstream) < 0) { 640 trace_set_err_out(libtrace, errno, "Could not detach DAG stream"); 759 FORMAT_DATA_OUT->dagstream) < 0) { 760 trace_set_err_out(libtrace, errno, 761 "Could not detach DAG stream"); 641 762 return -1; 642 763 } … … 646 767 647 768 /* Pauses a DAG input trace */ 648 static int dag_pause_input(libtrace_t *libtrace) { 649 650 /* Stop and detach the stream */ 651 if (dag_stop_stream(FORMAT_DATA->device->fd, 652 FORMAT_DATA->dagstream) < 0) { 653 trace_set_err(libtrace, errno, "Could not stop DAG stream"); 654 return -1; 655 } 656 if (dag_detach_stream(FORMAT_DATA->device->fd, 657 FORMAT_DATA->dagstream) < 0) { 658 trace_set_err(libtrace, errno, "Could not detach DAG stream"); 659 return -1; 660 } 769 static int dag_pause_input(libtrace_t *libtrace) 770 { 771 libtrace_list_node_t *tmp = FORMAT_DATA_HEAD; 772 773 /* Stop and detach each stream */ 774 while (tmp != NULL) { 775 if (dag_stop_stream(FORMAT_DATA->device->fd, 776 STREAM_DATA(tmp)->dagstream) < 0) { 777 trace_set_err(libtrace, errno, 778 "Could not stop DAG stream"); 779 printf("Count not stop DAG stream\n"); 780 return -1; 781 } 782 if (dag_detach_stream(FORMAT_DATA->device->fd, 783 STREAM_DATA(tmp)->dagstream) < 0) { 784 trace_set_err(libtrace, errno, 785 "Could not detach DAG stream"); 786 printf("Count not detach DAG stream\n"); 787 return -1; 788 } 789 790 tmp = tmp->next; 791 } 792 661 793 FORMAT_DATA->stream_attached = 0; 662 794 return 0; 663 795 } 664 796 797 798 665 799 /* Closes a DAG input trace */ 666 static int dag_fin_input(libtrace_t *libtrace) { 800 static int dag_fin_input(libtrace_t *libtrace) 801 { 667 802 /* Need the lock, since we're going to be handling the device list */ 668 803 pthread_mutex_lock(&open_dag_mutex); 669 804 670 805 /* Detach the stream if we are not paused */ 671 806 if 
(FORMAT_DATA->stream_attached) 672 807 dag_pause_input(libtrace); 673 FORMAT_DATA->device->ref_count 808 FORMAT_DATA->device->ref_count--; 674 809 675 810 /* Close the DAG device if there are no more references to it */ 676 811 if (FORMAT_DATA->device->ref_count == 0) 677 812 dag_close_device(FORMAT_DATA->device); 813 678 814 if (DUCK.dummy_duck) 679 815 trace_destroy_dead(DUCK.dummy_duck); 680 if (FORMAT_DATA->device_name) 681 free(FORMAT_DATA->device_name); 816 817 /* Clear the list */ 818 libtrace_list_deinit(FORMAT_DATA->per_stream); 682 819 free(libtrace->format_data); 683 820 pthread_mutex_unlock(&open_dag_mutex); 684 821 return 0; /* success */ 685 822 } 686 823 687 824 /* Closes a DAG output trace */ 688 static int dag_fin_output(libtrace_out_t *libtrace) { 689 825 static int dag_fin_output(libtrace_out_t *libtrace) 826 { 827 690 828 /* Commit any outstanding traffic in the txbuffer */ 691 829 if (FORMAT_DATA_OUT->waiting) { 692 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream, 693 FORMAT_DATA_OUT->waiting ); 694 } 695 696 /* Wait until the buffer is nearly clear before exiting the program, 830 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, 831 FORMAT_DATA_OUT->dagstream, 832 FORMAT_DATA_OUT->waiting ); 833 } 834 835 /* Wait until the buffer is nearly clear before exiting the program, 697 836 * as we will lose packets otherwise */ 698 dag_tx_get_stream_space (FORMAT_DATA_OUT->device->fd,699 FORMAT_DATA_OUT->dagstream,700 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,701 FORMAT_DATA_OUT->dagstream) - 8702 );837 dag_tx_get_stream_space 838 (FORMAT_DATA_OUT->device->fd, 839 FORMAT_DATA_OUT->dagstream, 840 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd, 841 FORMAT_DATA_OUT->dagstream) - 8); 703 842 704 843 /* Need the lock, since we're going to be handling the device list */ … … 713 852 if (FORMAT_DATA_OUT->device->ref_count == 0) 714 853 dag_close_device(FORMAT_DATA_OUT->device); 715 if 
(FORMAT_DATA_OUT->device_name)716 free(FORMAT_DATA_OUT->device_name);717 854 free(libtrace->format_data); 718 855 pthread_mutex_unlock(&open_dag_mutex); … … 750 887 751 888 /* Allocate memory for the DUCK data */ 752 753 754 755 756 757 758 759 760 761 889 if (packet->buf_control == TRACE_CTRL_EXTERNAL || 890 !packet->buffer) { 891 packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE); 892 packet->buf_control = TRACE_CTRL_PACKET; 893 if (!packet->buffer) { 894 trace_set_err(libtrace, errno, 895 "Cannot allocate packet buffer"); 896 return -1; 897 } 898 } 762 899 763 900 /* DUCK doesn't have a format header */ 764 765 766 767 768 769 770 771 901 packet->header = 0; 902 packet->payload = packet->buffer; 903 904 /* No need to check if we can get DUCK or not - we're modern 905 * enough so just grab the DUCK info */ 906 if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL, 907 (duckinf_t *)packet->payload) < 0)) { 908 trace_set_err(libtrace, errno, "Error using DUCK ioctl"); 772 909 DUCK.duck_freq = 0; 773 774 775 776 910 return -1; 911 } 912 913 packet->type = LIBTRACE_DUCK_VERSION; 777 914 778 915 /* Set the packet's trace to point at a DUCK trace, so that the 779 916 * DUCK format functions will be called on the packet rather than the 780 917 * DAG ones */ 781 if (!DUCK.dummy_duck) 782 DUCK.dummy_duck = trace_create_dead("duck:dummy"); 783 packet->trace = DUCK.dummy_duck; 784 DUCK.last_duck = DUCK.last_pkt; 785 return sizeof(duckinf_t); 918 if (!DUCK.dummy_duck) 919 DUCK.dummy_duck = trace_create_dead("duck:dummy"); 920 packet->trace = DUCK.dummy_duck; 921 DUCK.last_duck = DUCK.last_pkt; 922 packet->error = sizeof(duckinf_t); 923 return sizeof(duckinf_t); 786 924 } 787 925 788 926 /* Determines the amount of data available to read from the DAG card */ 789 static int dag_available(libtrace_t *libtrace) { 790 uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom; 927 static int dag_available(libtrace_t *libtrace, 928 struct dag_per_stream_t *stream_data) 929 { 930 
uint32_t diff = stream_data->top - stream_data->bottom; 791 931 792 932 /* If we've processed more than 4MB of data since we last called 793 933 * dag_advance_stream, then we should call it again to allow the 794 934 * space occupied by that 4MB to be released */ 795 if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)935 if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024) 796 936 return diff; 797 937 798 938 /* Update the top and bottom pointers */ 799 FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,800 FORMAT_DATA->dagstream,801 &(FORMAT_DATA->bottom));802 803 if ( FORMAT_DATA->top == NULL) {939 stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd, 940 stream_data->dagstream, 941 &(stream_data->bottom)); 942 943 if (stream_data->top == NULL) { 804 944 trace_set_err(libtrace, errno, "dag_advance_stream failed!"); 805 945 return -1; 806 946 } 807 FORMAT_DATA->processed = 0;808 diff = FORMAT_DATA->top - FORMAT_DATA->bottom;947 stream_data->processed = 0; 948 diff = stream_data->top - stream_data->bottom; 809 949 return diff; 810 950 } 811 951 812 952 /* Returns a pointer to the start of the next complete ERF record */ 813 static dag_record_t *dag_get_record(libtrace_t *libtrace) { 814 dag_record_t *erfptr = NULL; 815 uint16_t size; 816 erfptr = (dag_record_t *)FORMAT_DATA->bottom; 953 static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data) 954 { 955 dag_record_t *erfptr = NULL; 956 uint16_t size; 957 958 erfptr = (dag_record_t *)stream_data->bottom; 817 959 if (!erfptr) 818 return NULL; 819 size = ntohs(erfptr->rlen); 820 assert( size >= dag_record_size ); 960 return NULL; 961 962 size = ntohs(erfptr->rlen); 963 assert( size >= dag_record_size ); 964 821 965 /* Make certain we have the full packet available */ 822 if (size > ( FORMAT_DATA->top - FORMAT_DATA->bottom))966 if (size > (stream_data->top - stream_data->bottom)) 823 967 return NULL; 824 FORMAT_DATA->bottom += size; 825 
FORMAT_DATA->processed += size; 968 969 stream_data->bottom += size; 970 stream_data->processed += size; 826 971 return erfptr; 827 972 } … … 829 974 /* Converts a buffer containing a recently read DAG packet record into a 830 975 * libtrace packet */ 831 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet, 832 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) { 833 976 static int dag_prepare_packet_stream(libtrace_t *libtrace, 977 struct dag_per_stream_t *stream_data, 978 libtrace_packet_t *packet, 979 void *buffer, libtrace_rt_types_t rt_type, 980 uint32_t flags) 981 { 834 982 dag_record_t *erfptr; 835 983 836 984 /* If the packet previously owned a buffer that is not the buffer 837 838 985 * that contains the new packet data, we're going to need to free the 986 * old one to avoid memory leaks */ 839 987 if (packet->buffer != buffer && 840 988 packet->buf_control == TRACE_CTRL_PACKET) { 841 989 free(packet->buffer); 842 990 } … … 851 999 erfptr = (dag_record_t *)buffer; 852 1000 packet->buffer = erfptr; 853 854 1001 packet->header = erfptr; 1002 packet->type = rt_type; 855 1003 856 1004 if (erfptr->flags.rxerror == 1) { 857 858 859 860 861 862 863 864 1005 /* rxerror means the payload is corrupt - drop the payload 1006 * by tweaking rlen */ 1007 packet->payload = NULL; 1008 erfptr->rlen = htons(erf_get_framing_length(packet)); 1009 } else { 1010 packet->payload = (char*)packet->buffer 1011 + erf_get_framing_length(packet); 1012 } 865 1013 866 1014 if (libtrace->format_data == NULL) { … … 869 1017 870 1018 /* Update the dropped packets counter */ 871 872 /* No loss counter for DSM coloured records - have to use 873 * some other API */ 1019 /* No loss counter for DSM coloured records - have to use some 1020 * other API */ 874 1021 if (erfptr->type == TYPE_DSM_COLOR_ETH) { 875 1022 /* TODO */ 876 1023 } else { 877 1024 /* Use the ERF loss counter */ 878 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) { 879 
FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1; 1025 if (stream_data->seeninterface[erfptr->flags.iface] 1026 == 0) { 1027 stream_data->seeninterface[erfptr->flags.iface] 1028 = 1; 880 1029 } else { 881 FORMAT_DATA->drops += ntohs(erfptr->lctr);1030 stream_data->drops += ntohs(erfptr->lctr); 882 1031 } 883 1032 } 884 1033 885 1034 return 0; 1035 } 1036 1037 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet, 1038 void *buffer, libtrace_rt_types_t rt_type, 1039 uint32_t flags) 1040 { 1041 return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet, 1042 buffer, rt_type, flags); 886 1043 } 887 1044 … … 896 1053 /* Pushes an ERF record onto the transmit stream */ 897 1054 static int dag_dump_packet(libtrace_out_t *libtrace, 898 dag_record_t *erfptr, unsigned int pad, void *buffer) { 1055 dag_record_t *erfptr, unsigned int pad, 1056 void *buffer) 1057 { 899 1058 int size; 900 1059 901 1060 /* 902 * If we've got 0 bytes waiting in the txqueue, assume that we haven't903 * requested any space yet, and request some, storing the pointer at904 * FORMAT_DATA_OUT->txbuffer.1061 * If we've got 0 bytes waiting in the txqueue, assume that we 1062 * haven't requested any space yet, and request some, storing 1063 * the pointer at FORMAT_DATA_OUT->txbuffer. 
905 1064 * 906 1065 * The amount to request is slightly magical at the moment - it's … … 909 1068 */ 910 1069 if (FORMAT_DATA_OUT->waiting == 0) { 911 FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd, 912 FORMAT_DATA_OUT->dagstream, 16908288); 1070 FORMAT_DATA_OUT->txbuffer = 1071 dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd, 1072 FORMAT_DATA_OUT->dagstream, 1073 16908288); 913 1074 } 914 1075 … … 917 1078 * are in contiguous memory 918 1079 */ 919 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad)); 1080 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr, 1081 (dag_record_size + pad)); 920 1082 FORMAT_DATA_OUT->waiting += (dag_record_size + pad); 921 922 923 1083 924 1084 /* … … 927 1087 */ 928 1088 size = ntohs(erfptr->rlen)-(dag_record_size + pad); 929 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size); 1089 memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer, 1090 size); 930 1091 FORMAT_DATA_OUT->waiting += size; 931 1092 … … 936 1097 * case there is still data in the buffer at program exit. 
937 1098 */ 938 939 1099 if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) { 940 FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream, 941 FORMAT_DATA_OUT->waiting ); 1100 FORMAT_DATA_OUT->txbuffer = 1101 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, 1102 FORMAT_DATA_OUT->dagstream, 1103 FORMAT_DATA_OUT->waiting); 942 1104 FORMAT_DATA_OUT->waiting = 0; 943 1105 } 944 1106 945 1107 return size + pad + dag_record_size; 946 947 1108 } 948 1109 … … 950 1111 * if one is found, false otherwise */ 951 1112 static bool find_compatible_linktype(libtrace_out_t *libtrace, 952 libtrace_packet_t *packet, char *type)953 { 954 //Keep trying to simplify the packet until we can find955 //something we can do with it1113 libtrace_packet_t *packet, char *type) 1114 { 1115 /* Keep trying to simplify the packet until we can find 1116 * something we can do with it */ 956 1117 957 1118 do { 958 *type =libtrace_to_erf_type(trace_get_link_type(packet));959 960 / / Success1119 *type = libtrace_to_erf_type(trace_get_link_type(packet)); 1120 1121 /* Success */ 961 1122 if (*type != (char)-1) 962 1123 return true; … … 964 1125 if (!demote_packet(packet)) { 965 1126 trace_set_err_out(libtrace, 966 TRACE_ERR_NO_CONVERSION,967 "No erf type for packet (%i)",968 trace_get_link_type(packet));1127 TRACE_ERR_NO_CONVERSION, 1128 "No erf type for packet (%i)", 1129 trace_get_link_type(packet)); 969 1130 return false; 970 1131 } … … 976 1137 977 1138 /* Writes a packet to the provided DAG output trace */ 978 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {979 /* 980 * This is heavily borrowed from erf_write_packet(). Yes, CnP coding981 * sucks, sorry about that.1139 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) 1140 { 1141 /* This is heavily borrowed from erf_write_packet(). Yes, CnP 1142 * coding sucks, sorry about that. 
982 1143 */ 983 1144 unsigned int pad = 0; … … 988 1149 989 1150 if(!packet->header) { 990 /* No header, probably an RT packet. Lifted from 1151 /* No header, probably an RT packet. Lifted from 991 1152 * erf_write_packet(). */ 992 1153 return -1; … … 1007 1168 1008 1169 if (packet->type == TRACE_RT_DATA_ERF) { 1009 numbytes = dag_dump_packet(libtrace, 1010 header, 1011 pad, 1012 payload 1013 ); 1014 1170 numbytes = dag_dump_packet(libtrace, header, pad, payload); 1015 1171 } else { 1016 1172 /* Build up a new packet header from the existing header */ 1017 1173 1018 /* Simplify the packet first - if we can't do this, break 1174 /* Simplify the packet first - if we can't do this, break 1019 1175 * early */ 1020 1176 if (!find_compatible_linktype(libtrace,packet,&erf_type)) … … 1035 1191 1036 1192 /* Packet length (rlen includes format overhead) */ 1037 assert(trace_get_capture_length(packet)>0 1038 && trace_get_capture_length(packet)<=65536); 1039 assert(erf_get_framing_length(packet)>0 1040 && trace_get_framing_length(packet)<=65536); 1041 assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0 1042 &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536); 1193 assert(trace_get_capture_length(packet) > 0 1194 && trace_get_capture_length(packet) <= 65536); 1195 assert(erf_get_framing_length(packet) > 0 1196 && trace_get_framing_length(packet) <= 65536); 1197 assert(trace_get_capture_length(packet) + 1198 erf_get_framing_length(packet) > 0 1199 && trace_get_capture_length(packet) + 1200 erf_get_framing_length(packet) <= 65536); 1043 1201 1044 1202 erfhdr.rlen = htons(trace_get_capture_length(packet) 1045 + erf_get_framing_length(packet));1203 + erf_get_framing_length(packet)); 1046 1204 1047 1205 … … 1052 1210 1053 1211 /* Write it out */ 1054 numbytes = dag_dump_packet(libtrace, 1055 &erfhdr, 1056 pad, 1057 payload); 1058 1212 numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload); 1059 1213 } 1060 1214 … … 1066 1220 * If DUCK 
reporting is enabled, the packet returned may be a DUCK update 1067 1221 */ 1068 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { 1069 int size = 0; 1070 struct timeval tv; 1071 dag_record_t *erfptr = NULL; 1222 static int dag_read_packet_stream(libtrace_t *libtrace, 1223 struct dag_per_stream_t *stream_data, 1224 libtrace_thread_t *t, /* Optional */ 1225 libtrace_packet_t *packet) 1226 { 1227 int size = 0; 1228 dag_record_t *erfptr = NULL; 1229 struct timeval tv; 1072 1230 int numbytes = 0; 1073 1231 uint32_t flags = 0; 1074 struct timeval maxwait; 1075 struct timeval pollwait; 1232 struct timeval maxwait, pollwait; 1076 1233 1077 1234 pollwait.tv_sec = 0; … … 1080 1237 maxwait.tv_usec = 250000; 1081 1238 1082 /* Check if we're due for a DUCK report */ 1083 size = dag_get_duckinfo(libtrace, packet); 1084 1085 if (size != 0) 1086 return size; 1239 /* Check if we're due for a DUCK report - only report on the first thread */ 1240 if (stream_data == FORMAT_DATA_FIRST) { 1241 size = dag_get_duckinfo(libtrace, packet); 1242 if (size != 0) 1243 return size; 1244 } 1087 1245 1088 1246 … … 1092 1250 /* If the packet buffer is currently owned by libtrace, free it so 1093 1251 * that we can set the packet to point into the DAG memory hole */ 1094 if (packet->buf_control == TRACE_CTRL_PACKET) { 1095 free(packet->buffer); 1096 packet->buffer = 0; 1097 } 1098 1099 if (dag_set_stream_poll(FORMAT_DATA->device->fd, 1100 FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait, 1101 &pollwait) == -1) 1102 { 1252 if (packet->buf_control == TRACE_CTRL_PACKET) { 1253 free(packet->buffer); 1254 packet->buffer = 0; 1255 } 1256 1257 if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream, 1258 sizeof(dag_record_t), &maxwait, 1259 &pollwait) == -1) { 1103 1260 trace_set_err(libtrace, errno, "dag_set_stream_poll"); 1104 1261 return -1; 1105 1262 } 1106 1107 1263 1108 1264 /* Grab a full ERF record */ 1109 1265 do { 1110 numbytes = 
dag_available(libtrace );1266 numbytes = dag_available(libtrace, stream_data); 1111 1267 if (numbytes < 0) 1112 1268 return numbytes; 1113 1269 if (numbytes < dag_record_size) { 1270 /* Check the message queue if we have one to check */ 1271 if (t != NULL && 1272 libtrace_message_queue_count(&t->messages) > 0) 1273 return -2; 1274 1114 1275 if (libtrace_halt) 1115 1276 return 0; … … 1117 1278 continue; 1118 1279 } 1119 erfptr = dag_get_record( libtrace);1280 erfptr = dag_get_record(stream_data); 1120 1281 } while (erfptr == NULL); 1121 1282 1283 packet->trace = libtrace; 1122 1284 /* Prepare the libtrace packet */ 1123 if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF, 1124 flags)) 1125 return -1; 1126 1127 /* Update the DUCK timer */ 1128 tv = trace_get_timeval(packet); 1129 DUCK.last_pkt = tv.tv_sec; 1130 1131 return packet->payload ? htons(erfptr->rlen) : 1132 erf_get_framing_length(packet); 1285 if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr, 1286 TRACE_RT_DATA_ERF, flags)) 1287 return -1; 1288 1289 /* Update the DUCK timer - don't re-order this check (false-sharing) */ 1290 if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) { 1291 tv = trace_get_timeval(packet); 1292 DUCK.last_pkt = tv.tv_sec; 1293 } 1294 1295 packet->error = packet->payload ? 
htons(erfptr->rlen) : 1296 erf_get_framing_length(packet); 1297 1298 return packet->error; 1299 } 1300 1301 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 1302 { 1303 return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet); 1304 } 1305 1306 static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, 1307 libtrace_packet_t **packets, size_t nb_packets) 1308 { 1309 int ret; 1310 size_t read_packets = 0; 1311 int numbytes = 0; 1312 1313 struct dag_per_stream_t *stream_data = 1314 (struct dag_per_stream_t *)t->format_data; 1315 1316 /* Read as many packets as we can, but read atleast one packet */ 1317 do { 1318 ret = dag_read_packet_stream(libtrace, stream_data, t, 1319 packets[read_packets]); 1320 if (ret < 0) 1321 return ret; 1322 1323 read_packets++; 1324 1325 /* Make sure we don't read too many packets..! */ 1326 if (read_packets >= nb_packets) 1327 break; 1328 1329 numbytes = dag_available(libtrace, stream_data); 1330 } while (numbytes >= dag_record_size); 1331 1332 return read_packets; 1133 1333 } 1134 1334 … … 1138 1338 */ 1139 1339 static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace, 1140 libtrace_packet_t *packet) { 1141 libtrace_eventobj_t event = {0,0,0.0,0}; 1340 libtrace_packet_t *packet) 1341 { 1342 libtrace_eventobj_t event = {0,0,0.0,0}; 1142 1343 dag_record_t *erfptr = NULL; 1143 1344 int numbytes; … … 1158 1359 } 1159 1360 1160 if (dag_set_stream_poll(FORMAT_DATA->device->fd, 1161 FORMAT_DATA->dagstream, 0, &minwait, 1162 &minwait) == -1) 1163 { 1361 if (dag_set_stream_poll(FORMAT_DATA->device->fd, 1362 FORMAT_DATA_FIRST->dagstream, 0, &minwait, 1363 &minwait) == -1) { 1164 1364 trace_set_err(libtrace, errno, "dag_set_stream_poll"); 1165 1365 event.type = TRACE_EVENT_TERMINATE; … … 1170 1370 erfptr = NULL; 1171 1371 numbytes = 0; 1172 1372 1173 1373 /* Need to call dag_available so that the top pointer will get 1174 1374 * updated, otherwise we'll never see any data! 
*/ 1175 numbytes = dag_available(libtrace );1176 1177 /* May as well not bother calling dag_get_record if 1375 numbytes = dag_available(libtrace, FORMAT_DATA_FIRST); 1376 1377 /* May as well not bother calling dag_get_record if 1178 1378 * dag_available suggests that there's no data */ 1179 1379 if (numbytes != 0) 1180 erfptr = dag_get_record( libtrace);1380 erfptr = dag_get_record(FORMAT_DATA_FIRST); 1181 1381 if (erfptr == NULL) { 1182 1382 /* No packet available - sleep for a very short time */ 1183 1383 if (libtrace_halt) { 1184 1384 event.type = TRACE_EVENT_TERMINATE; 1185 } else { 1385 } else { 1186 1386 event.type = TRACE_EVENT_SLEEP; 1187 1387 event.seconds = 0.0001; … … 1189 1389 break; 1190 1390 } 1191 if (dag_prepare_packet (libtrace, packet, erfptr,1192 TRACE_RT_DATA_ERF, flags)) {1391 if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet, 1392 erfptr, TRACE_RT_DATA_ERF, flags)) { 1193 1393 event.type = TRACE_EVENT_TERMINATE; 1194 1394 break; … … 1196 1396 1197 1397 1198 event.size = trace_get_capture_length(packet) + 1199 1200 1398 event.size = trace_get_capture_length(packet) + 1399 trace_get_framing_length(packet); 1400 1201 1401 /* XXX trace_read_packet() normally applies the following 1202 1402 * config options for us, but this function is called via … … 1204 1404 1205 1405 if (libtrace->filter) { 1206 int filtret = trace_apply_filter(libtrace->filter, 1207 packet);1406 int filtret = trace_apply_filter(libtrace->filter, 1407 packet); 1208 1408 if (filtret == -1) { 1209 1409 trace_set_err(libtrace, TRACE_ERR_BAD_FILTER, 1210 1410 "Bad BPF Filter"); 1211 1411 event.type = TRACE_EVENT_TERMINATE; 1212 1412 break; … … 1219 1419 * a sleep event in this case, like we used to 1220 1420 * do! 
*/ 1221 1421 libtrace->filtered_packets ++; 1222 1422 trace_clear_cache(packet); 1223 1423 continue; 1224 1424 } 1225 1425 1226 1426 event.type = TRACE_EVENT_PACKET; 1227 1427 } else { … … 1236 1436 trace_set_capture_length(packet, libtrace->snaplen); 1237 1437 } 1238 1438 libtrace->accepted_packets ++; 1239 1439 break; 1240 } while 1440 } while(1); 1241 1441 1242 1442 return event; 1243 1443 } 1244 1444 1245 /* Gets the number of dropped packets */ 1246 static uint64_t dag_get_dropped_packets(libtrace_t *trace) { 1247 if (trace->format_data == NULL) 1248 return (uint64_t)-1; 1249 return DATA(trace)->drops; 1445 static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) 1446 { 1447 libtrace_list_node_t *tmp; 1448 assert(stat && libtrace); 1449 tmp = FORMAT_DATA_HEAD; 1450 1451 /* Dropped packets */ 1452 stat->dropped_valid = 1; 1453 stat->dropped = 0; 1454 while (tmp != NULL) { 1455 stat->dropped += STREAM_DATA(tmp)->drops; 1456 tmp = tmp->next; 1457 } 1458 1459 } 1460 1461 static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t, 1462 libtrace_stat_t *stat) { 1463 struct dag_per_stream_t *stream_data = t->format_data; 1464 assert(stat && libtrace); 1465 1466 stat->dropped_valid = 1; 1467 stat->dropped = stream_data->drops; 1468 1469 stat->filtered_valid = 1; 1470 stat->filtered = 0; 1250 1471 } 1251 1472 1252 1473 /* Prints some semi-useful help text about the DAG format module */ 1253 1474 static void dag_help(void) { 1254 printf("dag format module: $Revision: 1755 $\n"); 1255 printf("Supported input URIs:\n"); 1256 printf("\tdag:/dev/dagn\n"); 1257 printf("\n"); 1258 printf("\te.g.: dag:/dev/dag0\n"); 1259 printf("\n"); 1260 printf("Supported output URIs:\n"); 1261 printf("\tnone\n"); 1262 printf("\n"); 1475 printf("dag format module: $Revision: 1755 $\n"); 1476 printf("Supported input URIs:\n"); 1477 printf("\tdag:/dev/dagn\n"); 1478 printf("\n"); 1479 printf("\te.g.: dag:/dev/dag0\n"); 1480 printf("\n"); 1481 
printf("Supported output URIs:\n"); 1482 printf("\tnone\n"); 1483 printf("\n"); 1484 } 1485 1486 static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, 1487 bool reader) 1488 { 1489 struct dag_per_stream_t *stream_data; 1490 libtrace_list_node_t *node; 1491 1492 if (reader && t->type == THREAD_PERPKT) { 1493 fprintf(stderr, "t%u: registered reader thread", t->perpkt_num); 1494 node = libtrace_list_get_index(FORMAT_DATA->per_stream, 1495 t->perpkt_num); 1496 if (node == NULL) { 1497 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, 1498 "Too few streams supplied for the" 1499 " number of threads lanuched"); 1500 return -1; 1501 } 1502 stream_data = node->data; 1503 1504 /* Pass the per thread data to the thread */ 1505 t->format_data = stream_data; 1506 1507 /* Attach and start the DAG stream */ 1508 printf("t%u: starting and attaching stream #%u\n", 1509 t->perpkt_num, stream_data->dagstream); 1510 if (dag_start_input_stream(libtrace, stream_data) < 0) 1511 return -1; 1512 } 1513 1514 fprintf(stderr, "t%u: registered thread\n", t->perpkt_num); 1515 1516 return 0; 1263 1517 } 1264 1518 1265 1519 static struct libtrace_format_t dag = { 1266 1267 1268 1520 "dag", 1521 "$Id$", 1522 TRACE_FORMAT_ERF, 1269 1523 dag_probe_filename, /* probe filename */ 1270 1524 NULL, /* probe magic */ 1271 1272 1273 1274 1525 dag_init_input, /* init_input */ 1526 dag_config_input, /* config_input */ 1527 dag_start_input, /* start_input */ 1528 dag_pause_input, /* pause_input */ 1275 1529 dag_init_output, /* init_output */ 1276 1530 NULL, /* config_output */ 1277 1531 dag_start_output, /* start_output */ 1278 1532 dag_fin_input, /* fin_input */ 1279 1533 dag_fin_output, /* fin_output */ 1280 1281 1534 dag_read_packet, /* read_packet */ 1535 dag_prepare_packet, /* prepare_packet */ 1282 1536 NULL, /* fin_packet */ 1283 1537 dag_write_packet, /* write_packet */ 1284 1285 1286 1287 1288 1289 1538 erf_get_link_type, /* get_link_type */ 1539 erf_get_direction, /* 
get_direction */ 1540 erf_set_direction, /* set_direction */ 1541 erf_get_erf_timestamp, /* get_erf_timestamp */ 1542 NULL, /* get_timeval */ 1543 NULL, /* get_seconds */ 1290 1544 NULL, /* get_timespec */ 1291 1292 1293 1294 1295 1296 1297 1545 NULL, /* seek_erf */ 1546 NULL, /* seek_timeval */ 1547 NULL, /* seek_seconds */ 1548 erf_get_capture_length, /* get_capture_length */ 1549 erf_get_wire_length, /* get_wire_length */ 1550 erf_get_framing_length, /* get_framing_length */ 1551 erf_set_capture_length, /* set_capture_length */ 1298 1552 NULL, /* get_received_packets */ 1299 1553 NULL, /* get_filtered_packets */ 1300 dag_get_dropped_packets, /* get_dropped_packets */ 1301 NULL, /* get_captured_packets */ 1302 NULL, /* get_fd */ 1303 trace_event_dag, /* trace_event */ 1304 dag_help, /* help */ 1305 NULL /* next pointer */ 1554 NULL, /* get_dropped_packets */ 1555 dag_get_statistics, /* get_statistics */ 1556 NULL, /* get_fd */ 1557 trace_event_dag, /* trace_event */ 1558 dag_help, /* help */ 1559 NULL, /* next pointer */ 1560 {true, 0}, /* live packet capture, thread limit TBD */ 1561 dag_pstart_input, 1562 dag_pread_packets, 1563 dag_pause_input, 1564 NULL, 1565 dag_pregister_thread, 1566 NULL, 1567 dag_get_thread_statisitics /* get thread stats */ 1306 1568 }; 1307 1569 1308 void dag_constructor(void) { 1570 void dag_constructor(void) 1571 { 1309 1572 register_format(&dag); 1310 1573 } -
lib/format_dpdk.c
rb585975 r773a2a3 3 3 * This file is part of libtrace 4 4 * 5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 6 6 * New Zealand. 7 7 * 8 * Author: Richard Sanger 9 * 8 * Author: Richard Sanger 9 * 10 10 * All rights reserved. 11 11 * 12 * This code has been developed by the University of Waikato WAND 12 * This code has been developed by the University of Waikato WAND 13 13 * research group. For further information please see http://www.wand.net.nz/ 14 14 * … … 36 36 * Intel Data Plane Development Kit is a LIVE capture format. 37 37 * 38 * This format also supports writing which will write packets out to the 39 * network as a form of packet replay. This should not be confused with the 40 * RT protocol which is intended to transfer captured packet records between 38 * This format also supports writing which will write packets out to the 39 * network as a form of packet replay. This should not be confused with the 40 * RT protocol which is intended to transfer captured packet records between 41 41 * RT-speaking programs. 
42 42 */ 43 44 #define _GNU_SOURCE 43 45 44 46 #include "config.h" … … 47 49 #include "format_helper.h" 48 50 #include "libtrace_arphrd.h" 51 #include "hash_toeplitz.h" 49 52 50 53 #ifdef HAVE_INTTYPES_H … … 59 62 #include <endian.h> 60 63 #include <string.h> 64 65 #if HAVE_LIBNUMA 66 #include <numa.h> 67 #endif 61 68 62 69 /* We can deal with any minor differences by checking the RTE VERSION … … 151 158 #include <rte_mempool.h> 152 159 #include <rte_mbuf.h> 153 154 /* The default size of memory buffers to use - This is the max size of standard 160 #include <rte_launch.h> 161 #include <rte_lcore.h> 162 #include <rte_per_lcore.h> 163 #include <rte_cycles.h> 164 #include <pthread.h> 165 #ifdef __FreeBSD__ 166 #include <pthread_np.h> 167 #endif 168 169 170 /* The default size of memory buffers to use - This is the max size of standard 155 171 * ethernet packet less the size of the MAC CHECKSUM */ 156 172 #define RX_MBUF_SIZE 1514 157 173 158 /* The minimum number of memory buffers per queue tx or rx. Search for 174 /* The minimum number of memory buffers per queue tx or rx. Search for 159 175 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards. 160 176 */ … … 174 190 #define NB_TX_MBUF 1024 175 191 176 /* The size of the PCI blacklist needs to be big enough to contain 192 /* The size of the PCI blacklist needs to be big enough to contain 177 193 * every PCI device address (listed by lspci every bus:device.function tuple). 
178 194 */ … … 181 197 /* The maximum number of characters the mempool name can be */ 182 198 #define MEMPOOL_NAME_LEN 20 199 200 /* For single threaded libtrace we read packets as a batch/burst 201 * this is the maximum size of said burst */ 202 #define BURST_SIZE 50 183 203 184 204 #define MBUF(x) ((struct rte_mbuf *) x) … … 186 206 #define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM) 187 207 #define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data)) 208 #define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data)) 209 210 #define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head 211 #define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data) 212 188 213 #define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \ 189 214 (uint64_t) tv.tv_usec*1000ull) 190 215 #define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \ 191 216 (uint64_t) ts.tv_nsec) 192 217 193 218 #if RTE_PKTMBUF_HEADROOM != 128 194 219 #warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \ 195 196 220 "any libtrace instance processing these packet must be have the" \ 221 "same RTE_PKTMBUF_HEADROOM set" 197 222 #endif 198 223 199 224 /* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 200 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 201 * 225 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 226 * 202 227 * Make sure you understand what these are doing before enabling them. 203 228 * They might make traces incompatible with other builds etc. 204 * 229 * 205 230 * These are also included to show how to do some things which aren't 206 231 * obvious in the DPDK documentation.
207 232 */ 208 233 209 /* Print verbose messages to std out*/ 234 /* Print verbose messages to stderr */ 210 235 #define DEBUG 0 211 236 212 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 213 * only turn on if you know clock_gettime is a vsyscall on your system 237 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 238 * only turn on if you know clock_gettime is a vsyscall on your system 214 239 * otherwise could be a large overhead. Again gettimeofday() should be 215 240 * vsyscall also if it's not you should seriously consider updating your 216 241 * kernel. 217 242 */ 218 #ifdef HAVE_LIBRT 243 #ifdef HAVE_CLOCK_GETTIME 219 244 /* You can turn this on (set to 1) to prefer clock_gettime */ 220 #define USE_CLOCK_GETTIME 0 245 #define USE_CLOCK_GETTIME 1 221 246 #else 222 /* DON T CHANGE THIS !!! */ 247 /* DON'T CHANGE THIS !!! */ 223 248 #define USE_CLOCK_GETTIME 0 224 249 #endif … … 229 254 * hence writing out a port such as int: ring: and dpdk: assumes there 230 255 * is no checksum and will attempt to write the checksum as part of the 231 * packet 256 * packet 232 257 */ 233 258 #define GET_MAC_CRC_CHECKSUM 0 234 259 235 260 /* This requires a modification of the pmd drivers (inside Intel DPDK) 261 * TODO this requires updating (packet sizes are wrong TS most likely also) 236 262 */ 237 263 #define HAS_HW_TIMESTAMPS_82580 0 … … 244 270 #endif 245 271 272 static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER; 273 /* Memory pools Per NUMA node */ 274 static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}}; 275 246 276 /* As per Intel 82580 specification - mismatch in 82580 datasheet 247 277 * it states ts is stored in Big Endian, however its actually Little */ 248 278 struct hw_timestamp_82580 { 249 250 279 uint64_t reserved; 280 uint64_t timestamp; /* Little Endian only lower 40 bits are valid */ 251 281 }; 252 282 253 283 enum paused_state { 254 255 256 284 DPDK_NEVER_STARTED, 285 DPDK_RUNNING, 286
DPDK_PAUSED, 257 287 }; 288 289 struct dpdk_per_stream_t 290 { 291 uint16_t queue_id; 292 uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */ 293 struct rte_mempool *mempool; 294 int lcore; 295 #if HAS_HW_TIMESTAMPS_82580 296 /* Timestamping only relevent to RX */ 297 uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */ 298 uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */ 299 #endif 300 } ALIGN_STRUCT(CACHE_LINE_SIZE); 301 302 #if HAS_HW_TIMESTAMPS_82580 303 #define DPDK_EMPTY_STREAM {-1, 0, NULL, -1, 0, 0} 304 #else 305 #define DPDK_EMPTY_STREAM {-1, 0, NULL, -1} 306 #endif 307 308 typedef struct dpdk_per_stream_t dpdk_per_stream_t; 258 309 259 310 /* Used by both input and output however some fields are not used 260 311 * for output */ 261 312 struct dpdk_format_data_t { 262 int8_t promisc; /* promiscuous mode - RX only */ 263 uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */ 264 uint8_t nb_ports; /* Total number of usable ports on system should be 1 */ 265 uint8_t paused; /* See paused_state */ 266 uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */ 267 int snaplen; /* The snap length for the capture - RX only */ 268 /* We always have to setup both rx and tx queues even if we don't want them */ 269 int nb_rx_buf; /* The number of packet buffers in the rx ring */ 270 int nb_tx_buf; /* The number of packet buffers in the tx ring */ 271 struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */ 313 int8_t promisc; /* promiscuous mode - RX only */ 314 uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */ 315 uint8_t nb_ports; /* Total number of usable ports on system should be 1 */ 316 uint8_t paused; /* See paused_state */ 317 uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. 
*/ 318 int snaplen; /* The snap length for the capture - RX only */ 319 /* We always have to setup both rx and tx queues even if we don't want them */ 320 int nb_rx_buf; /* The number of packet buffers in the rx ring */ 321 int nb_tx_buf; /* The number of packet buffers in the tx ring */ 322 int nic_numa_node; /* The NUMA node that the NIC is attached to */ 323 struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */ 272 324 #if DPDK_USE_BLACKLIST 273 325 struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */ 274 326 unsigned int nb_blacklist; /* Number of blacklist items in are valid */ 275 327 #endif 276 char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */ 277 #if HAS_HW_TIMESTAMPS_82580 278 /* Timestamping only relevent to RX */ 279 uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */ 280 uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */ 281 uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */ 282 #endif 328 char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */ 329 uint8_t rss_key[40]; // This is the RSS KEY 330 /* To improve single-threaded performance we always batch reading 331 * packets, in a burst, otherwise the parallel library does this for us */ 332 struct rte_mbuf* burst_pkts[BURST_SIZE]; 333 int burst_size; /* The total number read in the burst */ 334 int burst_offset; /* The offset we are into the burst */ 335 336 /* Our parallel streams */ 337 libtrace_list_t *per_stream; 283 338 }; 284 339 285 340 enum dpdk_addt_hdr_flags { 286 287 341 INCLUDES_CHECKSUM = 0x1, 342 INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */ 288 343 }; 289 344 290 /** 345 /** 291 346 * A structure placed in front of the packet where we can store 292 347 * additional information about the given packet. 
… … 294 349 * | rte_mbuf (pkt) | sizeof(rte_mbuf) 295 350 * +--------------------------+ 296 * | padding | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)297 * +--------------------------+298 351 * | dpdk_addt_hdr | sizeof(dpdk_addt_hdr) 299 352 * +--------------------------+ 300 * | sizeof(dpdk_addt_hdr) | 1 byte301 * +--------------------------+ 353 * | padding | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr) 354 * +--------------------------+ 302 355 * * hw_timestamp_82580 * 16 bytes Optional 303 356 * +--------------------------+ … … 306 359 */ 307 360 struct dpdk_addt_hdr { 308 309 310 311 312 313 361 uint64_t timestamp; 362 uint8_t flags; 363 uint8_t direction; 364 uint8_t reserved1; 365 uint8_t reserved2; 366 uint32_t cap_len; /* The size to say the capture is */ 314 367 }; 315 368 … … 317 370 * We want to blacklist all devices except those on the whitelist 318 371 * (I say list, but yes it is only the one). 319 * 372 * 320 373 * The default behaviour of rte_pci_probe() will map every possible device 321 374 * to its DPDK driver. The DPDK driver will take the ethernet device 322 375 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used. 323 * 324 * So blacklist all devices except the one that we wish to use so that 376 * 377 * So blacklist all devices except the one that we wish to use so that 325 378 * the others can still be used as standard ethernet ports. 
326 379 * … … 336 389 337 390 TAILQ_FOREACH(dev, &device_list, next) { 338 if (whitelist != NULL && whitelist->domain == dev->addr.domain 339 340 341 342 391 if (whitelist != NULL && whitelist->domain == dev->addr.domain 392 && whitelist->bus == dev->addr.bus 393 && whitelist->devid == dev->addr.devid 394 && whitelist->function == dev->addr.function) 395 continue; 343 396 if (format_data->nb_blacklist >= sizeof (format_data->blacklist) 344 345 printf("Warning: too many devices to blacklist consider"346 397 / sizeof (format_data->blacklist[0])) { 398 fprintf(stderr, "Warning: too many devices to blacklist consider" 399 " increasing BLACK_LIST_SIZE"); 347 400 break; 348 401 } … … 360 413 char pci_str[20] = {0}; 361 414 snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT, 362 363 364 365 415 whitelist->domain, 416 whitelist->bus, 417 whitelist->devid, 418 whitelist->function); 366 419 if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) { 367 420 return -1; … … 375 428 * Fills in addr, note core is optional and is unchanged if 376 429 * a value for it is not provided. 377 * 430 * 378 431 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0 379 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 432 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 380 433 */ 381 434 static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) { 382 int matches; 383 assert(str); 384 matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld", 385 &addr->domain, &addr->bus, &addr->devid, &addr->function, core); 386 if (matches >= 4) { 387 return 0; 388 } else { 389 return -1; 390 } 435 int matches; 436 assert(str); 437 matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld", 438 &addr->domain, &addr->bus, &addr->devid, 439 &addr->function, core); 440 if (matches >= 4) { 441 return 0; 442 } else { 443 return -1; 444 } 445 } 446 447 /** 448 * Convert a pci address to the numa node it is 449 * connected to. 
450 * 451 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node 452 * so we can call it before DPDK 453 * 454 * @return -1 if unknown otherwise a number 0 or higher of the numa node 455 */ 456 static int pci_to_numa(struct rte_pci_addr * dev_addr) { 457 char path[50] = {0}; 458 FILE *file; 459 460 /* Read from the system */ 461 snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node", 462 dev_addr->domain, 463 dev_addr->bus, 464 dev_addr->devid, 465 dev_addr->function); 466 467 if((file = fopen(path, "r")) != NULL) { 468 int numa_node = -1; 469 fscanf(file, "%d", &numa_node); 470 fclose(file); 471 return numa_node; 472 } 473 return -1; 391 474 } 392 475 … … 395 478 static inline void dump_configuration() 396 479 { 397 struct rte_config * global_config; 398 long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN); 399 400 if (nb_cpu <= 0) { 401 perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core."); 402 nb_cpu = 1; /* fallback to just 1 core */ 403 } 404 if (nb_cpu > RTE_MAX_LCORE) 405 nb_cpu = RTE_MAX_LCORE; 406 407 global_config = rte_eal_get_configuration(); 408 409 if (global_config != NULL) { 410 int i; 411 fprintf(stderr, "Intel DPDK setup\n" 412 "---Version : %s\n" 413 "---Master LCore : %"PRIu32"\n" 414 "---LCore Count : %"PRIu32"\n", 415 rte_version(), 416 global_config->master_lcore, global_config->lcore_count); 417 418 for (i = 0 ; i < nb_cpu; i++) { 419 fprintf(stderr, " ---Core %d : %s\n", i, 420 global_config->lcore_role[i] == ROLE_RTE ? 
"on" : "off"); 421 } 422 423 const char * proc_type; 424 switch (global_config->process_type) { 425 case RTE_PROC_AUTO: 426 proc_type = "auto"; 427 break; 428 case RTE_PROC_PRIMARY: 429 proc_type = "primary"; 430 break; 431 case RTE_PROC_SECONDARY: 432 proc_type = "secondary"; 433 break; 434 case RTE_PROC_INVALID: 435 proc_type = "invalid"; 436 break; 437 default: 438 proc_type = "something worse than invalid!!"; 439 } 440 fprintf(stderr, "---Process Type : %s\n", proc_type); 441 } 442 443 } 444 #endif 480 struct rte_config * global_config; 481 long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN); 482 483 if (nb_cpu <= 0) { 484 perror("sysconf(_SC_NPROCESSORS_ONLN) failed." 485 " Falling back to the first core."); 486 nb_cpu = 1; /* fallback to just 1 core */ 487 } 488 if (nb_cpu > RTE_MAX_LCORE) 489 nb_cpu = RTE_MAX_LCORE; 490 491 global_config = rte_eal_get_configuration(); 492 493 if (global_config != NULL) { 494 int i; 495 fprintf(stderr, "Intel DPDK setup\n" 496 "---Version : %s\n" 497 "---Master LCore : %"PRIu32"\n" 498 "---LCore Count : %"PRIu32"\n", 499 rte_version(), 500 global_config->master_lcore, global_config->lcore_count); 501 502 for (i = 0 ; i < nb_cpu; i++) { 503 fprintf(stderr, " ---Core %d : %s\n", i, 504 global_config->lcore_role[i] == ROLE_RTE ? 
"on" : "off"); 505 } 506 507 const char * proc_type; 508 switch (global_config->process_type) { 509 case RTE_PROC_AUTO: 510 proc_type = "auto"; 511 break; 512 case RTE_PROC_PRIMARY: 513 proc_type = "primary"; 514 break; 515 case RTE_PROC_SECONDARY: 516 proc_type = "secondary"; 517 break; 518 case RTE_PROC_INVALID: 519 proc_type = "invalid"; 520 break; 521 default: 522 proc_type = "something worse than invalid!!"; 523 } 524 fprintf(stderr, "---Process Type : %s\n", proc_type); 525 } 526 527 } 528 #endif 529 530 /** 531 * Expects to be called from the master lcore and moves it to the given dpdk id 532 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise 533 * affinity is set to all cores. Must be less than RTE_MAX_LCORE 534 * and not already in use. 535 * @return 0 is successful otherwise -1 on error. 536 */ 537 static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) { 538 struct rte_config *cfg = rte_eal_get_configuration(); 539 cpu_set_t cpuset; 540 int i; 541 542 assert (core < RTE_MAX_LCORE); 543 assert (rte_get_master_lcore() == rte_lcore_id()); 544 545 if (core == rte_lcore_id()) 546 return 0; 547 548 /* Make sure we are not overwriting someone else */ 549 assert(!rte_lcore_is_enabled(core)); 550 551 /* Move the core */ 552 cfg->lcore_role[rte_lcore_id()] = ROLE_OFF; 553 cfg->lcore_role[core] = ROLE_RTE; 554 lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id; 555 rte_eal_get_configuration()->master_lcore = core; 556 RTE_PER_LCORE(_lcore_id) = core; 557 558 /* Now change the affinity, either mapped to a single core or all accepted */ 559 CPU_ZERO(&cpuset); 560 561 if (lcore_config[core].detected) { 562 CPU_SET(core, &cpuset); 563 } else { 564 for (i = 0; i < RTE_MAX_LCORE; ++i) { 565 if (lcore_config[i].detected) 566 CPU_SET(i, &cpuset); 567 } 568 } 569 570 i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); 571 if (i != 0) { 572 trace_set_err(libtrace, errno, 
"pthread_setaffinity_np failed\n"); 573 return -1; 574 } 575 return 0; 576 } 445 577 446 578 /** … … 474 606 static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data, 475 607 char * err, int errlen) { 476 int ret; /* Returned error codes */ 477 struct rte_pci_addr use_addr; /* The only address that we don't blacklist */ 478 char cpu_number[10] = {0}; /* The CPU mask we want to bind to */ 479 char mem_map[20] = {0}; /* The memory name */ 480 long nb_cpu; /* The number of CPUs in the system */ 481 long my_cpu; /* The CPU number we want to bind to */ 608 int ret; /* Returned error codes */ 609 struct rte_pci_addr use_addr; /* The only address that we don't blacklist */ 610 char cpu_number[10] = {0}; /* The CPU mask we want to bind to */ 611 char mem_map[20] = {0}; /* The memory name */ 612 long nb_cpu; /* The number of CPUs in the system */ 613 long my_cpu; /* The CPU number we want to bind to */ 614 int i; 615 struct rte_config *cfg = rte_eal_get_configuration(); 482 616 struct saved_getopts save_opts; 483 484 #if DEBUG 485 rte_set_log_level(RTE_LOG_DEBUG); 486 #else 487 rte_set_log_level(RTE_LOG_WARNING); 488 #endif 489 /* 490 * Using unique file prefixes mean separate memory is used, unlinking 491 * the two processes. However be careful we still cannot access a 492 * port that already in use. 493 */ 494 char* argv[] = {"libtrace", 495 "-c", cpu_number, 496 "-n", "1", 497 "--proc-type", "auto", 498 "--file-prefix", mem_map, 499 "-m", "256", 617 618 /* This initialises the Environment Abstraction Layer (EAL) 619 * If we had slave workers these are put into WAITING state 620 * 621 * Basically binds this thread to a fixed core, which we choose as 622 * the last core on the machine (assuming fewer interrupts mapped here). 
623 * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on 624 * "-n" the number of memory channels into the CPU (hardware specific) 625 * - Most likely to be half the number of ram slots in your machine. 626 * We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'" 627 * Controls where in memory packets are stored such that they are spread 628 * across the channels. We just use 1 to be safe. 629 * 630 * Using unique file prefixes mean separate memory is used, unlinking 631 * the two processes. However be careful we still cannot access a 632 * port that already in use. 633 */ 634 char* argv[] = {"libtrace", 635 "-c", cpu_number, 636 "-n", "1", 637 "--proc-type", "auto", 638 "--file-prefix", mem_map, 639 "-m", "512", 500 640 #if DPDK_USE_LOG_LEVEL 501 641 # if DEBUG 502 642 "--log-level", "8", /* RTE_LOG_DEBUG */ 503 643 # else 504 644 "--log-level", "5", /* RTE_LOG_WARNING */ 505 645 # endif 506 646 #endif 507 NULL}; 508 int argc = sizeof(argv) / sizeof(argv[0]) - 1; 509 510 /* This initialises the Environment Abstraction Layer (EAL) 511 * If we had slave workers these are put into WAITING state 512 * 513 * Basically binds this thread to a fixed core, which we choose as 514 * the last core on the machine (assuming fewer interrupts mapped here). 515 * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on 516 * "-n" the number of memory channels into the CPU (hardware specific) 517 * - Most likely to be half the number of ram slots in your machine. 518 * We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'" 519 * Controls where in memory packets are stored and should spread across 520 * the channels. We just use 1 to be safe. 521 */ 522 523 /* Get the number of cpu cores in the system and use the last core */ 524 nb_cpu = sysconf(_SC_NPROCESSORS_ONLN); 525 if (nb_cpu <= 0) { 526 perror("sysconf(_SC_NPROCESSORS_ONLN) failed. 
Falling back to the first core."); 527 nb_cpu = 1; /* fallback to the first core */ 528 } 529 if (nb_cpu > RTE_MAX_LCORE) 530 nb_cpu = RTE_MAX_LCORE; 531 532 my_cpu = nb_cpu; 533 /* This allows the user to specify the core - we would try to do this 534 * automatically but it's hard to tell that this is secondary 535 * before running rte_eal_init(...). Currently we are limited to 1 536 * instance per core due to the way memory is allocated. */ 537 if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) { 538 snprintf(err, errlen, "Failed to parse URI"); 539 return -1; 540 } 541 542 snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN, 543 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu); 544 545 if (!(my_cpu > 0 && my_cpu <= nb_cpu)) { 546 snprintf(err, errlen, 547 "Intel DPDK - User defined a bad CPU number %"PRIu32" must be" 548 " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu); 549 return -1; 550 } 551 552 /* Make our mask */ 553 snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1)); 647 NULL}; 648 int argc = sizeof(argv) / sizeof(argv[0]) - 1; 649 650 #if DEBUG 651 rte_set_log_level(RTE_LOG_DEBUG); 652 #else 653 rte_set_log_level(RTE_LOG_WARNING); 654 #endif 655 656 /* Get the number of cpu cores in the system and use the last core 657 * on the correct numa node */ 658 nb_cpu = sysconf(_SC_NPROCESSORS_ONLN); 659 if (nb_cpu <= 0) { 660 perror("sysconf(_SC_NPROCESSORS_ONLN) failed." 661 " Falling back to the first core."); 662 nb_cpu = 1; /* fallback to the first core */ 663 } 664 if (nb_cpu > RTE_MAX_LCORE) 665 nb_cpu = RTE_MAX_LCORE; 666 667 my_cpu = -1; 668 /* This allows the user to specify the core - we would try to do this 669 * automatically but it's hard to tell that this is secondary 670 * before running rte_eal_init(...). Currently we are limited to 1 671 * instance per core due to the way memory is allocated. 
*/ 672 if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) { 673 snprintf(err, errlen, "Failed to parse URI"); 674 return -1; 675 } 676 677 #if HAVE_LIBNUMA 678 format_data->nic_numa_node = pci_to_numa(&use_addr); 679 if (my_cpu < 0) { 680 #if DEBUG 681 /* If we can assign to a core on the same numa node */ 682 fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node); 683 #endif 684 if(format_data->nic_numa_node >= 0) { 685 int max_node_cpu = -1; 686 struct bitmask *mask = numa_allocate_cpumask(); 687 assert(mask); 688 numa_node_to_cpus(format_data->nic_numa_node, mask); 689 for (i = 0 ; i < nb_cpu; ++i) { 690 if (numa_bitmask_isbitset(mask,i)) 691 max_node_cpu = i+1; 692 } 693 my_cpu = max_node_cpu; 694 } 695 } 696 #endif 697 if (my_cpu < 0) { 698 my_cpu = nb_cpu; 699 } 700 701 702 snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN, 703 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu); 704 705 if (!(my_cpu > 0 && my_cpu <= nb_cpu)) { 706 snprintf(err, errlen, 707 "Intel DPDK - User defined a bad CPU number %"PRIu32" must be" 708 " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu); 709 return -1; 710 } 711 712 /* Make our mask with all cores turned on this is so that DPDK 713 * gets all CPU info in older versions */ 714 snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu))); 715 //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1)); 554 716 555 717 #if !DPDK_USE_BLACKLIST 556 557 558 559 560 561 718 /* Black list all ports besides the one that we want to use */ 719 if ((ret = whitelist_device(format_data, &use_addr)) < 0) { 720 snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed," 721 " are you sure the address is correct?: %s", strerror(-ret)); 722 return -1; 723 } 562 724 #endif 563 725 564 726 /* Give the memory map a unique name */ 565 727 snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid()); 566 /* rte_eal_init it makes a call to getopt so we need to reset the 
567 728 /* rte_eal_init it makes a call to getopt so we need to reset the 729 * global optind variable of getopt otherwise this fails */ 568 730 save_getopts(&save_opts); 569 570 571 snprintf(err, errlen, 572 573 574 731 optind = 1; 732 if ((ret = rte_eal_init(argc, argv)) < 0) { 733 snprintf(err, errlen, 734 "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret)); 735 return -1; 736 } 575 737 restore_getopts(&save_opts); 738 // These are still running but will never do anything with DPDK v1.7 we 739 // should remove this XXX in the future 740 for(i = 0; i < RTE_MAX_LCORE; ++i) { 741 if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) { 742 cfg->lcore_role[i] = ROLE_OFF; 743 cfg->lcore_count--; 744 } 745 } 746 // Only the master should be running 747 assert(cfg->lcore_count == 1); 748 749 // TODO XXX TODO 750 dpdk_move_master_lcore(NULL, my_cpu-1); 576 751 577 752 #if DEBUG 578 753 dump_configuration(); 579 754 #endif 580 755 581 756 #if DPDK_USE_PMD_INIT 582 583 584 585 586 snprintf(err, errlen, 587 588 589 757 /* This registers all available NICs with Intel DPDK 758 * These are not loaded until rte_eal_pci_probe() is called. 
759 */ 760 if ((ret = rte_pmd_init_all()) < 0) { 761 snprintf(err, errlen, 762 "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret)); 763 return -1; 764 } 590 765 #endif 591 766 592 767 #if DPDK_USE_BLACKLIST 593 768 /* Blacklist all ports besides the one that we want to use */ 594 769 if ((ret = blacklist_devices(format_data, &use_addr)) < 0) { 595 770 snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed," … … 600 775 601 776 #if DPDK_USE_PCI_PROBE 602 777 /* This loads DPDK drivers against all ports that are not blacklisted */ 603 778 if ((ret = rte_eal_pci_probe()) < 0) { 604 snprintf(err, errlen, 605 606 607 608 #endif 609 610 611 612 613 snprintf(err, errlen, 614 615 616 617 618 619 779 snprintf(err, errlen, 780 "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret)); 781 return -1; 782 } 783 #endif 784 785 format_data->nb_ports = rte_eth_dev_count(); 786 787 if (format_data->nb_ports != 1) { 788 snprintf(err, errlen, 789 "Intel DPDK - rte_eth_dev_count returned %d but it should be 1", 790 format_data->nb_ports); 791 return -1; 792 } 793 794 return 0; 620 795 } 621 796 622 797 static int dpdk_init_input (libtrace_t *libtrace) { 623 char err[500]; 624 err[0] = 0; 625 626 libtrace->format_data = (struct dpdk_format_data_t *) 627 malloc(sizeof(struct dpdk_format_data_t)); 628 FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */ 629 FORMAT(libtrace)->queue_id = 0; /* Single queue per port */ 630 FORMAT(libtrace)->nb_ports = 0; 631 FORMAT(libtrace)->snaplen = 0; /* Use default */ 632 FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF; 633 FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF; 634 FORMAT(libtrace)->promisc = -1; 635 FORMAT(libtrace)->pktmbuf_pool = NULL; 798 dpdk_per_stream_t stream = DPDK_EMPTY_STREAM; 799 char err[500]; 800 err[0] = 0; 801 802 libtrace->format_data = (struct dpdk_format_data_t *) 803 malloc(sizeof(struct dpdk_format_data_t)); 804 FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */ 805 
FORMAT(libtrace)->nb_ports = 0; 806 FORMAT(libtrace)->snaplen = 0; /* Use default */ 807 FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF; 808 FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF; 809 FORMAT(libtrace)->nic_numa_node = -1; 810 FORMAT(libtrace)->promisc = -1; 811 FORMAT(libtrace)->pktmbuf_pool = NULL; 636 812 #if DPDK_USE_BLACKLIST 637 FORMAT(libtrace)->nb_blacklist = 0; 638 #endif 639 FORMAT(libtrace)->paused = DPDK_NEVER_STARTED; 640 FORMAT(libtrace)->mempool_name[0] = 0; 641 #if HAS_HW_TIMESTAMPS_82580 642 FORMAT(libtrace)->ts_first_sys = 0; 643 FORMAT(libtrace)->ts_last_sys = 0; 644 FORMAT(libtrace)->wrap_count = 0; 645 #endif 646 647 if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) { 648 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 649 free(libtrace->format_data); 650 libtrace->format_data = NULL; 651 return -1; 652 } 653 return 0; 654 }; 813 FORMAT(libtrace)->nb_blacklist = 0; 814 #endif 815 FORMAT(libtrace)->paused = DPDK_NEVER_STARTED; 816 FORMAT(libtrace)->mempool_name[0] = 0; 817 memset(FORMAT(libtrace)->burst_pkts, 0, 818 sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE); 819 FORMAT(libtrace)->burst_size = 0; 820 FORMAT(libtrace)->burst_offset = 0; 821 822 /* Make our first stream */ 823 FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t)); 824 libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream); 825 826 if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) { 827 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 828 free(libtrace->format_data); 829 libtrace->format_data = NULL; 830 return -1; 831 } 832 return 0; 833 } 655 834 656 835 static int dpdk_init_output(libtrace_out_t *libtrace) 657 836 { 658 659 660 661 662 663 664 FORMAT(libtrace)->queue_id = 0; /* Single queue per port */ 665 FORMAT(libtrace)->nb_ports = 0; 666 FORMAT(libtrace)->snaplen = 0; /* Use default */ 667 FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;668 
FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;669 670 837 char err[500]; 838 err[0] = 0; 839 840 libtrace->format_data = (struct dpdk_format_data_t *) 841 malloc(sizeof(struct dpdk_format_data_t)); 842 FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */ 843 FORMAT(libtrace)->nb_ports = 0; 844 FORMAT(libtrace)->snaplen = 0; /* Use default */ 845 FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF; 846 FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF; 847 FORMAT(libtrace)->nic_numa_node = -1; 848 FORMAT(libtrace)->promisc = -1; 849 FORMAT(libtrace)->pktmbuf_pool = NULL; 671 850 #if DPDK_USE_BLACKLIST 672 FORMAT(libtrace)->nb_blacklist = 0; 673 #endif 674 FORMAT(libtrace)->paused = DPDK_NEVER_STARTED; 675 FORMAT(libtrace)->mempool_name[0] = 0; 676 #if HAS_HW_TIMESTAMPS_82580 677 FORMAT(libtrace)->ts_first_sys = 0; 678 FORMAT(libtrace)->ts_last_sys = 0; 679 FORMAT(libtrace)->wrap_count = 0; 680 #endif 681 682 if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) { 683 trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 684 free(libtrace->format_data); 685 libtrace->format_data = NULL; 686 return -1; 687 } 688 return 0; 689 }; 851 FORMAT(libtrace)->nb_blacklist = 0; 852 #endif 853 FORMAT(libtrace)->paused = DPDK_NEVER_STARTED; 854 FORMAT(libtrace)->mempool_name[0] = 0; 855 memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE); 856 FORMAT(libtrace)->burst_size = 0; 857 FORMAT(libtrace)->burst_offset = 0; 858 859 if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) { 860 trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 861 free(libtrace->format_data); 862 libtrace->format_data = NULL; 863 return -1; 864 } 865 return 0; 866 } 690 867 691 868 /** 692 869 * Note here snaplen excludes the MAC checksum. Packets over 693 870 * the requested snaplen will be dropped. 
(Excluding MAC checksum) 694 * 871 * 695 872 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum) 696 873 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM 697 874 * is set the maximum size of the returned packet would be 1518 otherwise 698 875 * 1514 would be the largest size possibly returned. 699 * 876 * 700 877 */ 701 878 static int dpdk_config_input (libtrace_t *libtrace, 702 trace_option_t option, 703 void *data) { 704 switch (option) { 705 case TRACE_OPTION_SNAPLEN: 706 /* Only support changing snaplen before a call to start is 707 * made */ 708 if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED) 709 FORMAT(libtrace)->snaplen=*(int*)data; 710 else 711 return -1; 712 return 0; 713 case TRACE_OPTION_PROMISC: 714 FORMAT(libtrace)->promisc=*(int*)data; 715 return 0; 716 case TRACE_OPTION_FILTER: 717 /* TODO filtering */ 718 break; 719 case TRACE_OPTION_META_FREQ: 720 break; 721 case TRACE_OPTION_EVENT_REALTIME: 722 break; 723 /* Avoid default: so that future options will cause a warning 724 * here to remind us to implement it, or flag it as 725 * unimplementable 726 */ 727 } 879 trace_option_t option, 880 void *data) { 881 switch (option) { 882 case TRACE_OPTION_SNAPLEN: 883 /* Only support changing snaplen before a call to start is 884 * made */ 885 if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED) 886 FORMAT(libtrace)->snaplen=*(int*)data; 887 else 888 return -1; 889 return 0; 890 case TRACE_OPTION_PROMISC: 891 FORMAT(libtrace)->promisc=*(int*)data; 892 return 0; 893 case TRACE_OPTION_HASHER: 894 switch (*((enum hasher_types *) data)) 895 { 896 case HASHER_BALANCE: 897 case HASHER_UNIDIRECTIONAL: 898 toeplitz_create_unikey(FORMAT(libtrace)->rss_key); 899 return 0; 900 case HASHER_BIDIRECTIONAL: 901 toeplitz_create_bikey(FORMAT(libtrace)->rss_key); 902 return 0; 903 case HASHER_CUSTOM: 904 // We don't support these 905 return -1; 906 } 907 break; 908 case TRACE_OPTION_FILTER: 909 /* TODO 
filtering */ 910 case TRACE_OPTION_META_FREQ: 911 case TRACE_OPTION_EVENT_REALTIME: 912 break; 913 /* Avoid default: so that future options will cause a warning 914 * here to remind us to implement it, or flag it as 915 * unimplementable 916 */ 917 } 728 918 729 919 /* Don't set an error - trace_config will try to deal with the 730 920 * option and will set an error if it fails */ 731 921 return -1; 732 922 } 733 923 734 924 /* Can set jumbo frames/ or limit the size of a frame by setting both 735 925 * max_rx_pkt_len and jumbo_frame. This can be limited to less than 736 * 926 * 737 927 */ 738 928 static struct rte_eth_conf port_conf = { 739 929 .rxmode = { 930 .mq_mode = ETH_RSS, 740 931 .split_hdr_size = 0, 741 932 .header_split = 0, /**< Header Split disabled */ … … 743 934 .hw_vlan_filter = 0, /**< VLAN filtering disabled */ 744 935 .jumbo_frame = 0, /**< Jumbo Frame Support disabled */ 745 936 .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */ 746 937 #if GET_MAC_CRC_CHECKSUM 747 938 /* So it appears that if hw_strip_crc is turned off the driver will still … … 756 947 * always cut off the checksum in the future 757 948 */ 758 949 .hw_strip_crc = 1, /**< CRC stripped by hardware */ 759 950 #endif 760 951 }, … … 762 953 .mq_mode = ETH_DCB_NONE, 763 954 }, 955 .rx_adv_conf = { 956 .rss_conf = { 957 // .rss_key = &rss_key, // We set this per format 958 .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP, 959 }, 960 }, 961 .intr_conf = { 962 .lsc = 1 963 } 764 964 }; 765 965 … … 770 970 .wthresh = 4,/* RX_WTHRESH writeback */ 771 971 }, 772 773 972 .rx_free_thresh = 0, 973 .rx_drop_en = 0, /* Drop packets oldest packets if out of space */ 774 974 }; 775 975 776 976 static const struct rte_eth_txconf tx_conf = { 777 977 .tx_thresh = { 778 /**779 780 781 782 783 978 /* 979 * TX_PTHRESH prefetch 980 * Set on the NIC, if the number of unprocessed descriptors to queued on 981 * the card fall below 
this try grab at least hthresh more unprocessed 982 * descriptors. 983 */ 784 984 .pthresh = 36, 785 985 786 787 788 986 /* TX_HTHRESH host 987 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors. 988 */ 789 989 .hthresh = 0, 790 791 792 793 794 795 796 797 798 799 990 991 /* TX_WTHRESH writeback 992 * Set on the NIC, the number of sent descriptors before writing back 993 * status to confirm the transmission. This is done more efficiently as 994 * a bulk DMA-transfer rather than writing one at a time. 995 * Similar to tx_free_thresh however this is applied to the NIC, where 996 * as tx_free_thresh is when DPDK will check these. This is extended 997 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all 998 * descriptors rather only every n'th item, reducing DMA memory bandwidth. 999 */ 800 1000 .wthresh = 4, 801 1001 }, 802 1002 803 804 805 806 1003 /* Used internally by DPDK rather than passed to the NIC. The number of 1004 * packet descriptors to send before checking for any responses written 1005 * back (to confirm the transmission). Default = 32 if set to 0) 1006 */ 807 1007 .tx_free_thresh = 0, 808 1008 809 810 * This signals the card to only write back status (such as 811 812 813 814 815 1009 /* This is the Report Status threshold, used by 10Gbit cards, 1010 * This signals the card to only write back status (such as 1011 * transmission successful) after this minimum number of transmit 1012 * descriptors are seen. The default is 32 (if set to 0) however if set 1013 * to greater than 1 TX wthresh must be set to zero, because this is kindof 1014 * a replacement. See the dpdk programmers guide for more restrictions. 1015 */ 816 1016 .tx_rs_thresh = 1, 817 1017 }; 818 1018 819 /* Attach memory to the port and start the port or restart the port. 
820 */ 821 static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){ 822 int ret; /* Check return values for errors */ 823 struct rte_eth_link link_info; /* Wait for link */ 824 825 /* Already started */ 826 if (format_data->paused == DPDK_RUNNING) 827 return 0; 828 829 /* First time started we need to alloc our memory, doing this here 830 * rather than in environment setup because we don't have snaplen then */ 831 if (format_data->paused == DPDK_NEVER_STARTED) { 832 if (format_data->snaplen == 0) { 833 format_data->snaplen = RX_MBUF_SIZE; 834 port_conf.rxmode.jumbo_frame = 0; 835 port_conf.rxmode.max_rx_pkt_len = 0; 836 } else { 837 /* Use jumbo frames */ 838 port_conf.rxmode.jumbo_frame = 1; 839 port_conf.rxmode.max_rx_pkt_len = format_data->snaplen; 840 } 841 842 /* This is additional overhead so make sure we allow space for this */ 1019 /** 1020 * A callback for a link state change (LSC). 1021 * 1022 * Packets may be received before this notification. In fact the DPDK IGXBE 1023 * driver likes to put a delay upto 5sec before sending this. 1024 * 1025 * We use this to ensure the link speed is correct for our timestamp 1026 * calculations. Because packets might be received before the link up we still 1027 * update this when the packet is received. 
1028 * 1029 * @param port The DPDK port 1030 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC) 1031 * @param cb_arg The dpdk_format_data_t structure associated with the format 1032 */ 1033 static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event, 1034 void *cb_arg) { 1035 struct dpdk_format_data_t * format_data = cb_arg; 1036 struct rte_eth_link link_info; 1037 assert(event == RTE_ETH_EVENT_INTR_LSC); 1038 assert(port == format_data->port); 1039 1040 rte_eth_link_get_nowait(port, &link_info); 1041 1042 if (link_info.link_status) 1043 format_data->link_speed = link_info.link_speed; 1044 else 1045 format_data->link_speed = 0; 1046 1047 #if DEBUG 1048 fprintf(stderr, "LSC - link status is %s %s speed=%d\n", 1049 link_info.link_status ? "up" : "down", 1050 (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ? 1051 "full-duplex" : "half-duplex", 1052 (int) link_info.link_speed); 1053 #endif 1054 1055 /* Turns out DPDK drivers might not come back up if the link speed 1056 * changes. So we reset the autoneg procedure. This is very unsafe 1057 * we have have threads reading packets and we stop the port. */ 1058 #if 0 1059 if (!link_info.link_status) { 1060 int ret; 1061 rte_eth_dev_stop(port); 1062 ret = rte_eth_dev_start(port); 1063 if (ret < 0) { 1064 fprintf(stderr, "Resetting the DPDK port failed : %s\n", 1065 strerror(-ret)); 1066 } 1067 } 1068 #endif 1069 } 1070 1071 /** Reserve a DPDK lcore ID for a thread globally. 1072 * 1073 * @param real If true allocate a real lcore, otherwise allocate a core which 1074 * does not exist on the local machine. 1075 * @param socket the prefered NUMA socket - only used if a real core is requested 1076 * @return a valid core, which can later be used with dpdk_register_lcore() or a 1077 * -1 if have run out of cores. 1078 * 1079 * If any thread is reading or freeing packets we need to register it here 1080 * due to TLS caches in the memory pools. 
1081 */ 1082 static int dpdk_reserve_lcore(bool real, int socket) { 1083 int new_id = -1; 1084 int i; 1085 struct rte_config *cfg = rte_eal_get_configuration(); 1086 1087 pthread_mutex_lock(&dpdk_lock); 1088 /* If 'reading packets' fill in cores from 0 up and bind affinity 1089 * otherwise start from the MAX core (which is also the master) and work backwards 1090 * in this case physical cores on the system will not exist so we don't bind 1091 * these to any particular physical core */ 1092 if (real) { 1093 #if HAVE_LIBNUMA 1094 for (i = 0; i < RTE_MAX_LCORE; ++i) { 1095 if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) { 1096 new_id = i; 1097 if (!lcore_config[i].detected) 1098 new_id = -1; 1099 break; 1100 } 1101 } 1102 #endif 1103 /* Retry without the the numa restriction */ 1104 if (new_id == -1) { 1105 for (i = 0; i < RTE_MAX_LCORE; ++i) { 1106 if (!rte_lcore_is_enabled(i)) { 1107 new_id = i; 1108 if (!lcore_config[i].detected) 1109 fprintf(stderr, "Warning the" 1110 " number of 'reading' " 1111 "threads exceed cores\n"); 1112 break; 1113 } 1114 } 1115 } 1116 } else { 1117 for (i = RTE_MAX_LCORE-1; i >= 0; --i) { 1118 if (!rte_lcore_is_enabled(i)) { 1119 new_id = i; 1120 break; 1121 } 1122 } 1123 } 1124 1125 if (new_id != -1) { 1126 /* Enable the core in global DPDK structs */ 1127 cfg->lcore_role[new_id] = ROLE_RTE; 1128 cfg->lcore_count++; 1129 } 1130 1131 pthread_mutex_unlock(&dpdk_lock); 1132 return new_id; 1133 } 1134 1135 /** Register a thread as a lcore 1136 * @param libtrace any error is set against libtrace on exit 1137 * @param real If this is a true lcore we will bind its affinty to the 1138 * requested core. 1139 * @param lcore The lcore as retrieved from dpdk_reserve_lcore() 1140 * @return 0, if successful otherwise -1 if an error occured (details are stored 1141 * in libtrace) 1142 * 1143 * @note This must be called from the thread being registered. 
1144 */ 1145 static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) { 1146 int ret; 1147 RTE_PER_LCORE(_lcore_id) = lcore; 1148 1149 /* Set affinity bind to corresponding core */ 1150 if (real) { 1151 cpu_set_t cpuset; 1152 CPU_ZERO(&cpuset); 1153 CPU_SET(rte_lcore_id(), &cpuset); 1154 ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); 1155 if (ret != 0) { 1156 trace_set_err(libtrace, errno, "Warning " 1157 "pthread_setaffinity_np failed"); 1158 return -1; 1159 } 1160 } 1161 1162 return 0; 1163 } 1164 1165 /** Allocates a new dpdk packet buffer memory pool. 1166 * 1167 * @param n The number of threads 1168 * @param pkt_size The packet size we need ot store 1169 * @param socket_id The NUMA socket id 1170 * @param A new mempool, if NULL query the DPDK library for the error code 1171 * see rte_mempool_create() documentation. 1172 * 1173 * This allocates a new pool or recycles an existing memory pool. 1174 * Call dpdk_free_memory() to free the memory. 1175 * We cannot delete memory so instead we store the pools, allowing them to be 1176 * re-used. 
1177 */ 1178 static struct rte_mempool *dpdk_alloc_memory(unsigned n, 1179 unsigned pkt_size, 1180 int socket_id) { 1181 struct rte_mempool *ret; 1182 size_t j,k; 1183 char name[MEMPOOL_NAME_LEN]; 1184 1185 /* Add on packet size overheads */ 1186 pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM; 1187 1188 pthread_mutex_lock(&dpdk_lock); 1189 1190 if (socket_id == SOCKET_ID_ANY || socket_id > 4) { 1191 /* Best guess go for zero */ 1192 socket_id = 0; 1193 } 1194 1195 /* Find a valid pool */ 1196 for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) { 1197 if (mem_pools[socket_id][j]->size >= n && 1198 mem_pools[socket_id][j]->elt_size >= pkt_size) { 1199 break; 1200 } 1201 } 1202 1203 /* Find the end (+1) of the list */ 1204 for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {} 1205 1206 if (mem_pools[socket_id][j]) { 1207 ret = mem_pools[socket_id][j]; 1208 mem_pools[socket_id][j] = mem_pools[socket_id][k-1]; 1209 mem_pools[socket_id][k-1] = NULL; 1210 mem_pools[socket_id][j] = NULL; 1211 } else { 1212 static uint32_t test = 10; 1213 test++; 1214 snprintf(name, MEMPOOL_NAME_LEN, 1215 "libtrace_pool_%"PRIu32, test); 1216 1217 ret = rte_mempool_create(name, n, pkt_size, 1218 128, sizeof(struct rte_pktmbuf_pool_private), 1219 rte_pktmbuf_pool_init, NULL, 1220 rte_pktmbuf_init, NULL, 1221 socket_id, 0); 1222 } 1223 1224 pthread_mutex_unlock(&dpdk_lock); 1225 return ret; 1226 } 1227 1228 /** Stores the memory against the DPDK library. 1229 * 1230 * @param mempool The mempool to free 1231 * @param socket_id The NUMA socket this mempool was allocated upon. 1232 * 1233 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and 1234 * store the memory shared globally against the format. 
1235 */ 1236 static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) { 1237 size_t i; 1238 pthread_mutex_lock(&dpdk_lock); 1239 1240 /* We should have all entries back in the mempool */ 1241 rte_mempool_audit(mempool); 1242 if (!rte_mempool_full(mempool)) { 1243 fprintf(stderr, "DPDK memory pool not empty %d of %d, please " 1244 "free all packets before finishing a trace\n", 1245 rte_mempool_count(mempool), mempool->size); 1246 } 1247 1248 /* Find the end (+1) of the list */ 1249 for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {} 1250 1251 if (i >= RTE_MAX_LCORE) { 1252 fprintf(stderr, "Too many memory pools, dropping this one\n"); 1253 } else { 1254 mem_pools[socket_id][i] = mempool; 1255 } 1256 1257 pthread_mutex_unlock(&dpdk_lock); 1258 } 1259 1260 /* Attach memory to the port and start (or restart) the port/s. 1261 */ 1262 static int dpdk_start_streams(struct dpdk_format_data_t *format_data, 1263 char *err, int errlen, uint16_t rx_queues) { 1264 int ret, i; 1265 struct rte_eth_link link_info; /* Wait for link */ 1266 dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM; 1267 1268 /* Already started */ 1269 if (format_data->paused == DPDK_RUNNING) 1270 return 0; 1271 1272 /* First time started we need to alloc our memory, doing this here 1273 * rather than in environment setup because we don't have snaplen then */ 1274 if (format_data->paused == DPDK_NEVER_STARTED) { 1275 if (format_data->snaplen == 0) { 1276 format_data->snaplen = RX_MBUF_SIZE; 1277 port_conf.rxmode.jumbo_frame = 0; 1278 port_conf.rxmode.max_rx_pkt_len = 0; 1279 } else { 1280 /* Use jumbo frames */ 1281 port_conf.rxmode.jumbo_frame = 1; 1282 port_conf.rxmode.max_rx_pkt_len = format_data->snaplen; 1283 } 1284 843 1285 #if GET_MAC_CRC_CHECKSUM 844 format_data->snaplen += ETHER_CRC_LEN; 1286 /* This is additional overhead so make sure we allow space for this */ 1287 format_data->snaplen += ETHER_CRC_LEN; 845 1288 #endif 846 1289 #if HAS_HW_TIMESTAMPS_82580 847 848 
#endif 849 850 /* Create the mbuf pool, which is the place ourpackets are allocated851 * from - TODO figure out if there is is a free function (I cannot see one) 852 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to 853 * allocate however that extra 1 packet is not used. 854 855 * TX requires nb_tx_buffers + 1 in the case the queue is full 856 857 858 1290 format_data->snaplen += sizeof(struct hw_timestamp_82580); 1291 #endif 1292 1293 /* Create the mbuf pool, which is the place packets are allocated 1294 * from - There is no free function (I cannot see one). 1295 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to 1296 * allocate however that extra 1 packet is not used. 1297 * (I assume <= vs < error some where in DPDK code) 1298 * TX requires nb_tx_buffers + 1 in the case the queue is full 1299 * so that will fill the new buffer and wait until slots in the 1300 * ring become available. 1301 */ 859 1302 #if DEBUG 860 fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name); 861 #endif 862 format_data->pktmbuf_pool = 863 rte_mempool_create(format_data->mempool_name, 864 format_data->nb_rx_buf + format_data->nb_tx_buf + 1, 865 format_data->snaplen + sizeof(struct rte_mbuf) 866 + RTE_PKTMBUF_HEADROOM, 867 8, sizeof(struct rte_pktmbuf_pool_private), 868 rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL, 869 rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET); 870 871 if (format_data->pktmbuf_pool == NULL) { 872 snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf " 873 "pool failed: %s", strerror(rte_errno)); 874 return -1; 875 } 876 } 877 878 /* ----------- Now do the setup for the port mapping ------------ */ 879 /* Order of calls must be 880 * rte_eth_dev_configure() 881 * rte_eth_tx_queue_setup() 882 * rte_eth_rx_queue_setup() 883 * rte_eth_dev_start() 884 * other rte_eth calls 885 */ 886 887 /* This must be called first before another *eth* function 888 * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */ 
889 ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf); 890 if (ret < 0) { 891 snprintf(err, errlen, "Intel DPDK - Cannot configure device port" 892 " %"PRIu8" : %s", format_data->port, 893 strerror(-ret)); 894 return -1; 895 } 896 /* Initialise the TX queue a minimum value if using this port for 897 * receiving. Otherwise a larger size if writing packets. 898 */ 899 ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id, 900 format_data->nb_tx_buf, rte_socket_id(), &tx_conf); 901 if (ret < 0) { 902 snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port" 903 " %"PRIu8" : %s", format_data->port, 904 strerror(-ret)); 905 return -1; 906 } 907 /* Initialise the RX queue with some packets from memory */ 908 ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id, 909 format_data->nb_rx_buf, rte_socket_id(), 910 &rx_conf, format_data->pktmbuf_pool); 911 if (ret < 0) { 912 snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port" 913 " %"PRIu8" : %s", format_data->port, 914 strerror(-ret)); 915 return -1; 916 } 917 918 /* Start device */ 919 ret = rte_eth_dev_start(format_data->port); 920 if (ret < 0) { 921 snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s", 922 strerror(-ret)); 923 return -1; 924 } 925 926 /* Default promiscuous to on */ 927 if (format_data->promisc == -1) 928 format_data->promisc = 1; 929 930 if (format_data->promisc == 1) 931 rte_eth_promiscuous_enable(format_data->port); 932 else 933 rte_eth_promiscuous_disable(format_data->port); 934 935 /* Wait for the link to come up */ 936 rte_eth_link_get(format_data->port, &link_info); 1303 fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name); 1304 #endif 1305 format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2, 1306 format_data->snaplen, 1307 format_data->nic_numa_node); 1308 1309 if (format_data->pktmbuf_pool == NULL) { 1310 snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf " 
1311 "pool failed: %s", strerror(rte_errno)); 1312 return -1; 1313 } 1314 } 1315 1316 /* ----------- Now do the setup for the port mapping ------------ */ 1317 /* Order of calls must be 1318 * rte_eth_dev_configure() 1319 * rte_eth_tx_queue_setup() 1320 * rte_eth_rx_queue_setup() 1321 * rte_eth_dev_start() 1322 * other rte_eth calls 1323 */ 1324 1325 /* This must be called first before another *eth* function 1326 * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */ 1327 ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf); 1328 if (ret < 0) { 1329 snprintf(err, errlen, "Intel DPDK - Cannot configure device port" 1330 " %"PRIu8" : %s", format_data->port, 1331 strerror(-ret)); 1332 return -1; 1333 } 937 1334 #if DEBUG 938 fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status, 939 (int) link_info.link_duplex, (int) link_info.link_speed); 940 #endif 941 942 /* We have now successfully started/unpaused */ 943 format_data->paused = DPDK_RUNNING; 944 945 return 0; 1335 fprintf(stderr, "Doing dev configure\n"); 1336 #endif 1337 /* Initialise the TX queue a minimum value if using this port for 1338 * receiving. Otherwise a larger size if writing packets. 
1339 */ 1340 ret = rte_eth_tx_queue_setup(format_data->port, 1341 0 /* queue XXX */, 1342 format_data->nb_tx_buf, 1343 SOCKET_ID_ANY, 1344 &tx_conf); 1345 if (ret < 0) { 1346 snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue" 1347 " on port %"PRIu8" : %s", format_data->port, 1348 strerror(-ret)); 1349 return -1; 1350 } 1351 1352 /* Attach memory to our RX queues */ 1353 for (i=0; i < rx_queues; i++) { 1354 dpdk_per_stream_t *stream; 1355 #if DEBUG 1356 fprintf(stderr, "Configuring queue %d\n", i); 1357 #endif 1358 1359 /* Add storage for the stream */ 1360 if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i) 1361 libtrace_list_push_back(format_data->per_stream, &empty_stream); 1362 stream = libtrace_list_get_index(format_data->per_stream, i)->data; 1363 stream->queue_id = i; 1364 1365 if (stream->lcore == -1) 1366 stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node); 1367 1368 if (stream->lcore == -1) { 1369 snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore" 1370 ". 
Too many threads?"); 1371 return -1; 1372 } 1373 1374 if (stream->mempool == NULL) { 1375 stream->mempool = dpdk_alloc_memory( 1376 format_data->nb_rx_buf*2, 1377 format_data->snaplen, 1378 rte_lcore_to_socket_id(stream->lcore)); 1379 1380 if (stream->mempool == NULL) { 1381 snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf " 1382 "pool failed: %s", strerror(rte_errno)); 1383 return -1; 1384 } 1385 } 1386 1387 /* Initialise the RX queue with some packets from memory */ 1388 ret = rte_eth_rx_queue_setup(format_data->port, 1389 stream->queue_id, 1390 format_data->nb_rx_buf, 1391 format_data->nic_numa_node, 1392 &rx_conf, 1393 stream->mempool); 1394 if (ret < 0) { 1395 snprintf(err, errlen, "Intel DPDK - Cannot configure" 1396 " RX queue on port %"PRIu8" : %s", 1397 format_data->port, 1398 strerror(-ret)); 1399 return -1; 1400 } 1401 } 1402 1403 #if DEBUG 1404 fprintf(stderr, "Doing start device\n"); 1405 #endif 1406 rte_eth_stats_reset(format_data->port); 1407 /* Start device */ 1408 ret = rte_eth_dev_start(format_data->port); 1409 if (ret < 0) { 1410 snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s", 1411 strerror(-ret)); 1412 return -1; 1413 } 1414 1415 /* Default promiscuous to on */ 1416 if (format_data->promisc == -1) 1417 format_data->promisc = 1; 1418 1419 if (format_data->promisc == 1) 1420 rte_eth_promiscuous_enable(format_data->port); 1421 else 1422 rte_eth_promiscuous_disable(format_data->port); 1423 1424 /* We have now successfully started/unpased */ 1425 format_data->paused = DPDK_RUNNING; 1426 1427 1428 /* Register a callback for link state changes */ 1429 ret = rte_eth_dev_callback_register(format_data->port, 1430 RTE_ETH_EVENT_INTR_LSC, 1431 dpdk_lsc_callback, 1432 format_data); 1433 #if DEBUG 1434 if (ret) 1435 fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n", 1436 ret, strerror(-ret)); 1437 #endif 1438 1439 /* Get the current link status */ 1440 rte_eth_link_get_nowait(format_data->port, &link_info); 1441 
format_data->link_speed = link_info.link_speed; 1442 #if DEBUG 1443 fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status, 1444 (int) link_info.link_duplex, (int) link_info.link_speed); 1445 #endif 1446 1447 return 0; 946 1448 } 947 1449 948 1450 static int dpdk_start_input (libtrace_t *libtrace) { 949 char err[500]; 950 err[0] = 0; 951 952 if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) { 953 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 954 free(libtrace->format_data); 955 libtrace->format_data = NULL; 956 return -1; 957 } 958 return 0; 1451 char err[500]; 1452 err[0] = 0; 1453 1454 /* Make sure we don't reserve an extra thread for this */ 1455 FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id(); 1456 1457 if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) { 1458 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 1459 free(libtrace->format_data); 1460 libtrace->format_data = NULL; 1461 return -1; 1462 } 1463 return 0; 1464 } 1465 1466 static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) { 1467 struct rte_eth_dev_info dev_info; 1468 rte_eth_dev_info_get(port_id, &dev_info); 1469 return dev_info.max_rx_queues; 1470 } 1471 1472 static inline size_t dpdk_processor_count () { 1473 long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN); 1474 if (nb_cpu <= 0) 1475 return 1; 1476 else 1477 return (size_t) nb_cpu; 1478 } 1479 1480 static int dpdk_pstart_input (libtrace_t *libtrace) { 1481 char err[500]; 1482 int i=0, phys_cores=0; 1483 int tot = libtrace->perpkt_thread_count; 1484 libtrace_list_node_t *n; 1485 err[0] = 0; 1486 1487 if (rte_lcore_id() != rte_get_master_lcore()) 1488 fprintf(stderr, "Warning dpdk_pstart_input should be called" 1489 " from the master DPDK thread!\n"); 1490 1491 /* If the master is not on the last thread we move it there */ 1492 if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) { 1493 if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0) 1494 return -1; 1495 } 1496 
1497 /* Don't exceed the number of cores in the system/detected by dpdk 1498 * We don't have to force this but performance wont be good if we don't */ 1499 for (i = 0; i < RTE_MAX_LCORE; ++i) { 1500 if (lcore_config[i].detected) { 1501 if (rte_lcore_is_enabled(i)) { 1502 #if DEBUG 1503 fprintf(stderr, "Found core %d already in use!\n", i); 1504 #endif 1505 } else { 1506 phys_cores++; 1507 } 1508 } 1509 } 1510 /* If we are restarting we have already allocated some threads as such 1511 * we add these back to the count for this calculation */ 1512 for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) { 1513 dpdk_per_stream_t * stream = n->data; 1514 if (stream->lcore != -1) 1515 phys_cores++; 1516 } 1517 1518 tot = MIN(libtrace->perpkt_thread_count, 1519 dpdk_get_max_rx_queues(FORMAT(libtrace)->port)); 1520 tot = MIN(tot, phys_cores); 1521 1522 #if DEBUG 1523 fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, 1524 libtrace->perpkt_thread_count, phys_cores); 1525 #endif 1526 1527 if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) { 1528 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 1529 free(libtrace->format_data); 1530 libtrace->format_data = NULL; 1531 return -1; 1532 } 1533 1534 /* Make sure we only start the number that we should */ 1535 libtrace->perpkt_thread_count = tot; 1536 return 0; 1537 } 1538 1539 /** 1540 * Register a thread with the DPDK system, 1541 * When we start DPDK in parallel libtrace we move the 'main thread' to the 1542 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK 1543 * gives it. 1544 * 1545 * We then allow a mapper thread to be started on every real core as DPDK would, 1546 * we also bind these to the corresponding CPU cores. 1547 * 1548 * @param libtrace A pointer to the trace 1549 * @param reading True if the thread will be used to read packets, i.e. 
will 1550 * call pread_packet(), false if thread used to process packet 1551 * in any other manner including statistics functions. 1552 */ 1553 static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading) 1554 { 1555 #if DEBUG 1556 char name[99]; 1557 name[0] = 0; 1558 #if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__) 1559 pthread_getname_np(pthread_self(), 1560 name, sizeof(name)); 1561 #endif 1562 #endif 1563 if (reading) { 1564 dpdk_per_stream_t *stream; 1565 /* Attach our thread */ 1566 if(t->type == THREAD_PERPKT) { 1567 t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data; 1568 if (t->format_data == NULL) { 1569 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, 1570 "Too many threads registered"); 1571 return -1; 1572 } 1573 } else { 1574 t->format_data = FORMAT_DATA_FIRST(libtrace); 1575 } 1576 stream = t->format_data; 1577 #if DEBUG 1578 fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id()); 1579 #endif 1580 return dpdk_register_lcore(libtrace, true, stream->lcore); 1581 } else { 1582 int lcore = dpdk_reserve_lcore(reading, 0); 1583 if (lcore == -1) { 1584 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads" 1585 " for DPDK"); 1586 return -1; 1587 } 1588 #if DEBUG 1589 fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id()); 1590 #endif 1591 return dpdk_register_lcore(libtrace, false, lcore); 1592 } 1593 1594 return 0; 1595 } 1596 1597 /** 1598 * Unregister a thread with the DPDK system. 1599 * 1600 * Only previously registered threads should be calling this just before 1601 * they are destroyed. 
1602 */ 1603 static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED) 1604 { 1605 struct rte_config *cfg = rte_eal_get_configuration(); 1606 1607 assert(rte_lcore_id() < RTE_MAX_LCORE); 1608 pthread_mutex_lock(&dpdk_lock); 1609 /* Skip if master */ 1610 if (rte_lcore_id() == rte_get_master_lcore()) { 1611 fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n"); 1612 pthread_mutex_unlock(&dpdk_lock); 1613 return; 1614 } 1615 1616 /* Disable this core in global DPDK structs */ 1617 cfg->lcore_role[rte_lcore_id()] = ROLE_OFF; 1618 cfg->lcore_count--; 1619 RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again 1620 assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!! 1621 pthread_mutex_unlock(&dpdk_lock); 1622 return; 959 1623 } 960 1624 961 1625 static int dpdk_start_output(libtrace_out_t *libtrace) 962 1626 { 963 char err[500]; 964 err[0] = 0; 965 966 if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) { 967 trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 968 free(libtrace->format_data); 969 libtrace->format_data = NULL; 970 return -1; 971 } 972 return 0; 973 } 974 975 static int dpdk_pause_input(libtrace_t * libtrace){ 976 /* This stops the device, but can be restarted using rte_eth_dev_start() */ 977 if (FORMAT(libtrace)->paused == DPDK_RUNNING) { 978 #if DEBUG 979 fprintf(stderr, "Pausing port\n"); 980 #endif 981 rte_eth_dev_stop(FORMAT(libtrace)->port); 982 FORMAT(libtrace)->paused = DPDK_PAUSED; 983 /* If we pause it the driver will be reset and likely our counter */ 1627 char err[500]; 1628 err[0] = 0; 1629 1630 if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) { 1631 trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 1632 free(libtrace->format_data); 1633 libtrace->format_data = NULL; 1634 return -1; 1635 } 1636 return 0; 1637 } 1638 1639 static int dpdk_pause_input(libtrace_t * libtrace) { 1640 
libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace); 1641 /* This stops the device, but can be restarted using rte_eth_dev_start() */ 1642 if (FORMAT(libtrace)->paused == DPDK_RUNNING) { 1643 #if DEBUG 1644 fprintf(stderr, "Pausing DPDK port\n"); 1645 #endif 1646 rte_eth_dev_stop(FORMAT(libtrace)->port); 1647 FORMAT(libtrace)->paused = DPDK_PAUSED; 1648 /* Empty the queue of packets */ 1649 for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) { 1650 rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]); 1651 } 1652 FORMAT(libtrace)->burst_offset = 0; 1653 FORMAT(libtrace)->burst_size = 0; 1654 1655 for (; tmp != NULL; tmp = tmp->next) { 1656 dpdk_per_stream_t *stream = tmp->data; 1657 stream->ts_last_sys = 0; 984 1658 #if HAS_HW_TIMESTAMPS_82580 985 FORMAT(libtrace)->ts_first_sys = 0; 986 FORMAT(libtrace)->ts_last_sys = 0; 987 #endif 988 } 989 return 0; 990 } 991 992 static int dpdk_write_packet(libtrace_out_t *trace, 993 libtrace_packet_t *packet){ 994 struct rte_mbuf* m_buff[1]; 995 996 int wirelen = trace_get_wire_length(packet); 997 int caplen = trace_get_capture_length(packet); 998 999 /* Check for a checksum and remove it */ 1000 if (trace_get_link_type(packet) == TRACE_TYPE_ETH && 1001 wirelen == caplen) 1002 caplen -= ETHER_CRC_LEN; 1003 1004 m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool); 1005 if (m_buff[0] == NULL) { 1006 trace_set_err_out(trace, errno, "Cannot get an empty packet buffer"); 1007 return -1; 1008 } else { 1009 int ret; 1010 memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen); 1011 do { 1012 ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1); 1013 } while (ret != 1); 1014 } 1015 1016 return 0; 1659 stream->ts_first_sys = 0; 1660 #endif 1661 } 1662 1663 } 1664 return 0; 1665 } 1666 1667 static int dpdk_write_packet(libtrace_out_t *trace, 1668 libtrace_packet_t *packet){ 1669 struct rte_mbuf* m_buff[1]; 
1670 1671 int wirelen = trace_get_wire_length(packet); 1672 int caplen = trace_get_capture_length(packet); 1673 1674 /* Check for a checksum and remove it */ 1675 if (trace_get_link_type(packet) == TRACE_TYPE_ETH && 1676 wirelen == caplen) 1677 caplen -= ETHER_CRC_LEN; 1678 1679 m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool); 1680 if (m_buff[0] == NULL) { 1681 trace_set_err_out(trace, errno, "Cannot get an empty packet buffer"); 1682 return -1; 1683 } else { 1684 int ret; 1685 memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen); 1686 do { 1687 ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1); 1688 } while (ret != 1); 1689 } 1690 1691 return 0; 1017 1692 } 1018 1693 1019 1694 static int dpdk_fin_input(libtrace_t * libtrace) { 1020 /* Free our memory structures */ 1021 if (libtrace->format_data != NULL) { 1022 /* Close the device completely, device cannot be restarted */ 1023 if (FORMAT(libtrace)->port != 0xFF) 1024 rte_eth_dev_close(FORMAT(libtrace)->port); 1025 /* filter here if we used it */ 1695 libtrace_list_node_t * n; 1696 /* Free our memory structures */ 1697 if (libtrace->format_data != NULL) { 1698 1699 if (FORMAT(libtrace)->port != 0xFF) 1700 rte_eth_dev_callback_unregister(FORMAT(libtrace)->port, 1701 RTE_ETH_EVENT_INTR_LSC, 1702 dpdk_lsc_callback, 1703 FORMAT(libtrace)); 1704 /* Close the device completely, device cannot be restarted */ 1705 rte_eth_dev_close(FORMAT(libtrace)->port); 1706 1707 dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool, 1708 FORMAT(libtrace)->nic_numa_node); 1709 1710 for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) { 1711 dpdk_per_stream_t * stream = n->data; 1712 if (stream->mempool) 1713 dpdk_free_memory(stream->mempool, 1714 rte_lcore_to_socket_id(stream->lcore)); 1715 } 1716 1717 libtrace_list_deinit(FORMAT(libtrace)->per_stream); 1718 /* filter here if we used it */ 1026 1719 free(libtrace->format_data); 1027 1720 } 1028 1721 1029 /* Revert to the original PCI 
drivers */ 1030 /* No longer in DPDK 1031 rte_eal_pci_exit(); */ 1032 return 0; 1722 return 0; 1033 1723 } 1034 1724 1035 1725 1036 1726 static int dpdk_fin_output(libtrace_out_t * libtrace) { 1037 /* Free our memory structures */ 1038 if (libtrace->format_data != NULL) { 1039 /* Close the device completely, device cannot be restarted */ 1040 if (FORMAT(libtrace)->port != 0xFF) 1041 rte_eth_dev_close(FORMAT(libtrace)->port); 1042 /* filter here if we used it */ 1727 /* Free our memory structures */ 1728 if (libtrace->format_data != NULL) { 1729 /* Close the device completely, device cannot be restarted */ 1730 if (FORMAT(libtrace)->port != 0xFF) 1731 rte_eth_dev_close(FORMAT(libtrace)->port); 1732 libtrace_list_deinit(FORMAT(libtrace)->per_stream); 1733 /* filter here if we used it */ 1043 1734 free(libtrace->format_data); 1044 1735 } 1045 1736 1046 /* Revert to the original PCI drivers */ 1047 /* No longer in DPDK 1048 rte_eal_pci_exit(); */ 1049 return 0; 1050 } 1051 1052 /** 1053 * Get the start of additional header that we added to a packet. 1737 return 0; 1738 } 1739 1740 /** 1741 * Get the start of the additional header that we added to a packet. 
1054 1742 */ 1055 1743 static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) { 1056 uint8_t *hdrsize; 1057 assert(packet); 1058 assert(packet->buffer); 1059 hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer); 1060 /* The byte before the original packet data denotes the size in bytes 1061 * of our additional header that we added sits before the 'size byte' */ 1062 hdrsize--; 1063 return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize); 1744 assert(packet); 1745 assert(packet->buffer); 1746 /* Our header sits straight after the mbuf header */ 1747 return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1); 1064 1748 } 1065 1749 1066 1750 static int dpdk_get_capture_length (const libtrace_packet_t *packet) { 1067 1068 1751 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1752 return hdr->cap_len; 1069 1753 } 1070 1754 1071 1755 static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) { 1072 1073 1074 1756 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1757 if (size > hdr->cap_len) { 1758 /* Cannot make a packet bigger */ 1075 1759 return trace_get_capture_length(packet); 1076 1760 } 1077 1761 1078 1079 1080 1762 /* Reset the cached capture length first*/ 1763 packet->capture_length = -1; 1764 hdr->cap_len = (uint32_t) size; 1081 1765 return trace_get_capture_length(packet); 1082 1766 } 1083 1767 1084 1768 static int dpdk_get_wire_length (const libtrace_packet_t *packet) { 1085 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1086 int org_cap_size; /* The original capture size */ 1087 if (hdr->flags & INCLUDES_HW_TIMESTAMP) { 1088 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) - 1089 (MBUF_PKTDATA(packet->buffer) - (char *) hdr) - 1090 sizeof(struct hw_timestamp_82580); 1091 } else { 1092 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) - 1093 (MBUF_PKTDATA(packet->buffer) - (char *) hdr); 1094 } 1095 if (hdr->flags & INCLUDES_CHECKSUM) { 1096 return org_cap_size; 1097 } 
else { 1098 /* DPDK packets are always TRACE_TYPE_ETH packets */ 1099 return org_cap_size + ETHER_CRC_LEN; 1100 } 1101 } 1769 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1770 int org_cap_size; /* The original capture size */ 1771 if (hdr->flags & INCLUDES_HW_TIMESTAMP) { 1772 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) - 1773 sizeof(struct hw_timestamp_82580); 1774 } else { 1775 org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)); 1776 } 1777 if (hdr->flags & INCLUDES_CHECKSUM) { 1778 return org_cap_size; 1779 } else { 1780 /* DPDK packets are always TRACE_TYPE_ETH packets */ 1781 return org_cap_size + ETHER_CRC_LEN; 1782 } 1783 } 1784 1102 1785 static int dpdk_get_framing_length (const libtrace_packet_t *packet) { 1103 1104 1105 1106 sizeof(struct hw_timestamp_82580);1107 1108 1786 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1787 if (hdr->flags & INCLUDES_HW_TIMESTAMP) 1788 return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM + 1789 sizeof(struct hw_timestamp_82580); 1790 else 1791 return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM; 1109 1792 } 1110 1793 1111 1794 static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED, 1112 libtrace_packet_t *packet, void *buffer, 1113 libtrace_rt_types_t rt_type, uint32_t flags) { 1114 assert(packet); 1115 if (packet->buffer != buffer && 1116 packet->buf_control == TRACE_CTRL_PACKET) { 1117 free(packet->buffer); 1118 } 1119 1120 if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) { 1121 packet->buf_control = TRACE_CTRL_PACKET; 1122 } else 1123 packet->buf_control = TRACE_CTRL_EXTERNAL; 1124 1125 packet->buffer = buffer; 1126 packet->header = buffer; 1127 1128 /* Don't use pktmbuf_mtod will fail if the packet is a copy */ 1129 packet->payload = (char *)buffer + dpdk_get_framing_length(packet); 1130 packet->type = rt_type; 1131 return 0; 1132 } 1133 1134 /* 1135 * Does any extra preperation to a captured packet. 
1136 * This includes adding our extra header to it with the timestamp 1137 */ 1138 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet, 1139 struct rte_mbuf* pkt){ 1140 uint8_t * hdr_size; 1141 struct dpdk_addt_hdr *hdr; 1795 libtrace_packet_t *packet, void *buffer, 1796 libtrace_rt_types_t rt_type, uint32_t flags) { 1797 assert(packet); 1798 if (packet->buffer != buffer && 1799 packet->buf_control == TRACE_CTRL_PACKET) { 1800 free(packet->buffer); 1801 } 1802 1803 if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) 1804 packet->buf_control = TRACE_CTRL_PACKET; 1805 else 1806 packet->buf_control = TRACE_CTRL_EXTERNAL; 1807 1808 packet->buffer = buffer; 1809 packet->header = buffer; 1810 1811 /* Don't use pktmbuf_mtod will fail if the packet is a copy */ 1812 packet->payload = (char *)buffer + dpdk_get_framing_length(packet); 1813 packet->type = rt_type; 1814 return 0; 1815 } 1816 1817 /** 1818 * Given a packet size and a link speed, computes the 1819 * time to transmit in nanoseconds. 
1820 * 1821 * @param format_data The dpdk format data from which we get the link speed 1822 * and if unset updates it in a thread safe manner 1823 * @param pkt_size The size of the packet in bytes 1824 * @return The wire time in nanoseconds 1825 */ 1826 static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) { 1827 uint32_t wire_time; 1828 /* 20 extra bytes of interframe gap and preamble */ 1829 # if GET_MAC_CRC_CHECKSUM 1830 wire_time = ((pkt_size + 20) * 8000); 1831 # else 1832 wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000); 1833 # endif 1834 1835 /* Division is really slow and introduces a pipeline stall 1836 * The compiler will optimise this into magical multiplication and shifting 1837 * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html 1838 */ 1839 retry_calc_wiretime: 1840 switch (format_data->link_speed) { 1841 case ETH_LINK_SPEED_40G: 1842 wire_time /= ETH_LINK_SPEED_40G; 1843 break; 1844 case ETH_LINK_SPEED_20G: 1845 wire_time /= ETH_LINK_SPEED_20G; 1846 break; 1847 case ETH_LINK_SPEED_10G: 1848 wire_time /= ETH_LINK_SPEED_10G; 1849 break; 1850 case ETH_LINK_SPEED_1000: 1851 wire_time /= ETH_LINK_SPEED_1000; 1852 break; 1853 case 0: 1854 { 1855 /* Maybe the link was down originally, but now it should be up */ 1856 struct rte_eth_link link = {0}; 1857 rte_eth_link_get_nowait(format_data->port, &link); 1858 if (link.link_status && link.link_speed) { 1859 format_data->link_speed = link.link_speed; 1860 #ifdef DEBUG 1861 fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed); 1862 #endif 1863 goto retry_calc_wiretime; 1864 } 1865 /* We don't know the link speed, make sure numbers are counting up */ 1866 wire_time = 1; 1867 break; 1868 } 1869 default: 1870 wire_time /= format_data->link_speed; 1871 } 1872 return wire_time; 1873 } 1874 1875 /** 1876 * Does any extra preperation to all captured packets 1877 * This includes adding our extra header to it with the 
timestamp, 1878 * and any snapping 1879 * 1880 * @param format_data The DPDK format data 1881 * @param plc The DPDK per lcore format data 1882 * @param pkts An array of size nb_pkts of DPDK packets 1883 */ 1884 static inline void dpdk_ready_pkts(libtrace_t *libtrace, 1885 struct dpdk_per_stream_t *plc, 1886 struct rte_mbuf **pkts, 1887 size_t nb_pkts) { 1888 struct dpdk_format_data_t *format_data = FORMAT(libtrace); 1889 struct dpdk_addt_hdr *hdr; 1890 size_t i; 1891 uint64_t cur_sys_time_ns; 1142 1892 #if HAS_HW_TIMESTAMPS_82580 1143 struct hw_timestamp_82580 *hw_ts; 1144 struct timeval cur_sys_time; 1145 uint64_t cur_sys_time_ns; 1146 uint64_t estimated_wraps; 1147 1148 /* Using gettimeofday because it's most likely to be a vsyscall 1149 * We don't want to slow down anything with systemcalls we dont need 1150 * accauracy */ 1151 gettimeofday(&cur_sys_time, NULL); 1893 struct hw_timestamp_82580 *hw_ts; 1894 uint64_t estimated_wraps; 1152 1895 #else 1153 # if USE_CLOCK_GETTIME 1154 struct timespec cur_sys_time; 1155 1156 /* This looks terrible and I feel bad doing it. But it's OK 1157 * on new kernels, because this is a vsyscall */ 1158 clock_gettime(CLOCK_REALTIME, &cur_sys_time); 1159 # else 1160 struct timeval cur_sys_time; 1161 /* Should be a vsyscall */ 1162 gettimeofday(&cur_sys_time, NULL); 1163 # endif 1164 #endif 1165 1166 /* Record the size of our header */ 1167 hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t)); 1168 *hdr_size = sizeof(struct dpdk_addt_hdr); 1169 /* Now put our header in front of that size */ 1170 hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr)); 1171 memset(hdr, 0, sizeof(struct dpdk_addt_hdr)); 1172 1896 1897 #endif 1898 1899 #if USE_CLOCK_GETTIME 1900 struct timespec cur_sys_time = {0}; 1901 /* This looks terrible and I feel bad doing it. 
But it's OK 1902 * on new kernels, because this is a fast vsyscall */ 1903 clock_gettime(CLOCK_REALTIME, &cur_sys_time); 1904 cur_sys_time_ns = TS_TO_NS(cur_sys_time); 1905 #else 1906 struct timeval cur_sys_time = {0}; 1907 /* Also a fast vsyscall */ 1908 gettimeofday(&cur_sys_time, NULL); 1909 cur_sys_time_ns = TV_TO_NS(cur_sys_time); 1910 #endif 1911 1912 /* The system clock is not perfect so when running 1913 * at linerate we could timestamp a packet in the past. 1914 * To avoid this we munge the timestamp to appear 1ns 1915 * after the previous packet. We should eventually catch up 1916 * to system time since a 64byte packet on a 10G link takes 67ns. 1917 * 1918 * Note with parallel readers timestamping packets 1919 * with duplicate stamps or out of order is unavoidable without 1920 * hardware timestamping from the NIC. 1921 */ 1922 #if !HAS_HW_TIMESTAMPS_82580 1923 if (plc->ts_last_sys >= cur_sys_time_ns) { 1924 cur_sys_time_ns = plc->ts_last_sys + 1; 1925 } 1926 #endif 1927 1928 ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); 1929 for (i = 0 ; i < nb_pkts ; ++i) { 1930 1931 /* We put our header straight after the dpdk header */ 1932 hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1); 1933 memset(hdr, 0, sizeof(struct dpdk_addt_hdr)); 1934 1173 1935 #if GET_MAC_CRC_CHECKSUM 1174 /* Add back in the CRC sum */ 1175 rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN; 1176 rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN; 1177 hdr->flags |= INCLUDES_CHECKSUM; 1178 #endif 1936 /* Add back in the CRC sum */ 1937 rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN; 1938 rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN; 1939 hdr->flags |= INCLUDES_CHECKSUM; 1940 #endif 1941 1942 hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]); 1179 1943 1180 1944 #if HAS_HW_TIMESTAMPS_82580 1181 /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code) 1182 * 1183 * +----------+---+ +--------------+ 1184 * 82580 | 24 | 8 | | 32 | 1185 * +----------+---+ +--------------+ 1186 * reserved \______ 
40 bits _____/ 1187 * 1188 * The 40 bit 82580 SYSTIM overflows every 1189 * 2^40 * 10^-9 / 60 = 18.3 minutes. 1190 * 1191 * NOTE picture is in Big Endian order, in memory it's acutally in Little 1192 * Endian (for the full 64 bits) i.e. picture is mirrored 1193 */ 1194 1195 /* The timestamp is sitting before our packet and is included in pkt_len */ 1196 hdr->flags |= INCLUDES_HW_TIMESTAMP; 1197 hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt); 1198 1199 /* Despite what the documentation says this is in Little 1200 * Endian byteorder. Mask the reserved section out. 1201 */ 1202 hdr->timestamp = le64toh(hw_ts->timestamp) & 1203 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580); 1204 1205 cur_sys_time_ns = TV_TO_NS(cur_sys_time); 1206 if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) { 1207 FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp; 1208 FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys; 1209 } 1210 1211 /* This will have serious problems if packets aren't read quickly 1212 * that is within a couple of seconds because our clock cycles every 1213 * 18 seconds */ 1214 estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys) 1215 / (1ull<<TS_NBITS_82580); 1216 1217 /* Estimated_wraps gives the number of times the counter should have 1218 * wrapped (however depending on value last time it could have wrapped 1219 * twice more (if hw clock is close to its max value) or once less (allowing 1220 * for a bit of variance between hw and sys clock). 
But if the clock 1221 * shouldn't have wrapped once then don't allow it to go backwards in time */ 1222 if (unlikely(estimated_wraps >= 2)) { 1223 /* 2 or more wrap arounds add all but the very last wrap */ 1224 FORMAT(libtrace)->wrap_count += estimated_wraps - 1; 1225 } 1226 1227 /* Set the timestamp to the lowest possible value we're considering */ 1228 hdr->timestamp += FORMAT(libtrace)->ts_first_sys + 1229 FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580); 1230 1231 /* In most runs only the first if() will need evaluating - i.e our 1232 * estimate is correct. */ 1233 if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns, 1234 hdr->timestamp, MAXSKEW_82580))) { 1235 /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */ 1236 FORMAT(libtrace)->wrap_count++; 1237 hdr->timestamp += (1ull<<TS_NBITS_82580); 1238 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1239 hdr->timestamp, MAXSKEW_82580)) { 1240 /* Failed to match estimated_wraps */ 1241 FORMAT(libtrace)->wrap_count++; 1242 hdr->timestamp += (1ull<<TS_NBITS_82580); 1243 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1244 hdr->timestamp, MAXSKEW_82580)) { 1245 if (estimated_wraps == 0) { 1246 /* 0 case Failed to match estimated_wraps+2 */ 1247 printf("WARNING - Hardware Timestamp failed to" 1248 " match using systemtime!\n"); 1249 hdr->timestamp = cur_sys_time_ns; 1250 } else { 1251 /* Failed to match estimated_wraps+1 */ 1252 FORMAT(libtrace)->wrap_count++; 1253 hdr->timestamp += (1ull<<TS_NBITS_82580); 1254 if (!WITHIN_VARIANCE(cur_sys_time_ns, 1255 hdr->timestamp, MAXSKEW_82580)) { 1256 /* Failed to match estimated_wraps+2 */ 1257 printf("WARNING - Hardware Timestamp failed to" 1258 " match using systemtime!!\n"); 1259 } 1260 } 1261 } 1262 } 1263 } 1264 1265 /* Log our previous for the next loop */ 1266 FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time); 1267 1945 /* The timestamp is sitting before our packet and is included in pkt_len */ 1946 hdr->flags |= INCLUDES_HW_TIMESTAMP; 1947 hdr->cap_len -= 
sizeof(struct hw_timestamp_82580); 1948 hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]); 1949 1950 /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code) 1951 * 1952 * +----------+---+ +--------------+ 1953 * 82580 | 24 | 8 | | 32 | 1954 * +----------+---+ +--------------+ 1955 * reserved \______ 40 bits _____/ 1956 * 1957 * The 40 bit 82580 SYSTIM overflows every 1958 * 2^40 * 10^-9 / 60 = 18.3 minutes. 1959 * 1960 * NOTE picture is in Big Endian order, in memory it's acutally in Little 1961 * Endian (for the full 64 bits) i.e. picture is mirrored 1962 */ 1963 1964 /* Despite what the documentation says this is in Little 1965 * Endian byteorder. Mask the reserved section out. 1966 */ 1967 hdr->timestamp = le64toh(hw_ts->timestamp) & 1968 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580); 1969 1970 if (unlikely(plc->ts_first_sys == 0)) { 1971 plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp; 1972 plc->ts_last_sys = plc->ts_first_sys; 1973 } 1974 1975 /* This will have serious problems if packets aren't read quickly 1976 * that is within a couple of seconds because our clock cycles every 1977 * 18 seconds */ 1978 estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys) 1979 / (1ull<<TS_NBITS_82580); 1980 1981 /* Estimated_wraps gives the number of times the counter should have 1982 * wrapped (however depending on value last time it could have wrapped 1983 * twice more (if hw clock is close to its max value) or once less (allowing 1984 * for a bit of variance between hw and sys clock). 
But if the clock 1985 * shouldn't have wrapped once then don't allow it to go backwards in time */ 1986 if (unlikely(estimated_wraps >= 2)) { 1987 /* 2 or more wrap arounds add all but the very last wrap */ 1988 plc->wrap_count += estimated_wraps - 1; 1989 } 1990 1991 /* Set the timestamp to the lowest possible value we're considering */ 1992 hdr->timestamp += plc->ts_first_sys + 1993 plc->wrap_count * (1ull<<TS_NBITS_82580); 1994 1995 /* In most runs only the first if() will need evaluating - i.e our 1996 * estimate is correct. */ 1997 if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns, 1998 hdr->timestamp, MAXSKEW_82580))) { 1999 /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */ 2000 plc->wrap_count++; 2001 hdr->timestamp += (1ull<<TS_NBITS_82580); 2002 if (!WITHIN_VARIANCE(cur_sys_time_ns, 2003 hdr->timestamp, MAXSKEW_82580)) { 2004 /* Failed to match estimated_wraps */ 2005 plc->wrap_count++; 2006 hdr->timestamp += (1ull<<TS_NBITS_82580); 2007 if (!WITHIN_VARIANCE(cur_sys_time_ns, 2008 hdr->timestamp, MAXSKEW_82580)) { 2009 if (estimated_wraps == 0) { 2010 /* 0 case Failed to match estimated_wraps+2 */ 2011 printf("WARNING - Hardware Timestamp failed to" 2012 " match using systemtime!\n"); 2013 hdr->timestamp = cur_sys_time_ns; 2014 } else { 2015 /* Failed to match estimated_wraps+1 */ 2016 plc->wrap_count++; 2017 hdr->timestamp += (1ull<<TS_NBITS_82580); 2018 if (!WITHIN_VARIANCE(cur_sys_time_ns, 2019 hdr->timestamp, MAXSKEW_82580)) { 2020 /* Failed to match estimated_wraps+2 */ 2021 printf("WARNING - Hardware Timestamp failed to" 2022 " match using systemtime!!\n"); 2023 } 2024 } 2025 } 2026 } 2027 } 1268 2028 #else 1269 # if USE_CLOCK_GETTIME 1270 hdr->timestamp = TS_TO_NS(cur_sys_time); 1271 # else 1272 hdr->timestamp = TV_TO_NS(cur_sys_time); 1273 # endif 1274 #endif 1275 1276 /* Intels samples prefetch into level 0 cache lets assume it is a good 1277 * idea and do the same */ 1278 rte_prefetch0(rte_pktmbuf_mtod(pkt, void *)); 1279 
packet->buffer = pkt; 1280 dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0); 1281 1282 /* Set our capture length for the first time */ 1283 hdr->cap_len = dpdk_get_wire_length(packet); 1284 if (!(hdr->flags & INCLUDES_CHECKSUM)) { 1285 hdr->cap_len -= ETHER_CRC_LEN; 1286 } 1287 1288 1289 return dpdk_get_framing_length(packet) + 1290 dpdk_get_capture_length(packet); 2029 2030 hdr->timestamp = cur_sys_time_ns; 2031 /* Offset the next packet by the wire time of previous */ 2032 calculate_wire_time(format_data, hdr->cap_len); 2033 2034 #endif 2035 } 2036 2037 plc->ts_last_sys = cur_sys_time_ns; 2038 return; 2039 } 2040 2041 2042 static void dpdk_fin_packet(libtrace_packet_t *packet) 2043 { 2044 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 2045 rte_pktmbuf_free(packet->buffer); 2046 packet->buffer = NULL; 2047 } 2048 } 2049 2050 /** Reads at least one packet or returns an error 2051 */ 2052 static inline int dpdk_read_packet_stream (libtrace_t *libtrace, 2053 dpdk_per_stream_t *stream, 2054 libtrace_message_queue_t *mesg, 2055 struct rte_mbuf* pkts_burst[], 2056 size_t nb_packets) { 2057 size_t nb_rx; /* Number of rx packets we've recevied */ 2058 while (1) { 2059 /* Poll for a batch of packets */ 2060 nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port, 2061 stream->queue_id, pkts_burst, nb_packets); 2062 if (nb_rx > 0) { 2063 /* Got some packets - otherwise we keep spining */ 2064 dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx); 2065 //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace)); 2066 return nb_rx; 2067 } 2068 /* Check the message queue this could be less than 0 */ 2069 if (mesg && libtrace_message_queue_count(mesg) > 0) 2070 return READ_MESSAGE; 2071 if (libtrace_halt) 2072 return READ_EOF; 2073 /* Wait a while, polling on memory degrades performance 2074 * This relieves the pressure on memory allowing the NIC to DMA */ 2075 rte_delay_us(10); 2076 } 2077 2078 
/* We'll never get here - but if we did it would be bad */ 2079 return READ_ERROR; 2080 } 2081 2082 static int dpdk_pread_packets (libtrace_t *libtrace, 2083 libtrace_thread_t *t, 2084 libtrace_packet_t **packets, 2085 size_t nb_packets) { 2086 int nb_rx; /* Number of rx packets we've recevied */ 2087 struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */ 2088 int i; 2089 dpdk_per_stream_t *stream = t->format_data; 2090 2091 nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages, 2092 pkts_burst, nb_packets); 2093 2094 if (nb_rx > 0) { 2095 for (i = 0; i < nb_rx; ++i) { 2096 if (packets[i]->buffer != NULL) { 2097 /* The packet should always be finished */ 2098 assert(packets[i]->buf_control == TRACE_CTRL_PACKET); 2099 free(packets[i]->buffer); 2100 } 2101 packets[i]->buf_control = TRACE_CTRL_EXTERNAL; 2102 packets[i]->type = TRACE_RT_DATA_DPDK; 2103 packets[i]->buffer = pkts_burst[i]; 2104 packets[i]->trace = libtrace; 2105 packets[i]->error = 1; 2106 dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0); 2107 } 2108 } 2109 2110 return nb_rx; 1291 2111 } 1292 2112 1293 2113 static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) { 1294 int nb_rx; /* Number of rx packets we've recevied */ 1295 struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */ 1296 1297 /* Free the last packet buffer */ 1298 if (packet->buffer != NULL) { 1299 /* Buffer is owned by DPDK */ 1300 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 1301 rte_pktmbuf_free(packet->buffer); 1302 packet->buffer = NULL; 1303 } else 1304 /* Buffer is owned by packet i.e. 
has been malloc'd */ 1305 if (packet->buf_control == TRACE_CTRL_PACKET) { 1306 free(packet->buffer); 1307 packet->buffer = NULL; 1308 } 1309 } 1310 1311 packet->buf_control = TRACE_CTRL_EXTERNAL; 1312 packet->type = TRACE_RT_DATA_DPDK; 1313 1314 /* Wait for a packet */ 1315 while (1) { 1316 /* Poll for a single packet */ 1317 nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port, 1318 FORMAT(libtrace)->queue_id, pkts_burst, 1); 1319 if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */ 1320 return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]); 1321 } 1322 if (libtrace_halt) { 1323 return 0; 1324 } 1325 } 1326 1327 /* We'll never get here - but if we did it would be bad */ 1328 return -1; 2114 int nb_rx; /* Number of rx packets we've received */ 2115 dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace); 2116 2117 /* Free the last packet buffer */ 2118 if (packet->buffer != NULL) { 2119 /* The packet should always be finished */ 2120 assert(packet->buf_control == TRACE_CTRL_PACKET); 2121 free(packet->buffer); 2122 packet->buffer = NULL; 2123 } 2124 2125 packet->buf_control = TRACE_CTRL_EXTERNAL; 2126 packet->type = TRACE_RT_DATA_DPDK; 2127 2128 /* Check if we already have some packets buffered */ 2129 if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) { 2130 packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++]; 2131 dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0); 2132 return 1; // TODO should be bytes read, which essentially useless anyway 2133 } 2134 2135 nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL, 2136 FORMAT(libtrace)->burst_pkts, BURST_SIZE); 2137 2138 if (nb_rx > 0) { 2139 FORMAT(libtrace)->burst_size = nb_rx; 2140 FORMAT(libtrace)->burst_offset = 1; 2141 packet->buffer = FORMAT(libtrace)->burst_pkts[0]; 2142 dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0); 2143 return 1; 2144 } 2145 return nb_rx; 1329 2146 } 1330 2147 1331 2148 static struct timeval 
dpdk_get_timeval (const libtrace_packet_t *packet) { 1332 1333 1334 1335 1336 1337 2149 struct timeval tv; 2150 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 2151 2152 tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000; 2153 tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000; 2154 return tv; 1338 2155 } 1339 2156 1340 2157 static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) { 1341 1342 1343 1344 1345 1346 2158 struct timespec ts; 2159 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 2160 2161 ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000; 2162 ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000; 2163 return ts; 1347 2164 } 1348 2165 1349 2166 static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) { 1350 2167 return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */ 1351 2168 } 1352 2169 1353 2170 static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) { 1354 1355 2171 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 2172 return (libtrace_direction_t) hdr->direction; 1356 2173 } 1357 2174 1358 2175 static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) { 1359 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 1360 hdr->direction = (uint8_t) direction; 1361 return (libtrace_direction_t) hdr->direction; 1362 } 1363 1364 /* 1365 * NOTE: Drops could occur for other reasons than running out of buffer 1366 * space. Such as failed MAC checksums and oversized packets. 
1367 */ 1368 static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) { 1369 struct rte_eth_stats stats = {0}; 1370 1371 if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF) 1372 return UINT64_MAX; 1373 /* Grab the current stats */ 1374 rte_eth_stats_get(FORMAT(trace)->port, &stats); 1375 1376 /* Get the drop counter */ 1377 return (uint64_t) stats.ierrors; 1378 } 1379 1380 static uint64_t dpdk_get_captured_packets (libtrace_t *trace) { 1381 struct rte_eth_stats stats = {0}; 1382 1383 if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF) 1384 return UINT64_MAX; 1385 /* Grab the current stats */ 1386 rte_eth_stats_get(FORMAT(trace)->port, &stats); 1387 1388 /* Get the drop counter */ 1389 return (uint64_t) stats.ipackets; 1390 } 1391 1392 /* 1393 * This is the number of packets filtered by the NIC 1394 * and maybe ahead of number read using libtrace. 1395 * 1396 * XXX we are yet to implement any filtering, but if it was this should 1397 * get the result. So this will just return 0 for now. 
1398 */ 1399 static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) { 1400 struct rte_eth_stats stats = {0}; 1401 1402 if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF) 1403 return UINT64_MAX; 1404 /* Grab the current stats */ 1405 rte_eth_stats_get(FORMAT(trace)->port, &stats); 1406 1407 /* Get the drop counter */ 1408 return (uint64_t) stats.fdirmiss; 2176 struct dpdk_addt_hdr * hdr = get_addt_hdr(packet); 2177 hdr->direction = (uint8_t) direction; 2178 return (libtrace_direction_t) hdr->direction; 2179 } 2180 2181 static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) { 2182 struct rte_eth_stats dev_stats = {0}; 2183 2184 if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF) 2185 return; 2186 2187 /* Grab the current stats */ 2188 rte_eth_stats_get(FORMAT(trace)->port, &dev_stats); 2189 2190 stats->captured_valid = true; 2191 stats->captured = dev_stats.ipackets; 2192 2193 /* Not that we support adding filters but if we did this 2194 * would work */ 2195 stats->filtered += dev_stats.fdirmiss; 2196 2197 stats->dropped_valid = true; 2198 stats->dropped = dev_stats.imissed; 2199 2200 /* DPDK errors includes drops */ 2201 stats->errors_valid = true; 2202 stats->errors = dev_stats.ierrors - dev_stats.imissed; 2203 2204 stats->received_valid = true; 2205 stats->received = dev_stats.ipackets + dev_stats.imissed; 2206 1409 2207 } 1410 2208 … … 1414 2212 */ 1415 2213 static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace, 1416 libtrace_packet_t *packet) { 1417 libtrace_eventobj_t event = {0,0,0.0,0}; 1418 int nb_rx; /* Number of receive packets we've read */ 1419 struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */ 1420 1421 do { 1422 1423 /* See if we already have a packet waiting */ 1424 nb_rx = rte_eth_rx_burst(FORMAT(trace)->port, 1425 FORMAT(trace)->queue_id, pkts_burst, 1); 1426 1427 if (nb_rx > 0) { 1428 /* Free the last packet buffer */ 1429 if (packet->buffer != NULL) { 1430 /* Buffer is 
owned by DPDK */ 1431 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 1432 rte_pktmbuf_free(packet->buffer); 1433 packet->buffer = NULL; 1434 } else 1435 /* Buffer is owned by packet i.e. has been malloc'd */ 1436 if (packet->buf_control == TRACE_CTRL_PACKET) { 1437 free(packet->buffer); 1438 packet->buffer = NULL; 1439 } 1440 } 1441 1442 packet->buf_control = TRACE_CTRL_EXTERNAL; 1443 packet->type = TRACE_RT_DATA_DPDK; 1444 event.type = TRACE_EVENT_PACKET; 1445 event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]); 1446 1447 /* XXX - Check this passes the filter trace_read_packet normally 1448 * does this for us but this wont */ 1449 if (trace->filter) { 1450 if (!trace_apply_filter(trace->filter, packet)) { 1451 /* Failed the filter so we loop for another packet */ 1452 trace->filtered_packets ++; 1453 continue; 1454 } 1455 } 1456 trace->accepted_packets ++; 1457 } else { 1458 /* We only want to sleep for a very short time - we are non-blocking */ 1459 event.type = TRACE_EVENT_SLEEP; 1460 event.seconds = 0.0001; 1461 event.size = 0; 1462 } 1463 1464 /* If we get here we have our event */ 1465 break; 1466 } while (1); 1467 1468 return event; 1469 } 1470 2214 libtrace_packet_t *packet) { 2215 libtrace_eventobj_t event = {0,0,0.0,0}; 2216 int nb_rx; /* Number of receive packets we've read */ 2217 struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */ 2218 2219 do { 2220 2221 /* See if we already have a packet waiting */ 2222 nb_rx = rte_eth_rx_burst(FORMAT(trace)->port, 2223 FORMAT_DATA_FIRST(trace)->queue_id, 2224 pkts_burst, 1); 2225 2226 if (nb_rx > 0) { 2227 /* Free the last packet buffer */ 2228 if (packet->buffer != NULL) { 2229 /* The packet should always be finished */ 2230 assert(packet->buf_control == TRACE_CTRL_PACKET); 2231 free(packet->buffer); 2232 packet->buffer = NULL; 2233 } 2234 2235 packet->buf_control = TRACE_CTRL_EXTERNAL; 2236 packet->type = TRACE_RT_DATA_DPDK; 2237 event.type = TRACE_EVENT_PACKET; 2238 
dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1); 2239 packet->buffer = FORMAT(trace)->burst_pkts[0]; 2240 dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0); 2241 event.size = 1; // TODO should be bytes read, which essentially useless anyway 2242 2243 /* XXX - Check this passes the filter trace_read_packet normally 2244 * does this for us but this wont */ 2245 if (trace->filter) { 2246 if (!trace_apply_filter(trace->filter, packet)) { 2247 /* Failed the filter so we loop for another packet */ 2248 trace->filtered_packets ++; 2249 continue; 2250 } 2251 } 2252 trace->accepted_packets ++; 2253 } else { 2254 /* We only want to sleep for a very short time - we are non-blocking */ 2255 event.type = TRACE_EVENT_SLEEP; 2256 event.seconds = 0.0001; 2257 event.size = 0; 2258 } 2259 2260 /* If we get here we have our event */ 2261 break; 2262 } while (1); 2263 2264 return event; 2265 } 1471 2266 1472 2267 static void dpdk_help(void) { 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 } 1489 1490 2268 printf("dpdk format module: $Revision: 1752 $\n"); 2269 printf("Supported input URIs:\n"); 2270 printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n"); 2271 printf("\tThe -<coreid> is optional \n"); 2272 printf("\t e.g. dpdk:0000:01:00.1\n"); 2273 printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n"); 2274 printf("\t By default the last CPU core is used if not otherwise specified.\n"); 2275 printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n"); 2276 printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n"); 2277 printf("\n"); 2278 printf("Supported output URIs:\n"); 2279 printf("\tSame format as the input URI.\n"); 2280 printf("\t e.g. dpdk:0000:01:00.1\n"); 2281 printf("\t e.g. 
dpdk:0000:01:00.1-2 (Use the second CPU core)\n"); 2282 printf("\n"); 2283 } 2284 2285 static struct libtrace_format_t dpdk = { 1491 2286 "dpdk", 1492 "$Id : format_dpdk.c 1805 2013-03-08 02:01:35Z salcock$",2287 "$Id$", 1493 2288 TRACE_FORMAT_DPDK, 1494 NULL, /* probe filename */ 1495 NULL, /* probe magic */ 1496 dpdk_init_input, /* init_input */ 1497 dpdk_config_input, /* config_input */ 1498 dpdk_start_input, /* start_input */ 1499 dpdk_pause_input, /* pause_input */ 1500 dpdk_init_output, /* init_output */ 1501 NULL, /* config_output */ 1502 dpdk_start_output, /* start_ouput */ 1503 dpdk_fin_input, /* fin_input */ 1504 dpdk_fin_output, /* fin_output */ 1505 dpdk_read_packet, /* read_packet */ 1506 dpdk_prepare_packet, /* prepare_packet */ 1507 NULL, /* fin_packet */ 1508 dpdk_write_packet, /* write_packet */ 1509 dpdk_get_link_type, /* get_link_type */ 1510 dpdk_get_direction, /* get_direction */ 1511 dpdk_set_direction, /* set_direction */ 1512 NULL, /* get_erf_timestamp */ 1513 dpdk_get_timeval, /* get_timeval */ 1514 dpdk_get_timespec, /* get_timespec */ 1515 NULL, /* get_seconds */ 1516 NULL, /* seek_erf */ 1517 NULL, /* seek_timeval */ 1518 NULL, /* seek_seconds */ 1519 dpdk_get_capture_length,/* get_capture_length */ 1520 dpdk_get_wire_length, /* get_wire_length */ 1521 dpdk_get_framing_length,/* get_framing_length */ 1522 dpdk_set_capture_length,/* set_capture_length */ 1523 NULL, /* get_received_packets */ 1524 dpdk_get_filtered_packets,/* get_filtered_packets */ 1525 dpdk_get_dropped_packets,/* get_dropped_packets */ 1526 dpdk_get_captured_packets,/* get_captured_packets */ 1527 NULL, /* get_fd */ 1528 dpdk_trace_event, /* trace_event */ 1529 dpdk_help, /* help */ 1530 NULL 2289 NULL, /* probe filename */ 2290 NULL, /* probe magic */ 2291 dpdk_init_input, /* init_input */ 2292 dpdk_config_input, /* config_input */ 2293 dpdk_start_input, /* start_input */ 2294 dpdk_pause_input, /* pause_input */ 2295 dpdk_init_output, /* init_output */ 2296 NULL, /* 
config_output */ 2297 dpdk_start_output, /* start_ouput */ 2298 dpdk_fin_input, /* fin_input */ 2299 dpdk_fin_output, /* fin_output */ 2300 dpdk_read_packet, /* read_packet */ 2301 dpdk_prepare_packet, /* prepare_packet */ 2302 dpdk_fin_packet, /* fin_packet */ 2303 dpdk_write_packet, /* write_packet */ 2304 dpdk_get_link_type, /* get_link_type */ 2305 dpdk_get_direction, /* get_direction */ 2306 dpdk_set_direction, /* set_direction */ 2307 NULL, /* get_erf_timestamp */ 2308 dpdk_get_timeval, /* get_timeval */ 2309 dpdk_get_timespec, /* get_timespec */ 2310 NULL, /* get_seconds */ 2311 NULL, /* seek_erf */ 2312 NULL, /* seek_timeval */ 2313 NULL, /* seek_seconds */ 2314 dpdk_get_capture_length, /* get_capture_length */ 2315 dpdk_get_wire_length, /* get_wire_length */ 2316 dpdk_get_framing_length, /* get_framing_length */ 2317 dpdk_set_capture_length, /* set_capture_length */ 2318 NULL, /* get_received_packets */ 2319 NULL, /* get_filtered_packets */ 2320 NULL, /* get_dropped_packets */ 2321 dpdk_get_stats, /* get_statistics */ 2322 NULL, /* get_fd */ 2323 dpdk_trace_event, /* trace_event */ 2324 dpdk_help, /* help */ 2325 NULL, /* next pointer */ 2326 {true, 8}, /* Live, NICs typically have 8 threads */ 2327 dpdk_pstart_input, /* pstart_input */ 2328 dpdk_pread_packets, /* pread_packets */ 2329 dpdk_pause_input, /* ppause */ 2330 dpdk_fin_input, /* p_fin */ 2331 dpdk_pregister_thread, /* pregister_thread */ 2332 dpdk_punregister_thread, /* punregister_thread */ 2333 NULL /* get thread stats */ 1531 2334 }; 1532 2335 -
lib/format_duck.c
rc5ac872 r5ab626a 362 362 NULL, /* get_filtered_packets */ 363 363 NULL, /* get_dropped_packets */ 364 NULL, /* get_ captured_packets */364 NULL, /* get_statistics */ 365 365 NULL, /* get_fd */ 366 366 NULL, /* trace_event */ 367 367 duck_help, /* help */ 368 NULL /* next pointer */ 368 NULL, /* next pointer */ 369 NON_PARALLEL(false) 369 370 }; 370 371 -
lib/format_erf.c
rb13caea rd420777 856 856 NULL, /* get_filtered_packets */ 857 857 erf_get_dropped_packets, /* get_dropped_packets */ 858 NULL, /* get_ captured_packets */858 NULL, /* get_statistics */ 859 859 NULL, /* get_fd */ 860 860 erf_event, /* trace_event */ 861 861 erf_help, /* help */ 862 NULL /* next pointer */ 862 NULL, /* next pointer */ 863 NON_PARALLEL(false) 863 864 }; 864 865 … … 899 900 NULL, /* get_filtered_packets */ 900 901 erf_get_dropped_packets, /* get_dropped_packets */ 901 NULL, /* get_ captured_packets */902 NULL, /* get_statistics */ 902 903 NULL, /* get_fd */ 903 904 erf_event, /* trace_event */ 904 905 erf_help, /* help */ 905 NULL /* next pointer */ 906 NULL, /* next pointer */ 907 NON_PARALLEL(false) 906 908 }; 907 909 -
lib/format_legacy.c
r1ca603b r5ab626a 548 548 NULL, /* get_filtered_packets */ 549 549 NULL, /* get_dropped_packets */ 550 NULL, /* get_ captured_packets */550 NULL, /* get_statistics */ 551 551 NULL, /* get_fd */ 552 552 trace_event_trace, /* trace_event */ 553 553 legacyatm_help, /* help */ 554 NULL /* next pointer */ 554 NULL, /* next pointer */ 555 NON_PARALLEL(false) 555 556 }; 556 557 … … 591 592 NULL, /* get_filtered_packets */ 592 593 NULL, /* get_dropped_packets */ 593 NULL, /* get_ captured_packets */594 NULL, /* get_statistics */ 594 595 NULL, /* get_fd */ 595 596 trace_event_trace, /* trace_event */ 596 597 legacyeth_help, /* help */ 597 NULL /* next pointer */ 598 NULL, /* next pointer */ 599 NON_PARALLEL(false) 598 600 }; 599 601 … … 634 636 NULL, /* get_filtered_packets */ 635 637 NULL, /* get_dropped_packets */ 636 NULL, /* get_ captured_packets */638 NULL, /* get_statistics */ 637 639 NULL, /* get_fd */ 638 640 trace_event_trace, /* trace_event */ 639 641 legacypos_help, /* help */ 640 642 NULL, /* next pointer */ 643 NON_PARALLEL(false) 641 644 }; 642 645 … … 677 680 NULL, /* get_filtered_packets */ 678 681 NULL, /* get_dropped_packets */ 679 NULL, /* get_ captured_packets */682 NULL, /* get_statistics */ 680 683 NULL, /* get_fd */ 681 684 trace_event_trace, /* trace_event */ 682 685 legacynzix_help, /* help */ 683 686 NULL, /* next pointer */ 687 NON_PARALLEL(false) 684 688 }; 685 689 -
lib/format_pcap.c
r4649fea r5ab626a 830 830 NULL, /* get_filtered_packets */ 831 831 NULL, /* get_dropped_packets */ 832 NULL, /* get_ captured_packets */832 NULL, /* get_statistics */ 833 833 NULL, /* get_fd */ 834 834 trace_event_trace, /* trace_event */ 835 835 pcap_help, /* help */ 836 NULL /* next pointer */ 836 NULL, /* next pointer */ 837 NON_PARALLEL(false) 837 838 }; 838 839 … … 873 874 NULL, /* get_filtered_packets */ 874 875 pcap_get_dropped_packets, /* get_dropped_packets */ 875 NULL, /* get_ captured_packets */876 NULL, /* get_statistics */ 876 877 pcap_get_fd, /* get_fd */ 877 878 trace_event_device, /* trace_event */ 878 879 pcapint_help, /* help */ 879 NULL /* next pointer */ 880 NULL, /* next pointer */ 881 NON_PARALLEL(true)