Changeset a21b45e
- Timestamp:
- 01/28/16 17:30:32 (6 years ago)
- Branches:
- pfring
- Children:
- 93c9aff
- Parents:
- e2cd7d8
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_pfring.c
re2cd7d8 ra21b45e 50 50 51 51 #include <pfring.h> 52 #include <pfring_zc.h> 52 53 53 54 struct pfring_format_data_t { … … 59 60 }; 60 61 62 struct pfringzc_per_thread { 63 64 uint32_t lastbatch; 65 uint32_t nextpacket; 66 pfring_zc_pkt_buff ** buffers; 67 } 68 69 70 struct pfringzc_format_data_t { 71 pfring_zc_cluster *cluster; 72 pfring_zc_worker *hasher; 73 pfring_zc_buffer_pool *pool; 74 75 pfring_zc_queue **inqueues; 76 pfring_zc_queue **outqueues; 77 uint16_t clusterid; 78 int numthreads; 79 80 struct pfringzc_per_thread *perthreads; 81 82 int8_t promisc; 83 int snaplen; 84 char *bpffilter; 85 enum hasher_types hashtype; 86 87 }; 88 61 89 struct pfring_per_stream_t { 62 90 … … 69 97 70 98 #define DATA(x) ((struct pfring_format_data_t *)x->format_data) 99 #define ZCDATA(x) ((struct pfringzc_format_data_t *)x->format_data) 71 100 #define STREAM_DATA(x) ((struct pfring_per_stream_t *)x->data) 72 101 73 102 #define FORMAT_DATA DATA(libtrace) 103 #define ZC_FORMAT_DATA ZCDATA(libtrace) 74 104 #define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head 75 105 #define FORMAT_DATA_FIRST ((struct pfring_per_stream_t *)FORMAT_DATA_HEAD->data) 76 106 107 #define PFRINGZC_BATCHSIZE 10 77 108 78 109 typedef union { … … 207 238 } 208 239 240 static inline int pfringzc_init_queues(libtrace_t *libtrace, 241 struct pfringzc_format_data_t *fdata, int threads) { 242 243 int i, j; 244 char devname[4096]; 245 246 fdata->inqueues = calloc(threads, sizeof(pfring_zc_queue *)); 247 fdata->outqueues = calloc(threads, sizeof(pfring_zc_queue *)); 248 fdata->perthreads = calloc(threads, sizeof(struct pfringzc_per_thread)); 249 250 for (i = 0; i < threads; i++) { 251 snprintf(devname, 4095, "zc:%s@%d", libtrace->uridata, i); 252 253 fdata->perthreads[i]->buffers = calloc(PFRINGZC_BATCHSIZE, sizeof(pfring_zc_pkt_buff *)); 254 fdata->perthreads[i]->lastbatch = 0; 255 fdata->perthreads[i]->nextpacket = 0; 256 257 for (j = 0; j < PFRINGZC_BATCHSIZE; j++) { 258 fdata->perthreads[i]->buffers[j] = pfring_zc_get_packet_handle(fdata->cluster); 259 260 if (fdata->perthreads[i]->buffers[j] == NULL) { 261 trace_set_err(libtrace, errno, "Failed to create pfringzc packet handle"); 262 goto error; 263 } 264 } 265 266 fdata->inqueues[i] = pfring_zc_open_device(fdata->cluster, 267 devname, rx_only, 0); 268 if (data->inqueues[i] == NULL) { 269 trace_set_err(libtrace, errno, "Failed to create pfringzc in queue"); 270 goto error; 271 } 272 273 274 fdata->outqueues[i] = pfring_zc_create_queue(fdata->cluster, 275 8192); 276 if (data->outqueues[i] == NULL) { 277 trace_set_err(libtrace, errno, "Failed to create pfringzc out queue"); 278 goto error; 279 } 280 281 } 282 283 fdata->pool = pfring_zc_create_buffer_pool(fdata->cluster, 8); 284 if (fdata->pool == NULL) { 285 trace_set_err(libtrace, errno, "Failed to create pfringzc buffer pool"); 286 goto error; 287 } 288 289 return 0; 290 291 error: 292 //pfringzc_destroy_queues(libtrace, fdata, threads); 293 return -1; 294 295 } 296 297 static int pfringzc_start_input(libtrace_t *libtrace) { 298 299 if (ZC_FORMAT_DATA->cluster != NULL) { 300 trace_set_err(libtrace, TRACE_ERR_BAD_STATE, 301 "Attempted to start a pfringzc: input that was already started!"); 302 return -1; 303 } 304 305 if (libtrace->uridata == NULL) { 306 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 307 "Missing interface name from pfringzc: URI"); 308 return -1; 309 } 310 311 ZC_FORMAT_DATA->cluster = pfring_zc_create_cluster( 312 ZC_FORMAT_DATA->clusterid, 313 1600, /* TODO calculate */ 314 0, /* meta-data length */ 315 8192 * 32687 + PFRINGZC_BATCHSIZE, /* number of buffers */ 316 pfring_zc_numa_get_cpu_node(0), /* bind to core 0 */ 317 NULL /* auto hugetlb mountpoint */ 318 ); 319 if (ZC_FORMAT_DATA->cluster == NULL) { 320 trace_set_err(libtrace, errno, "Failed to create pfringzc cluster"); 321 return -1; 322 } 323 324 if (pfringzc_init_queues(libtrace, ZC_FORMAT_DATA, 1) == -1) 325 return -1; 326 327 /* No hasher necessary, as we just have one thread */ 328 ZC_FORMAT_DATA->hasher = pfring_zc_run_balancer( 329 ZC_FORMAT_DATA->inqueues, ZC_FORMAT_DATA->outqueues, 1, 1, 330 ZC_FORMAT_DATA->pool, round_robin_bursts_policy, NULL, 331 NULL, NULL, 1, 0); 332 return 0; 333 } 334 209 335 static int pfring_start_input(libtrace_t *libtrace) { 210 336 struct pfring_per_stream_t *stream = FORMAT_DATA_FIRST; … … 323 449 324 450 return 0; 451 } 452 453 static int pfringzc_init_input(libtrace_t *libtrace) { 454 455 libtrace->format_data = (struct pfringzc_format_data_t *) 456 malloc(sizeof(struct pfringzc_format_data_t)); 457 assert(libtrace->format_data != NULL); 458 459 ZC_FORMAT_DATA->promisc = -1; 460 ZC_FORMAT_DATA->snaplen = LIBTRACE_PACKET_BUFSIZE; 461 ZC_FORMAT_DATA->bpffilter = NULL; 462 463 ZC_FORMAT_DATA->cluster = NULL; 464 ZC_FORMAT_DATA->inqueue = NULL; 465 ZC_FORMAT_DATA->outqueues = NULL; 466 ZC_FORMAT_DATA->buffers = NULL; 467 ZC_FORMAT_DATA->pool = NULL; 468 ZC_FORMAT_DATA->hasher = NULL; 469 ZC_FORMAT_DATA->hashtype = HASHER_BIDIRECTIONAL; 470 ZC_FORMAT_DATA->clusterid = (uint16_t)rand(); 471 472 return 0; 473 } 474 475 static int pfringzc_config_input(libtrace_t *libtrace, trace_option_t option, 476 void *data) { 477 478 switch (option) { 479 case TRACE_OPTION_SNAPLEN: 480 ZC_FORMAT_DATA->snaplen = *(int *)data; 481 return 0; 482 case TRACE_OPTION_PROMISC: 483 ZC_FORMAT_DATA->promisc = *(int *)data; 484 return 0; 485 case TRACE_OPTION_FILTER: 486 ZC_FORMAT_DATA->bpffilter = strdup((char *)data); 487 return 0; 488 case TRACE_OPTION_HASHER: 489 /* We can do bidirectional hashing on hardware 490 * by default, thanks to the ZC library */ 491 ZC_FORMAT_DATA->hashtype = *((enum hasher_types *)data); 492 switch (*((enum hasher_types *)data)) { 493 case HASHER_BIDIRECTIONAL: 494 case HASHER_UNIDIRECTIONAL: 495 return 0; 496 case HASHER_BALANCE: 497 return 0; 498 case HASHER_CUSTOM: 499 return -1; 500 } 501 break; 502 case TRACE_OPTION_META_FREQ: 503 break; 504 case TRACE_OPTION_EVENT_REALTIME: 505 break; 506 } 507 return -1; 325 508 } 326 509 … … 375 558 } 376 559 560 static int pfringzc_pause_input(libtrace_t *libtrace) { 561 562 /* hopefully this will clean up our buffers and queues? */ 563 pfring_zc_kill_worker(ZC_FORMAT_DATA->hasher); 564 pfring_zc_destroy_cluster(ZC_FORMAT_DATA->cluster); 565 return 0; 566 } 567 377 568 static int pfring_fin_input(libtrace_t *libtrace) { 378 569 … … 388 579 389 580 581 static int pfringzc_fin_input(libtrace_t *input) { 582 if (libtrace->format_data) { 583 if (ZC_FORMAT_DATA->bpffilter) 584 free(ZC_FORMAT_DATA->bpffilter); 585 free(libtrace->format_data); 586 } 587 return 0; 588 589 } 590 390 591 static int pfring_get_capture_length(const libtrace_packet_t *packet) { 391 592 struct libtrace_pfring_header *phdr; … … 444 645 packet->payload = (buffer + sizeof(struct libtrace_pfring_header)); 445 646 647 return 0; 648 } 649 650 static int pfringzc_read_batch(libtrace_t *libtrace, int oq, uint8_t block, 651 libtrace_message_queue_t *queue) { 652 653 int received; 654 655 do { 656 received = pfring_zc_recv_pkt_burst( 657 ZC_FORMAT_DATA->outqueues[oq], 658 ZC_FORMAT_DATA->buffers[oq], 659 PFRINGZC_BATCHSIZE, 660 0); 661 662 if (received < 0) { 663 trace_set_err(libtrace, errno, "Failed to read packet batch from pfringzc:"); 664 return -1; 665 } 666 667 if (received == 0) { 668 if (queue && libtrace_message_queue_count(queue) > 0) 669 return READ_MESSAGE; 670 continue; 671 } 672 673 ZC_FORMAT_DATA->lastbatch[oq] = received; 674 ZC_FORMAT_DATA->nextpacket[oq] = 0; 675 676 } while (block); 446 677 return 0; 447 678 } … … 485 716 return 0; 486 717 487 /* Convert the header fields to network byte order so we can488 * export them over RT safely. Also deal with 32 vs 64 bit489 * timevals! */490 //hdr->ts.tv_sec = bswap_host_to_le64((uint64_t)local.ts.tv_sec);491 //hdr->ts.tv_usec = bswap_host_to_le64((uint64_t)local.ts.tv_usec);492 718 #if __BYTE_ORDER == __LITTLE_ENDIAN 493 719 hdr->byteorder = PFRING_BYTEORDER_LITTLEENDIAN; … … 496 722 #endif 497 723 498 /*499 hdr->caplen = htonl(local.caplen);500 hdr->wlen = htonl(local.wlen);501 hdr->ext.ts_ns = bswap_host_to_le64(local.ext.ts_ns);502 hdr->ext.flags = htonl(local.ext.flags);503 hdr->ext.if_index = htonl(local.ext.if_index);504 hdr->ext.hash = htonl(local.ext.hash);505 hdr->ext.tx.bounce_iface = htonl(local.ext.tx.bounce_iface);506 hdr->ext.parsed_hdr_len = htons(local.ext.parsed_hdr_len);507 */508 724 hdr->caplen = (local.caplen); 509 725 hdr->wlen = (local.wlen); … … 528 744 return pfring_get_capture_length(packet) + 529 745 pfring_get_framing_length(packet); 746 747 } 748 749 static int pfringzc_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 750 { 751 752 struct pfringzc_per_thread *pzt = ZC_FORMAT_DATA->perthreads[0]; 753 754 if (pzt->nextpacket >= pzt->lastbatch) { 755 /* Read a fresh batch of packets */ 756 if (pfringzc_read_batch(libtrace, 0, 1, NULL) < 0) { 757 return -1; 758 } 759 760 } 761 762 pfring_zc_pkt_buff *pbuf = pzt->buffers[pzt->nextpacket]; 763 pzt->nextpacket ++; 764 765 530 766 531 767 }
Note: See TracChangeset
for help on using the changeset viewer.