Changeset 29bbef0
- Timestamp: 03/30/14 17:48:26
- Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children: f1015ad
- Parents: dad224b
- Files: 16 added, 21 edited
Legend: in the diffs below, added lines are prefixed with "+", removed lines with "-", and unprefixed lines are unmodified context.
lib/Makefile.am
r2138553 r29bbef0

 include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h

-AM_CFLAGS=@LIBCFLAGS@
-AM_CXXFLAGS=@LIBCXXFLAGS@
+AM_CFLAGS=@LIBCFLAGS@ -pthread
+AM_CXXFLAGS=@LIBCXXFLAGS@ -pthread

 extra_DIST = format_template.c
…
 endif

-libtrace_la_SOURCES = trace.c common.h \
+libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
        format_erf.c format_pcap.c format_legacy.c \
        format_rt.c format_helper.c format_helper.h format_pcapfile.c \
…
        $(DAGSOURCE) format_erf.h \
        $(BPFJITSOURCE) \
-       libtrace_arphrd.h
+       libtrace_arphrd.h \
+       trace_ringbuffer.c trace_vector.c libtrace_message_queue.c deque.c trace_sliding_window.c hash_toeplitz.c

 if DAG2_4
lib/format_atmhdr.c
r5952ff0 r29bbef0

        trace_event_trace,      /* trace_event */
        NULL,                   /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
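The five NULL entries added here (and to every other non-parallel format below) fill the new pstart_input, pread_packet, ppause_input, pfin_input and pconfig_input slots that this changeset appends to struct libtrace_format_t in libtrace_int.h. How the core treats a NULL slot lives in the new trace_parallel.c, which is not shown in this changeset; the following is only a hypothetical sketch of that dispatch (the parallel_read name is invented here), not the committed code:

    #include "libtrace_int.h"   /* internal header: libtrace_t, libtrace_format_t */

    /* Hypothetical sketch only - not taken from trace_parallel.c. A parallel
     * read path can fall back to the existing single-threaded read_packet()
     * whenever a format leaves the new pread_packet slot NULL. */
    static int parallel_read(libtrace_t *trace, libtrace_packet_t *packet)
    {
            if (trace->format->pread_packet)
                    /* Format has native parallel support (e.g. dpdk:, ring:) */
                    return trace->format->pread_packet(trace, packet);
            /* Otherwise serialise through the ordinary read path */
            return trace->format->read_packet(trace, packet);
    }

Formats with real parallel support, such as the DPDK and Linux native/ring formats further down, fill these slots with their own implementations.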
lib/format_bpf.c
r19b44c8 r29bbef0

        NULL,                   /* trace_event */
        bpf_help,               /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL
 };
lib/format_dag24.c
rc909fad r29bbef0

        trace_event_dag,        /* trace_event */
        dag_help,               /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/format_dag25.c
rc909fad r29bbef0

        trace_event_dag,        /* trace_event */
        dag_help,               /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/format_dpdk.c
r2138553 r29bbef0 46 46 #include "format_helper.h" 47 47 #include "libtrace_arphrd.h" 48 #include "hash_toeplitz.h" 48 49 49 50 #ifdef HAVE_INTTYPES_H … … 72 73 #include <rte_mempool.h> 73 74 #include <rte_mbuf.h> 75 #include <rte_launch.h> 76 #include <rte_lcore.h> 77 #include <rte_per_lcore.h> 74 78 75 79 /* The default size of memory buffers to use - This is the max size of standard … … 129 133 130 134 /* Print verbose messages to stdout */ 131 #define DEBUG 0135 #define DEBUG 1 132 136 133 137 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() … … 176 180 DPDK_RUNNING, 177 181 DPDK_PAUSED, 182 }; 183 184 struct per_lcore_t 185 { 186 // TODO move time stamp stuff here 187 uint16_t queue_id; 188 uint8_t port; 189 uint8_t enabled; 178 190 }; 179 191 … … 194 206 char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */ 195 207 unsigned int nb_blacklist; /* Number of blacklist items in are valid */ 208 uint8_t rss_key[40]; // This is the RSS KEY 196 209 #if HAS_HW_TIMESTAMPS_82580 197 210 /* Timestamping only relevent to RX */ … … 200 213 uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */ 201 214 #endif 215 // DPDK normally seems to have a limit of 216 struct per_lcore_t per_lcore[RTE_MAX_LCORE]; 202 217 }; 203 218 … … 399 414 * Basically binds this thread to a fixed core, which we choose as 400 415 * the last core on the machine (assuming fewer interrupts mapped here). 401 * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so o m416 * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on 402 417 * "-n" the number of memory channels into the CPU (hardware specific) 403 418 * - Most likely to be half the number of ram slots in your machine. … … 436 451 } 437 452 438 /* Make our mask */ 439 snprintf(cpu_number, sizeof(cpu_number), "%x", 0x 1 << (my_cpu - 1));453 /* Make our mask */ // 0x1 << (my_cpu - 1) 454 snprintf(cpu_number, sizeof(cpu_number), "%x", 0x3); 440 455 argv[2] = cpu_number; 441 456 … … 478 493 return -1; 479 494 } 495 496 struct rte_eth_dev_info dev_info; 497 rte_eth_dev_info_get(0, &dev_info); 498 printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 499 (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues); 480 500 481 501 return 0; … … 485 505 char err[500]; 486 506 err[0] = 0; 507 int i; 487 508 488 509 libtrace->format_data = (struct dpdk_format_data_t *) … … 504 525 FORMAT(libtrace)->wrap_count = 0; 505 526 #endif 506 527 for (i = 0;i < RTE_MAX_LCORE; i++) { 528 // Disabled by default 529 FORMAT(libtrace)->per_lcore[i].enabled = 0; 530 } 531 507 532 if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) { 508 533 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); … … 547 572 }; 548 573 574 static int dpdk_pconfig_input (libtrace_t *libtrace, 575 trace_parallel_option_t option, 576 void *data) { 577 switch (option) { 578 case TRACE_OPTION_SET_HASHER: 579 switch (*((enum hasher_types *) data)) 580 { 581 case HASHER_BALANCE: 582 case HASHER_UNIDIRECTIONAL: 583 toeplitz_create_unikey(FORMAT(libtrace)->rss_key); 584 return 0; 585 case HASHER_BIDIRECTIONAL: 586 toeplitz_create_bikey(FORMAT(libtrace)->rss_key); 587 return 0; 588 case HASHER_HARDWARE: 589 case HASHER_CUSTOM: 590 // We don't support these 591 return -1; 592 } 593 } 594 return -1; 595 } 549 596 /** 550 597 * Note here snaplen excludes the MAC 
checksum. Packets over … … 596 643 static struct rte_eth_conf port_conf = { 597 644 .rxmode = { 645 .mq_mode = ETH_RSS, 598 646 .split_hdr_size = 0, 599 647 .header_split = 0, /**< Header Split disabled */ … … 619 667 .txmode = { 620 668 .mq_mode = ETH_DCB_NONE, 669 }, 670 .rx_adv_conf = { 671 .rss_conf = { 672 // .rss_key = &rss_key, // We set this per format 673 .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP, 674 }, 621 675 }, 622 676 }; … … 710 764 */ 711 765 766 767 port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key; 768 712 769 /* This must be called first before another *eth* function 713 770 * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */ … … 770 827 return 0; 771 828 } 829 int mapper_start(void *data); // This actually a void* 830 831 /* Attach memory to the port and start the port or restart the ports. 832 */ 833 static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t * format_data, char *err, int errlen, uint16_t rx_queues){ 834 int ret, i; /* Check return values for errors */ 835 struct rte_eth_link link_info; /* Wait for link */ 836 837 /* Already started */ 838 if (format_data->paused == DPDK_RUNNING) 839 return 0; 840 841 /* First time started we need to alloc our memory, doing this here 842 * rather than in enviroment setup because we don't have snaplen then */ 843 if (format_data->paused == DPDK_NEVER_STARTED) { 844 if (format_data->snaplen == 0) { 845 format_data->snaplen = RX_MBUF_SIZE; 846 port_conf.rxmode.jumbo_frame = 0; 847 port_conf.rxmode.max_rx_pkt_len = 0; 848 } else { 849 /* Use jumbo frames */ 850 port_conf.rxmode.jumbo_frame = 1; 851 port_conf.rxmode.max_rx_pkt_len = format_data->snaplen; 852 } 853 854 /* This is additional overhead so make sure we allow space for this */ 855 #if GET_MAC_CRC_CHECKSUM 856 format_data->snaplen += ETHER_CRC_LEN; 857 #endif 858 #if HAS_HW_TIMESTAMPS_82580 859 format_data->snaplen += sizeof(struct hw_timestamp_82580); 860 #endif 861 862 /* Create the mbuf pool, which is the place our packets are allocated 863 * from - TODO figure out if there is is a free function (I cannot see one) 864 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to 865 * allocate however that extra 1 packet is not used. 866 * (I assume <= vs < error some where in DPDK code) 867 * TX requires nb_tx_buffers + 1 in the case the queue is full 868 * so that will fill the new buffer and wait until slots in the 869 * ring become available. 
870 */ 871 #if DEBUG 872 printf("Creating mempool named %s\n", format_data->mempool_name); 873 #endif 874 format_data->pktmbuf_pool = 875 rte_mempool_create(format_data->mempool_name, 876 format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1, 877 format_data->snaplen + sizeof(struct rte_mbuf) 878 + RTE_PKTMBUF_HEADROOM, 879 8, sizeof(struct rte_pktmbuf_pool_private), 880 rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL, 881 0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET); 882 883 if (format_data->pktmbuf_pool == NULL) { 884 snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf " 885 "pool failed: %s", strerror(rte_errno)); 886 return -1; 887 } 888 } 889 890 /* ----------- Now do the setup for the port mapping ------------ */ 891 /* Order of calls must be 892 * rte_eth_dev_configure() 893 * rte_eth_tx_queue_setup() 894 * rte_eth_rx_queue_setup() 895 * rte_eth_dev_start() 896 * other rte_eth calls 897 */ 898 899 /* This must be called first before another *eth* function 900 * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */ 901 ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf); 902 if (ret < 0) { 903 snprintf(err, errlen, "Intel DPDK - Cannot configure device port" 904 " %"PRIu8" : %s", format_data->port, 905 strerror(-ret)); 906 return -1; 907 } 908 #if DEBUG 909 printf("Doing dev configure\n"); 910 #endif 911 /* Initilise the TX queue a minimum value if using this port for 912 * receiving. Otherwise a larger size if writing packets. 913 */ 914 ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id, 915 format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf); 916 if (ret < 0) { 917 snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port" 918 " %"PRIu8" : %s", format_data->port, 919 strerror(-ret)); 920 return -1; 921 } 922 923 for (i=0; i < rx_queues; i++) { 924 #if DEBUG 925 printf("Doing queue configure\n"); 926 #endif 927 /* Initilise the RX queue with some packets from memory */ 928 ret = rte_eth_rx_queue_setup(format_data->port, i, 929 format_data->nb_rx_buf, SOCKET_ID_ANY, 930 &rx_conf, format_data->pktmbuf_pool); 931 if (ret < 0) { 932 snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port" 933 " %"PRIu8" : %s", format_data->port, 934 strerror(-ret)); 935 return -1; 936 } 937 } 938 939 #if DEBUG 940 printf("Doing start device\n"); 941 #endif 942 /* Start device */ 943 ret = rte_eth_dev_start(format_data->port); 944 #if DEBUG 945 printf("Done start device\n"); 946 #endif 947 if (ret < 0) { 948 snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s", 949 strerror(-ret)); 950 return -1; 951 } 952 953 954 /* Default promiscuous to on */ 955 if (format_data->promisc == -1) 956 format_data->promisc = 1; 957 958 if (format_data->promisc == 1) 959 rte_eth_promiscuous_enable(format_data->port); 960 else 961 rte_eth_promiscuous_disable(format_data->port); 962 963 964 /* We have now successfully started/unpased */ 965 format_data->paused = DPDK_RUNNING; 966 967 // Can use remote launch for all 968 /*RTE_LCORE_FOREACH_SLAVE(i) { 969 rte_eal_remote_launch(mapper_start, (void *)libtrace, i); 970 }*/ 971 972 /* Wait for the link to come up */ 973 rte_eth_link_get(format_data->port, &link_info); 974 #if DEBUG 975 printf("Link status is %d %d %d\n", (int) link_info.link_status, 976 (int) link_info.link_duplex, (int) link_info.link_speed); 977 #endif 978 979 return 0; 980 } 772 981 773 982 static int dpdk_start_input (libtrace_t *libtrace) { … … 782 991 } 783 992 return 0; 993 } 994 995 static int 
dpdk_pstart_input (libtrace_t *libtrace) { 996 char err[500]; 997 int enabled_lcore_count = 0, i=0; 998 int tot = libtrace->mapper_thread_count; 999 err[0] = 0; 1000 1001 libtrace->mapper_thread_count; 1002 1003 for (i = 0; i < RTE_MAX_LCORE; i++) 1004 { 1005 if (rte_lcore_is_enabled(i)) 1006 enabled_lcore_count++; 1007 } 1008 1009 tot = MIN(libtrace->mapper_thread_count, enabled_lcore_count); 1010 tot = MIN(tot, 8); 1011 printf("Running pstart DPDK %d %d %d %d\n", tot, libtrace->mapper_thread_count, enabled_lcore_count, rte_lcore_count()); 1012 1013 if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) { 1014 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err); 1015 free(libtrace->format_data); 1016 libtrace->format_data = NULL; 1017 return -1; 1018 } 1019 1020 return 0; 1021 return tot; 784 1022 } 785 1023 … … 1116 1354 } 1117 1355 1356 1357 static void dpdk_fin_packet(libtrace_packet_t *packet) 1358 { 1359 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 1360 rte_pktmbuf_free(packet->buffer); 1361 packet->buffer = NULL; 1362 } 1363 } 1364 1118 1365 static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) { 1119 1366 int nb_rx; /* Number of rx packets we've recevied */ … … 1145 1392 return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]); 1146 1393 } 1394 } 1395 1396 /* We'll never get here - but if we did it would be bad */ 1397 return -1; 1398 } 1399 libtrace_thread_t * get_thread_table(libtrace_t *libtrace); 1400 static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_packet_t *packet) { 1401 int nb_rx; /* Number of rx packets we've recevied */ 1402 struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */ 1403 1404 /* Free the last packet buffer */ 1405 if (packet->buffer != NULL) { 1406 /* Buffer is owned by DPDK */ 1407 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) { 1408 rte_pktmbuf_free(packet->buffer); 1409 packet->buffer = NULL; 1410 } else 1411 /* Buffer is owned by packet i.e. has been malloc'd */ 1412 if (packet->buf_control == TRACE_CTRL_PACKET) { 1413 free(packet->buffer); 1414 packet->buffer = NULL; 1415 } 1416 } 1417 1418 packet->buf_control = TRACE_CTRL_EXTERNAL; 1419 packet->type = TRACE_RT_DATA_DPDK; 1420 1421 /* Wait for a packet */ 1422 while (1) { 1423 /* Poll for a single packet */ 1424 nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port, 1425 get_thread_table_num(libtrace), pkts_burst, 1); 1426 if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */ 1427 printf("Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace)); 1428 return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]); 1429 } 1430 // Check the message queue this could be (Well it shouldn't but anyway) be less than 0 1431 if (libtrace_message_queue_count(&(get_thread_table(libtrace)->messages)) > 0) { 1432 printf("Extra message yay"); 1433 return -2; 1434 } 1147 1435 } 1148 1436 … … 1325 1613 dpdk_read_packet, /* read_packet */ 1326 1614 dpdk_prepare_packet, /* prepare_packet */ 1327 NULL, /* fin_packet */1615 dpdk_fin_packet, /* fin_packet */ 1328 1616 dpdk_write_packet, /* write_packet */ 1329 1617 dpdk_get_link_type, /* get_link_type */ … … 1348 1636 dpdk_trace_event, /* trace_event */ 1349 1637 dpdk_help, /* help */ 1638 dpdk_pstart_input, /* pstart_input */ 1639 dpdk_pread_packet, /* pread_packet */ 1640 dpdk_pause_input, /* ppause */ 1641 dpdk_fin_input, /* p_fin */ 1642 dpdk_pconfig_input, /* pconfig_input */ 1350 1643 NULL 1351 1644 }; -
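In the DPDK diff above, dpdk_pconfig_input() chooses between toeplitz_create_unikey() and toeplitz_create_bikey() to fill format_data->rss_key, and dpdk_start_port_queues() installs that key through port_conf.rx_adv_conf.rss_conf.rss_key, so the NIC's RSS unit, not the host, spreads flows across the per-lcore RX queues. hash_toeplitz.c itself is not part of this excerpt, but the function the hardware evaluates with that key is the standard Toeplitz hash; a self-contained software sketch (toeplitz_hash is a name used only here; input limited to 36 bytes for a 40-byte key) is:

    #include <stdint.h>
    #include <stddef.h>

    /* Software version of the Toeplitz hash a NIC computes over the installed
     * 40-byte RSS key. 'in' is the concatenated tuple (e.g. src IP, dst IP,
     * src port, dst port) and must be at most 36 bytes for a 40-byte key. */
    static uint32_t toeplitz_hash(const uint8_t key[40],
                                  const uint8_t *in, size_t len)
    {
            uint32_t hash = 0;
            /* Sliding 32-bit window into the key, starting at key bit 0 */
            uint32_t window = ((uint32_t)key[0] << 24) | ((uint32_t)key[1] << 16) |
                              ((uint32_t)key[2] << 8) | key[3];
            size_t i;
            int bit;

            for (i = 0; i < len; i++) {
                    for (bit = 7; bit >= 0; bit--) {
                            /* If this input bit is set, mix in the current
                             * 32-bit key window */
                            if (in[i] & (1u << bit))
                                    hash ^= window;
                            /* Advance the window by one key bit */
                            window <<= 1;
                            if (key[4 + i] & (1u << bit))
                                    window |= 1;
                    }
            }
            return hash;
    }

A "bidirectional" key is typically built from a repeating two-byte pattern so that swapping the source and destination fields of the input leaves the hash, and therefore the chosen RX queue, unchanged; toeplitz_create_bikey() presumably produces a key of that kind.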
lib/format_duck.c
r9b097ea r29bbef0

        NULL,                   /* trace_event */
        duck_help,              /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/format_erf.c
rf7bcbfb r29bbef0

        erf_event,              /* trace_event */
        erf_help,               /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
…
        erf_event,              /* trace_event */
        erf_help,               /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/format_legacy.c
r1ca603b r29bbef0

        trace_event_trace,      /* trace_event */
        legacynzix_help,        /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL,                   /* next pointer */
 };
lib/format_linux.c
r7a529a9 r29bbef0 72 72 #include <sys/mman.h> 73 73 74 #include <fcntl.h> 75 74 76 /* MAX_ORDER is defined in linux/mmzone.h. 10 is default for 2.4 kernel. 75 77 * max_order will be decreased by one if the ring buffer fails to allocate. … … 147 149 #define PACKET_HDRLEN 11 148 150 #define PACKET_TX_RING 13 151 #define PACKET_FANOUT 18 149 152 #define TP_STATUS_USER 0x1 150 153 #define TP_STATUS_SEND_REQUEST 0x1 … … 154 157 #define TPACKET_ALIGN(x) (((x)+TPACKET_ALIGNMENT-1)&~(TPACKET_ALIGNMENT-1)) 155 158 #define TPACKET_HDRLEN (TPACKET_ALIGN(sizeof(struct tpacket2_hdr)) + sizeof(struct sockaddr_ll)) 159 160 /* Since 3.1 kernel we have packet_fanout support */ 161 // schedule to socket by skb's rxhash - the implementation is bi-directional 162 #define PACKET_FANOUT_HASH 0 163 // schedule round robin 164 #define PACKET_FANOUT_LB 1 165 // schedule to the same socket that received the packet 166 #define PACKET_FANOUT_CPU 2 167 // Something to do with fragmented packets and hashing problems !! TODO figure out if this needs to be on 168 #define PACKET_FANOUT_FLAG_DEFRAG 0x8000 169 /* Included but unused by libtrace since 3.10 */ 170 // if one socket if full roll over to the next 171 #define PACKET_FANOUT_ROLLOVER 3 172 // This flag makes any other system roll over 173 #define PACKET_FANOUT_FLAG_ROLLOVER 0x1000 174 /* Included but unused by libtrace since 3.12 */ 175 // schedule random 176 #define PACKET_FANOUT_RND 4 177 156 178 157 179 enum tpacket_versions { … … 184 206 unsigned int tp_frame_size; /* Size of frame */ 185 207 unsigned int tp_frame_nr; /* Total number of frames */ 208 }; 209 210 struct linux_per_thread_t { 211 char *rx_ring; 212 int rxring_offset; 213 int fd; 214 // The flag layout should be the same for all (I Hope) 215 // max_order 186 216 }; 187 217 … … 212 242 /* Used to determine buffer size for the ring buffer */ 213 243 uint32_t max_order; 244 /* Used for the parallel case, fanout is the mode */ 245 uint16_t fanout_flags; 246 /* The group lets Linux know which sockets to group together 247 * so we use a random here to try avoid collisions */ 248 uint16_t fanout_group; 249 /* When running in parallel mode this is malloc'd with an array 250 * file descriptors from packet fanout will use, here we assume/hope 251 * that every ring can get setup the same */ 252 struct linux_per_thread_t *per_thread; 214 253 }; 215 254 … … 367 406 FORMAT(libtrace->format_data)->rxring_offset = 0; 368 407 FORMAT(libtrace->format_data)->max_order = MAX_ORDER; 408 FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB; // This might be best or alternatively PACKET_FANOUT_LB 409 // Some examples use pid for the group however that would limit a single 410 // application to use only int/ring format, instead using rand 411 FORMAT(libtrace->format_data)->fanout_group = (uint16_t) rand(); 412 FORMAT(libtrace->format_data)->per_thread = NULL; 369 413 } 370 414 static int linuxring_init_input(libtrace_t *libtrace) … … 582 626 return 0; 583 627 } 584 static int linuxring_start_input(libtrace_t *libtrace){ 585 586 char error[2048]; 628 629 /** 630 * Converts a socket, either packet_mmap or standard raw socket into a 631 * fanout socket. 
632 * NOTE: This means we can read from the socket with multiple queues, 633 * each must be setup (identically) and then this called upon them 634 * 635 * @return 0 success, -1 error 636 */ 637 static inline int socket_to_packet_fanout(int fd, 638 uint16_t fanout_flags, 639 uint16_t fanout_group) { 640 int fanout_opt = ((int)fanout_flags << 16) | (int)fanout_group; 641 if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT, 642 &fanout_opt, sizeof(fanout_opt)) == -1) { 643 return -1; 644 } 645 return 0; 646 } 647 648 static int linuxnative_ppause_input(libtrace_t *libtrace) 649 { 650 int i; 651 int tot = libtrace->mapper_thread_count; 652 printf("CAlling native pause packet\n"); 653 654 for (i = 0; i < tot; i++) { 655 close(FORMAT(libtrace->format_data)->per_thread[i].fd); 656 } 657 658 free(FORMAT(libtrace->format_data)->per_thread); 659 FORMAT(libtrace->format_data)->per_thread = NULL; 660 return 0; 661 } 662 663 static int linuxring_start_input(libtrace_t *libtrace) 664 { 665 char error[2048]; 587 666 588 667 /* We set the socket up the same and then convert it to PACKET_MMAP */ … … 609 688 } 610 689 690 static int linuxnative_pstart_input(libtrace_t *libtrace) { 691 int i = 0; 692 int tot = libtrace->mapper_thread_count; 693 int iserror = 0; 694 // We store this here otherwise it will be leaked if the memory doesn't know 695 struct linux_per_thread_t *per_thread = NULL; 696 697 if (!FORMAT(libtrace->format_data)->per_thread) { 698 per_thread = calloc(tot, sizeof(struct linux_per_thread_t)); 699 FORMAT(libtrace->format_data)->per_thread = per_thread; 700 } else { 701 // Whats going on this might not work 100% 702 // We assume all sockets have been closed ;) 703 printf("Pause and then start called again lets hope that mapper_thread_count hasn't changed\n"); 704 } 705 706 printf("Calling native pstart packet\n"); 707 for (i = 0; i < tot; ++i) 708 { 709 if (FORMAT(libtrace->format_data)->format == TRACE_FORMAT_LINUX_NATIVE) { 710 if (linuxnative_start_input(libtrace) != 0) { 711 iserror = 1; 712 break; 713 } 714 } else { 715 // This must be ring 716 if (linuxring_start_input(libtrace) != 0) { 717 iserror = 1; 718 break; 719 } 720 } 721 if (socket_to_packet_fanout(FORMAT(libtrace->format_data)->fd, FORMAT(libtrace->format_data)->fanout_flags, FORMAT(libtrace->format_data)->fanout_group) != 0) 722 { 723 iserror = 1; 724 // Clean up here to keep consistent with every one else 725 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Converting the fd to a socket fanout failed"); 726 close(FORMAT(libtrace->format_data)->fd); 727 free(libtrace->format_data); 728 libtrace->format_data = NULL; 729 break; 730 } 731 per_thread[i].fd = FORMAT(libtrace->format_data)->fd; 732 if (FORMAT(libtrace->format_data)->format == TRACE_FORMAT_LINUX_RING) { 733 per_thread[i].rxring_offset = FORMAT(libtrace->format_data)->rxring_offset; 734 per_thread[i].rx_ring = FORMAT(libtrace->format_data)->rx_ring; 735 } 736 } 737 738 // Roll back those that failed - by this point in time the format_data 739 // has been freed 740 if (iserror) { 741 for (i = i - 1; i >= 0; i--) { 742 close(per_thread[i].fd); 743 } 744 free(per_thread); 745 per_thread = NULL; 746 return -1; 747 } 748 749 return 0; 750 } 751 611 752 static int linuxnative_start_output(libtrace_out_t *libtrace) 612 753 { … … 615 756 free(DATAOUT(libtrace)); 616 757 return -1; 617 } 758 } 618 759 619 760 return 0; … … 660 801 return 0; 661 802 } 803 662 804 static int linuxring_pause_input(libtrace_t *libtrace) 663 805 { … … 800 942 #endif /* HAVE_NETPACKET_PACKET_H */ 801 943 
944 945 static int linuxnative_pconfig_input(libtrace_t *libtrace, 946 trace_parallel_option_t option, 947 void *data) 948 { 949 switch(option) { 950 case TRACE_OPTION_SET_HASHER: 951 switch (*((enum hasher_types *)data)) { 952 case HASHER_BALANCE: 953 // Do fanout 954 FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB; 955 // Or we could balance to the CPU 956 return 0; 957 case HASHER_BIDIRECTIONAL: 958 case HASHER_UNIDIRECTIONAL: 959 FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_HASH; 960 return 0; 961 case HASHER_CUSTOM: 962 case HASHER_HARDWARE: 963 return -1; 964 } 965 break; 966 /* Avoid default: so that future options will cause a warning 967 * here to remind us to implement it, or flag it as 968 * unimplementable 969 */ 970 } 971 972 /* Don't set an error - trace_config will try to deal with the 973 * option and will set an error if it fails */ 974 return -1; 975 } 976 977 802 978 static int linuxnative_prepare_packet(libtrace_t *libtrace UNUSED, 803 979 libtrace_packet_t *packet, void *buffer, … … 871 1047 872 1048 #ifdef HAVE_NETPACKET_PACKET_H 873 static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 1049 libtrace_thread_t * get_thread_table(libtrace_t *libtrace) ; 1050 inline static int linuxnative_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, const int check_queue) 874 1051 { 875 1052 struct libtrace_linuxnative_header *hdr; … … 879 1056 struct cmsghdr *cmsg; 880 1057 int snaplen; 1058 881 1059 uint32_t flags = 0; 882 1060 … … 914 1092 iovec.iov_base = (void*)(packet->buffer+sizeof(*hdr)); 915 1093 iovec.iov_len = snaplen; 916 917 hdr->wirelen = recvmsg(FORMAT(libtrace->format_data)->fd, &msghdr, 0); 918 1094 1095 if (check_queue) { 1096 // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK 1097 hdr->wirelen = recvmsg(fd, &msghdr, MSG_DONTWAIT); 1098 if ((unsigned) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { 1099 // Do message queue check or select 1100 int ret; 1101 fd_set rfds; 1102 FD_ZERO(&rfds); 1103 FD_SET(fd, &rfds); 1104 FD_SET(get_thread_table(libtrace)->messages.pipefd[0], &rfds); 1105 int largestfd = fd > get_thread_table(libtrace)->messages.pipefd[0] ? 
fd : get_thread_table(libtrace)->messages.pipefd[0]; 1106 1107 do { 1108 ret = select(largestfd+1, &rfds, NULL, NULL, NULL); 1109 if (ret == -1 && errno != EINTR) 1110 perror("Select() failed"); 1111 } 1112 while (ret == -1); 1113 1114 assert (ret == 1 || ret == 2); // No timeout 0 is not an option 1115 1116 if (FD_ISSET(get_thread_table(libtrace)->messages.pipefd[0], &rfds)) { 1117 // Not an error but check the message queue we have something 1118 return -2; 1119 } 1120 // Otherwise we must have a packet 1121 hdr->wirelen = recvmsg(fd, &msghdr, 0); 1122 } 1123 } else { 1124 hdr->wirelen = recvmsg(fd, &msghdr, 0); 1125 } 1126 919 1127 if (hdr->wirelen==~0U) { 920 1128 trace_set_err(libtrace,errno,"recvmsg"); … … 964 1172 if (cmsg == NULL) { 965 1173 struct timeval tv; 966 if (ioctl(FORMAT(libtrace->format_data)->fd, 967 SIOCGSTAMP,&tv)==0) { 1174 if (ioctl(fd, SIOCGSTAMP,&tv)==0) { 968 1175 hdr->tv.tv_sec = tv.tv_sec; 969 1176 hdr->tv.tv_usec = tv.tv_usec; … … 986 1193 } 987 1194 1195 static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 1196 { 1197 int fd = FORMAT(libtrace->format_data)->fd; 1198 return linuxnative_read_packet_fd(libtrace, packet, fd, 0); 1199 } 1200 1201 static int linuxnative_pread_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 1202 { 1203 int fd = FORMAT(libtrace->format_data)->per_thread[get_thread_table_num(libtrace)].fd; 1204 printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread); 1205 return linuxnative_read_packet_fd(libtrace, packet, fd, 1); 1206 } 1207 988 1208 #define LIBTRACE_BETWEEN(test,a,b) ((test) >= (a) && (test) < (b)) 989 1209 static int linuxring_get_capture_length(const libtrace_packet_t *packet); … … 992 1212 /* Release a frame back to the kernel or free() if it's a malloc'd buffer 993 1213 */ 994 inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet 1214 inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet){ 995 1215 /* Free the old packet */ 996 1216 if(packet->buffer == NULL) … … 1004 1224 struct linux_format_data_t *ftd = FORMAT(libtrace->format_data); 1005 1225 1006 /* Check it's within our buffer first */ 1007 if(LIBTRACE_BETWEEN((char *) packet->buffer, 1226 /* Check it's within our buffer first - consider the pause resume case it might have already been free'd lets hope we get another buffer */ 1227 // For now let any one free anything 1228 /*if(LIBTRACE_BETWEEN((char *) packet->buffer, 1008 1229 (char *) ftd->rx_ring, 1009 1230 ftd->rx_ring 1010 + ftd->req.tp_block_size * ftd->req.tp_block_nr)){ 1231 + ftd->req.tp_block_size * ftd->req.tp_block_nr)){*/ 1011 1232 TO_TP_HDR(packet->buffer)->tp_status = 0; 1012 1233 packet->buffer = NULL; 1013 } 1014 } 1015 } 1016 1017 static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { 1234 /*}*/ 1235 } 1236 } 1237 1238 /** 1239 * Free any resources being kept for this packet, Note: libtrace 1240 * will ensure all fields are zeroed correctly. 
1241 */ 1242 static void linuxring_fin_packet(libtrace_packet_t *packet) 1243 { 1244 assert(packet->trace); 1245 1246 // Started should always match the existence of the rx_ring 1247 assert(!!FORMAT(packet->trace->format_data)->rx_ring == !!packet->trace->started); 1248 1249 // Our packets are always under our control 1250 assert(packet->buf_control == TRACE_CTRL_EXTERNAL); 1251 1252 if (FORMAT(packet->trace->format_data)->rx_ring) // If we don't have a ring its already been destroyed or paused 1253 ring_release_frame(packet->trace, packet); 1254 else 1255 packet->buffer = NULL; 1256 } 1257 1258 inline static int linuxring_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, int *rxring_offset, char *rx_ring, int message) { 1018 1259 1019 1260 struct tpacket2_hdr *header; 1020 struct pollfd pollset;1021 1261 int ret; 1022 1262 … … 1027 1267 1028 1268 /* Fetch the current frame */ 1029 header = GET_CURRENT_BUFFER(libtrace);1269 header = ((void*) rx_ring) + *rxring_offset * FORMAT(libtrace->format_data)->req.tp_frame_size; // GET_CURRENT_BUFFER(libtrace); 1030 1270 assert((((unsigned long) header) & (pagesize - 1)) == 0); 1031 1271 … … 1035 1275 */ 1036 1276 while (!(header->tp_status & TP_STATUS_USER)) { 1037 pollset.fd = FORMAT(libtrace->format_data)->fd; 1038 pollset.events = POLLIN; 1039 pollset.revents = 0; 1040 /* Wait for more data */ 1041 ret = poll(&pollset, 1, -1); 1042 if (ret < 0) { 1043 if (errno != EINTR) 1044 trace_set_err(libtrace,errno,"poll()"); 1045 return -1; 1277 if (message) { 1278 struct pollfd pollset[2]; 1279 pollset[0].fd = fd; 1280 pollset[0].events = POLLIN; 1281 pollset[0].revents = 0; 1282 pollset[1].fd = libtrace_message_queue_get_fd(&get_thread_table(libtrace)->messages); 1283 pollset[1].events = POLLIN; 1284 pollset[1].revents = 0; 1285 /* Wait for more data or a message*/ 1286 ret = poll(pollset, 2, -1); 1287 if (ret < 0) { 1288 if (errno != EINTR) 1289 trace_set_err(libtrace,errno,"poll()"); 1290 return -1; 1291 } 1292 // Check for a message otherwise loop 1293 if (pollset[1].revents) 1294 return -2; 1295 } else { 1296 struct pollfd pollset; 1297 pollset.fd = fd; 1298 pollset.events = POLLIN; 1299 pollset.revents = 0; 1300 1301 /* Wait for more data or a message*/ 1302 ret = poll(&pollset, 1, -1); 1303 if (ret < 0) { 1304 if (errno != EINTR) 1305 trace_set_err(libtrace,errno,"poll()"); 1306 return -1; 1307 } 1046 1308 } 1047 1309 } … … 1050 1312 1051 1313 /* Move to next buffer */ 1052 FORMAT(libtrace->format_data)->rxring_offset++;1053 FORMAT(libtrace->format_data)->rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;1314 (*rxring_offset)++; 1315 *rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr; 1054 1316 1055 1317 /* We just need to get prepare_packet to set all our packet pointers … … 1061 1323 linuxring_get_capture_length(packet); 1062 1324 1325 } 1326 1327 static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { 1328 int fd = FORMAT(libtrace->format_data)->fd; 1329 int *rxring_offset = &FORMAT(libtrace->format_data)->rxring_offset; 1330 char *rx_ring = FORMAT(libtrace->format_data)->rx_ring; 1331 return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 0); 1332 } 1333 1334 static int linuxring_pread_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { 1335 int tnum = get_thread_table_num(libtrace); 1336 int fd = FORMAT(libtrace->format_data)->per_thread[tnum].fd; 1337 int *rxring_offset = &FORMAT(libtrace->format_data)->per_thread[tnum].rxring_offset; 
1338 char *rx_ring = FORMAT(libtrace->format_data)->per_thread[tnum].rx_ring; 1339 printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread); 1340 return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 1); 1063 1341 } 1064 1342 … … 1550 1828 trace_event_device, /* trace_event */ 1551 1829 linuxnative_help, /* help */ 1830 linuxnative_pstart_input, /* pstart_input */ 1831 linuxnative_pread_packet, /* pread_packet */ 1832 linuxnative_ppause_input, /* ppause */ 1833 linuxnative_fin_input, /* p_fin */ 1834 linuxnative_pconfig_input, /* pconfig input */ //int (*pconfig_input)(libtrace_t *libtrace,trace_option_t option,void *value); 1552 1835 NULL 1553 1836 }; … … 1570 1853 linuxring_read_packet, /* read_packet */ 1571 1854 linuxring_prepare_packet, /* prepare_packet */ 1572 NULL, /* fin_packet */1855 linuxring_fin_packet, /* fin_packet */ 1573 1856 linuxring_write_packet, /* write_packet */ 1574 1857 linuxring_get_link_type, /* get_link_type */ … … 1593 1876 linuxring_event, /* trace_event */ 1594 1877 linuxring_help, /* help */ 1878 linuxnative_pstart_input, /* pstart_input */ 1879 linuxring_pread_packet, /* pread_packet */ 1880 linuxnative_ppause_input, /* ppause */ 1881 linuxnative_fin_input, /* p_fin */ 1882 linuxnative_pconfig_input, 1595 1883 NULL 1596 1884 }; … … 1645 1933 trace_event_device, /* trace_event */ 1646 1934 linuxnative_help, /* help */ 1935 NULL, /* pstart_input */ 1936 NULL, /* pread_packet */ 1647 1937 NULL 1648 1938 }; … … 1688 1978 NULL, /* trace_event */ 1689 1979 linuxring_help, /* help */ 1980 NULL, /* pstart_input */ 1981 NULL, /* pread_packet */ 1690 1982 NULL 1691 1983 }; -
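socket_to_packet_fanout() in the format_linux.c diff above is the core of the parallel support for both the native and ring formats: each per-thread socket joins the same PACKET_FANOUT group and the kernel (3.1 or later, which is why the constants are defined by hand above) splits traffic between the members. A self-contained illustration of the same system call sequence outside libtrace (open_fanout_socket is a name used only here; binding the socket to an interface and most error handling are trimmed, and AF_PACKET sockets need CAP_NET_RAW):

    #include <stdint.h>
    #include <unistd.h>
    #include <sys/socket.h>
    #include <linux/if_packet.h>
    #include <linux/if_ether.h>
    #include <arpa/inet.h>

    /* Each capture thread opens its own AF_PACKET socket and joins the shared
     * fanout group; PACKET_FANOUT_HASH keeps every flow on one member socket. */
    static int open_fanout_socket(uint16_t group)
    {
            int fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
            if (fd == -1)
                    return -1;
            /* Low 16 bits: group id shared by all members,
             * high 16 bits: scheduling mode */
            int fanout_opt = (int)group | (PACKET_FANOUT_HASH << 16);
            if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT,
                           &fanout_opt, sizeof(fanout_opt)) == -1) {
                    close(fd);
                    return -1;
            }
            return fd;
    }

This mirrors how the changeset maps the hasher options onto fanout modes: PACKET_FANOUT_LB for HASHER_BALANCE and PACKET_FANOUT_HASH (bidirectional, per the comment in the diff) for the uni/bidirectional hashers.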
lib/format_pcap.c
rf7bcbfb r29bbef0

        trace_event_trace,      /* trace_event */
        pcap_help,              /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
…
        trace_event_device,     /* trace_event */
        pcapint_help,           /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/format_pcapfile.c
rf7bcbfb r29bbef0

        pcapfile_event,         /* trace_event */
        pcapfile_help,          /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/format_rt.c
r90e8d92 r29bbef0

        trace_event_rt,         /* trace_event */
        rt_help,                /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/format_tsh.c
rc909fad r29bbef0

        trace_event_trace,      /* trace_event */
        tsh_help,               /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
…
        trace_event_trace,      /* trace_event */
        tsh_help,               /* help */
+       NULL,                   /* pstart_input */
+       NULL,                   /* pread_packet */
+       NULL,                   /* ppause_input */
+       NULL,                   /* pfin_input */
+       NULL,                   /* pconfig_input */
        NULL                    /* next pointer */
 };
lib/libtrace.h.in
r74ecbc7 r29bbef0

 typedef struct libtrace_filter_t libtrace_filter_t;

+/** Opaque structure holding information about a result */
+typedef struct libtrace_result_t libtrace_result_t;
+
+typedef struct libtrace_message_t libtrace_message_t;
+
+typedef struct libtrace_thread_t libtrace_thread_t;
+
 /** If the packet has allocated its own memory the buffer_control should be
  * set to TRACE_CTRL_PACKET, so that the memory will be freed when the packet
…
        uint8_t transport_proto; /**< Cached transport protocol */
        uint32_t l4_remaining; /**< Cached transport remaining */
+       uint64_t order; /**< Notes the order of this packet in relation to the input */
+       uint64_t hash; /**< A hash of the packet as supplied by the user */
+       int error; /**< The error status of pread_packet */
 } libtrace_packet_t;

…
 /*@}*/

+typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
+typedef void* (*fn_reducer)(libtrace_t* trace, void* global_blob);
+typedef uint64_t (*fn_hasher)(libtrace_packet_t* packet, void *data);
+
+DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer);
+DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet);
+DLLEXPORT int trace_ppause(libtrace_t *libtrace);
+DLLEXPORT int trace_pstop(libtrace_t *libtrace);
+DLLEXPORT void trace_join(libtrace_t * trace);
+DLLEXPORT inline void print_contention_stats (libtrace_t *libtrace);
+DLLEXPORT libtrace_result_t *trace_create_result();
+
+typedef struct libtrace_result_t {
+       uint64_t key;
+       void * value;
+} libtrace_result_t;
+
+DLLEXPORT inline void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
+DLLEXPORT inline uint64_t libtrace_result_get_key(libtrace_result_t * result);
+DLLEXPORT inline void libtrace_result_set_value(libtrace_result_t * result, void * value);
+DLLEXPORT inline void* libtrace_result_get_value(libtrace_result_t * result);
+DLLEXPORT inline void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value);
+DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
+
+// Ways to access Global and TLS storage that we provide the user
+DLLEXPORT void * trace_get_global(libtrace_t *trace);
+DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data);
+DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data);
+DLLEXPORT void * trace_get_tls(libtrace_thread_t *t);
+
+DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value);
+typedef struct libtrace_vector libtrace_vector_t;
+DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results);
+
+DLLEXPORT libtrace_result_t *trace_create_result();
+DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
+DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
+DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
+DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message);
+DLLEXPORT int trace_finished(libtrace_t * libtrace);
+DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
+DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet);
+DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
+DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
+
+
+typedef enum {
+       /**
+        * Sets the hasher function, if NULL(default) no hashing is used a
+        * cores will get packets on a first in first served basis
+        */
+       TRACE_OPTION_SET_HASHER,
+
+       /**
+        * See diagrams, this sets the maximum size of freelist used to
+        * maintain packets and there memory buffers.
+        * NOTE setting this to less than recommend could cause deadlock a
+        * trace that manages its own packets.
+        * A unblockable error message will be printed.
+        */
+       TRACE_OPTION_SET_PACKET_FREELIST_SIZE,
+
+       /**
+        * See diagrams, this sets the maximum size of buffers used between
+        * the single hasher thread and the buffer.
+        * NOTE setting this to less than recommend could cause deadlock a
+        * trace that manages its own packets.
+        * A unblockable warning message will be printed to stderr in this case.
+        */
+       TRACE_OPTION_SET_MAPPER_BUFFER_SIZE,
+
+       /**
+        * Libtrace set mapper thread count
+        */
+       TRACE_OPTION_SET_MAPPER_THREAD_COUNT,
+
+       /**
+        * Libtrace should expect sequential keys from the output to count
+        * up starting numbered from 1, 2, 3 ...
+        * such as is the case with numbered packets.
+        *
+        * ALSO consider - TRACE_OPTIONS_ORDERED_RESULTS suitable for live formats
+        */
+       TRACE_OPTION_SEQUENTIAL,
+
+       /**
+        * Libtrace ordered results, results in each queue are ordered by key
+        * however my not be sequential, a typically case is packet timestamps
+        * the reducer will receive packets in order - note threasholds
+        * will be used such that a empty queue wont break things
+        */
+       TRACE_OPTION_ORDERED,
+
+
+       /**
+        * When accepting ordered results if a threashold is meet before an
+        * older result is available from another queue drop that packet
+        */
+       TRACE_DROP_OUT_OF_ORDER,
+
+       /**
+        * If set to true (i.e. 1) the trace starts a dedicated hasher thread
+        */
+       TRACE_OPTION_USE_DEDICATED_HASHER,
+       TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER,
+       TRACE_OPTION_TRACETIME
+} trace_parallel_option_t;
+
+DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
+DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
+DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
+
 #ifdef __cplusplus
 } /* extern "C" */
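The declarations above are the whole of the new public parallel API in this changeset. A minimal sketch of how an application might drive it follows, assuming the conventional libtrace 0-on-success return for trace_pstart(); the per-packet callback's exact ownership and return-value semantics are not documented in this header yet, so the callback below just counts packets and hands each one straight back, and error handling is omitted:

    #include "libtrace.h"
    #include <stdio.h>
    #include <inttypes.h>

    static uint64_t packet_count = 0;

    /* Called by each mapper thread for every packet it reads. */
    static void *per_pkt(libtrace_t *trace, libtrace_packet_t *pkt,
                         libtrace_message_t *msg, libtrace_thread_t *t)
    {
            (void) trace; (void) msg; (void) t;
            if (pkt)
                    __sync_fetch_and_add(&packet_count, 1);
            return pkt;   /* return semantics are assumed, not documented here */
    }

    int main(int argc, char *argv[])
    {
            if (argc != 2)
                    return 1;
            libtrace_t *trace = trace_create(argv[1]);
            trace_pstart(trace, NULL, per_pkt, NULL);  /* no global blob, no reducer */
            trace_join(trace);                         /* wait for mapper threads */
            printf("%" PRIu64 " packets\n", packet_count);
            trace_destroy(trace);
            return 0;
    }

The *_parallel tools added to the Makefiles further down (tracestats_parallel.c and friends, among the 16 added files not shown here) are presumably the fuller examples of this API.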
lib/libtrace_int.h
r8b49230 r29bbef0 165 165 bool waiting; 166 166 }; 167 168 enum thread_types { 169 THREAD_EMPTY, 170 THREAD_HASHER, 171 THREAD_MAPPER, 172 THREAD_REDUCER 173 }; 174 175 enum thread_states { 176 THREAD_RUNNING, 177 THREAD_FINISHING, 178 THREAD_FINISHED, 179 THREAD_PAUSED 180 }; 181 182 enum hasher_types { 183 HASHER_BALANCE, /** Balance load across CPUs best as possible this is basically to say don't care about hash, but this might still might be implemented using a hash or round robin etc.. */ 184 HASHER_BIDIRECTIONAL, /** Use a hash which is uni-directional for TCP flows (IP src dest,TCP port src dest), non TCP 185 Will be sent to the same place, with the exception of UDP which may or may not be sent to separate cores */ 186 HASHER_UNIDIRECTIONAL, /** Use a hash which is uni-directional across TCP flow */ 187 HASHER_CUSTOM, /** Always use the user supplied hasher */ 188 HASHER_HARDWARE, /** Set by the format if the hashing is going to be done in hardware */ 189 }; 190 191 // Reduce expects sequential data 192 #define REDUCE_SEQUENTIAL 0x1 193 // Reduce is working on ordered data 194 #define REDUCE_ORDERED 0x2 195 // Reduce should sort the data 196 #define REDUCE_SORT 0x4 197 // Drop out of order valid with 198 #define REDUCE_DROP_OOO 0x8 199 // Reduce reads all queues with same key 200 #define REDUCE_STEPPING 0x10 201 202 #define MAPPER_USE_SLIDING_WINDOW 0x20 203 204 205 #include "trace_ringbuffer.h" 206 #include "trace_vector.h" 207 #include "libtrace_message_queue.h" 208 #include "deque.h" 209 #include "trace_sliding_window.h" 210 211 /** 212 * Information of this thread 213 */ 214 typedef struct libtrace_thread_t { 215 libtrace_t * trace; 216 void* ret; 217 enum thread_types type; 218 enum thread_states state; 219 void* user_data; // TLS for the user to use 220 pthread_t tid; 221 int map_num; // A number from 0-X that represents this mapper number 222 // in the table, intended to quickly identify this thread 223 // -1 represents NA (such as in the case this is not a mapper thread) 224 libtrace_ringbuffer_t rbuffer; // Input 225 libtrace_vector_t vector; // Output 226 libtrace_queue_t deque; // Real Output type makes more sense 227 libtrace_message_queue_t messages; // Message handling 228 // Temp storage for time sensitive results 229 uint64_t tmp_key; 230 void *tmp_data; 231 pthread_spinlock_t tmp_spinlock; 232 // Set to true once the first packet has been stored 233 bool recorded_first; 234 // For thread safety reason we actually must store this here 235 int64_t tracetime_offset_usec; 236 } libtrace_thread_t; 237 238 /** 239 * Storage to note time value against each. 240 * Used both internally to do trace time playback 241 * and can be used externally to assist applications which need 242 * a trace starting time such as tracertstats. 
243 */ 244 struct first_packets { 245 pthread_spinlock_t lock; 246 size_t count; // If == mappers we have all 247 size_t first; // Valid if count != 0 248 struct __packet_storage_magic_type { 249 libtrace_packet_t * packet; 250 struct timeval tv; 251 } * packets; 252 }; 253 167 254 168 255 /** A libtrace input trace … … 187 274 uint64_t filtered_packets; 188 275 /** The filename from the uri for the trace */ 189 char *uridata; 276 char *uridata; 190 277 /** The libtrace IO reader for this trace (if applicable) */ 191 io_t *io; 278 io_t *io; 192 279 /** Error information for the trace */ 193 libtrace_err_t err; 280 libtrace_err_t err; 194 281 /** Boolean flag indicating whether the trace has been started */ 195 bool started; 282 bool started; 283 /** Synchronise writes/reads across this format object and attached threads etc */ 284 pthread_mutex_t libtrace_lock; 285 286 /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */ 287 pthread_cond_t perpkt_cond; 288 /** Set to the number of mapper threads that are finishing (or have finished), or to -1 once all have been joined, 0 implies all are running */ 289 int mappers_finishing; 290 /** A count of mappers that are pausing */ 291 int perpkt_pausing; 292 293 /** For the sliding window hasher implementation */ 294 pthread_rwlock_t window_lock; 295 /** Set once trace_join has been called */ 296 bool joined; 297 /** Set to indicate a mappers queue is full and such the writing mapper cannot proceed */ 298 bool mapper_queue_full; 299 /** Global storage for this trace, shared among all the threads */ 300 void* global_blob; 301 /** Requested size of the pkt buffer (currently only used if using dedicated hasher thread) */ 302 int packet_freelist_size; 303 /** The actual freelist */ 304 libtrace_ringbuffer_t packet_freelist; 305 /** The number of packets that can queue per mapper thread - XXX consider deadlocks with non malloc()'d packets that need to be released */ 306 int mapper_buffer_size; 307 /** The reducer flags */ 308 int reducer_flags; 309 /** Used to track the next expected key */ 310 uint64_t expected_key; 311 /** User defined per_pkt function called when a pkt is ready */ 312 fn_per_pkt per_pkt; 313 /** User defined reducer function entry point XXX not hooked up */ 314 fn_reducer reducer; 315 /** The hasher function */ 316 enum hasher_types hasher_type; 317 /** The hasher function - NULL implies they don't care or balance */ 318 fn_hasher hasher; // If valid using a separate thread 319 void *hasher_data; 320 321 libtrace_thread_t hasher_thread; 322 libtrace_thread_t reducer_thread; 323 int mapper_thread_count; 324 libtrace_thread_t * mapper_threads; // All our mapper threads 325 libtrace_slidingwindow_t sliding_window; 326 sem_t sem; 327 // Used to keep track of the first packet seen on each thread 328 struct first_packets first_packets; 329 int tracetime; 196 330 }; 331 332 inline void libtrace_zero_thread(libtrace_thread_t * t); 333 inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t); 197 334 198 335 /** A libtrace output trace … … 201 338 struct libtrace_out_t { 202 339 /** The capture format for the output trace */ 203 340 struct libtrace_format_t *format; 204 341 /** Pointer to the "global" data for the capture format module */ 205 342 void *format_data; … … 209 346 libtrace_err_t err; 210 347 /** Boolean flag indicating whether the trace has been started */ 211 bool started; 348 bool started; 212 349 }; 213 350 … … 733 870 /** Prints some useful 
help information to standard output. */ 734 871 void (*help)(void); 735 872 873 /** Starts or unpauses an input trace in parallel mode - note that 874 * this function is often the one that opens the file or device for 875 * reading. 876 * 877 * @param libtrace The input trace to be started or unpaused 878 * @return If successful the number of threads started, 0 indicates 879 * no threads started and this should be done automatically. 880 * Otherwise in event of an error -1 is returned. 881 * 882 */ 883 int (*pstart_input)(libtrace_t *trace); 884 885 /** Read a packet in the new parallel mode 886 * @return same as read_packet, with the addition of return -2 to represent 887 * interrupted due to message waiting. */ 888 int (*pread_packet)(libtrace_t *trace, libtrace_packet_t *packet); 889 890 /** Pause a parallel trace */ 891 int (*ppause_input)(libtrace_t *trace); 892 893 /** Called after all threads have been paused, Finish (close) a parallel trace */ 894 int (*pfin_input)(libtrace_t *trace); 895 896 /** Applies a configuration option to an input trace. 897 * 898 * @param libtrace The input trace to apply the option to 899 * @param option The option that is being configured 900 * @param value A pointer to the value that the option is to be 901 * set to 902 * @return 0 if successful, -1 if the option is unsupported or an error 903 * occurs 904 */ 905 int (*pconfig_input)(libtrace_t *libtrace,trace_parallel_option_t option,void *value); 906 736 907 /** Next pointer, should always be NULL - used by the format module 737 908 * manager. */ -
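enum hasher_types in the libtrace_int.h diff above distinguishes balanced, unidirectional and bidirectional hashing, and struct libtrace_t now carries an fn_hasher pointer for HASHER_CUSTOM. How a custom hasher is registered is not visible in this excerpt (the DPDK and Linux pconfig handlers currently return -1 for HASHER_CUSTOM), so the following is only a sketch of the kind of bidirectional hash those comments describe, built on the long-standing trace_get_ip() accessor; bidir_ip_hash is a name used only here:

    #include "libtrace.h"
    #include <stdint.h>

    /* Matches the fn_hasher typedef added to libtrace.h. Both directions of a
     * flow hash to the same value because the two addresses are combined
     * commutatively. */
    static uint64_t bidir_ip_hash(libtrace_packet_t *packet, void *data)
    {
            (void) data;
            libtrace_ip_t *ip = trace_get_ip(packet);
            if (!ip)
                    return 0;       /* send all non-IP traffic to one thread */
            uint32_t a = ip->ip_src.s_addr;
            uint32_t b = ip->ip_dst.s_addr;
            /* xor and sum are both symmetric in (a, b) */
            return ((uint64_t)(a ^ b) << 32) | (uint64_t)(a + b);
    }

Because the combination is commutative, packets from A to B and from B to A produce the same value and would therefore land on the same mapper thread.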
lib/trace.c
rf7bcbfb r29bbef0 98 98 #include "format_helper.h" 99 99 #include "rt_protocol.h" 100 101 #include <pthread.h> 102 #include <signal.h> 100 103 101 104 #define MAXOPTS 1024 … … 253 256 libtrace->filtered_packets = 0; 254 257 libtrace->accepted_packets = 0; 258 259 /* Parallel inits */ 260 // libtrace->libtrace_lock 261 // libtrace->perpkt_cond; 262 libtrace->perpkt_pausing = 0; 263 libtrace->mapper_queue_full = false; 264 libtrace->mappers_finishing = -1; 265 libtrace->reducer_flags = 0; 266 libtrace->joined = false; 267 libtrace->global_blob = NULL; 268 libtrace->per_pkt = NULL; 269 libtrace->reducer = NULL; 270 libtrace->hasher = NULL; 271 libtrace->packet_freelist_size = 0; 272 libtrace->mapper_buffer_size = 0; 273 libtrace->expected_key = 0; 274 libtrace_zero_ringbuffer(&libtrace->packet_freelist); 275 libtrace_zero_thread(&libtrace->hasher_thread); 276 libtrace_zero_thread(&libtrace->reducer_thread); 277 libtrace_zero_slidingwindow(&libtrace->sliding_window); 278 libtrace->reducer_thread.type = THREAD_EMPTY; 279 libtrace->mapper_thread_count = 0; 280 libtrace->mapper_threads = NULL; 255 281 256 282 /* Parse the URI to determine what sort of trace we are dealing with */ … … 348 374 libtrace->io = NULL; 349 375 libtrace->filtered_packets = 0; 376 377 /* Parallel inits */ 378 // libtrace->libtrace_lock 379 // libtrace->perpkt_cond; 380 libtrace->perpkt_pausing = 0; 381 libtrace->mapper_queue_full = false; 382 libtrace->mappers_finishing = -1; 383 libtrace->reducer_flags = 0; 384 libtrace->joined = false; 385 libtrace->global_blob = NULL; 386 libtrace->per_pkt = NULL; 387 libtrace->reducer = NULL; 388 libtrace->hasher = NULL; 389 libtrace->expected_key = 0; 390 libtrace->packet_freelist_size = 0; 391 libtrace->mapper_buffer_size = 0; 392 libtrace_zero_ringbuffer(&libtrace->packet_freelist); 393 libtrace_zero_thread(&libtrace->hasher_thread); 394 libtrace_zero_thread(&libtrace->reducer_thread); 395 libtrace_zero_slidingwindow(&libtrace->sliding_window); 396 libtrace->reducer_thread.type = THREAD_EMPTY; 397 libtrace->mapper_thread_count = 0; 398 libtrace->mapper_threads = NULL; 350 399 351 400 for(tmp=formats_list;tmp;tmp=tmp->next) { … … 583 632 */ 584 633 DLLEXPORT void trace_destroy(libtrace_t *libtrace) { 634 int i; 585 635 assert(libtrace); 586 636 if (libtrace->format) { … … 590 640 libtrace->format->fin_input(libtrace); 591 641 } 592 593 642 /* Need to free things! 
*/ 643 if (libtrace->uridata) 594 644 free(libtrace->uridata); 645 646 /* Empty any packet memory */ 647 648 libtrace_packet_t * packet; 649 while (libtrace_ringbuffer_try_read(&libtrace->packet_freelist,(void **) &packet)) 650 trace_destroy_packet(packet); 651 652 libtrace_ringbuffer_destroy(&libtrace->packet_freelist); 653 654 for (i = 0; i < libtrace->mapper_thread_count; ++i) { 655 assert (libtrace_vector_get_size(&libtrace->mapper_threads[i].vector) == 0); 656 libtrace_vector_destroy(&libtrace->mapper_threads[i].vector); 657 } 658 free(libtrace->mapper_threads); 659 libtrace->mapper_threads = NULL; 660 libtrace->mapper_thread_count = 0; 661 595 662 if (libtrace->event.packet) { 596 663 /* Don't use trace_destroy_packet here - there is almost … … 661 728 dest->type=packet->type; 662 729 dest->buf_control=TRACE_CTRL_PACKET; 730 dest->order = packet->order; 663 731 /* Reset the cache - better to recalculate than try to convert 664 732 * the values over to the new packet */ … … 675 743 */ 676 744 DLLEXPORT void trace_destroy_packet(libtrace_packet_t *packet) { 745 /* Free any resources possibly associated with the packet */ 746 if (packet->trace && packet->trace->format->fin_packet) { 747 packet->trace->format->fin_packet(packet); 748 } 749 677 750 if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) { 678 751 free(packet->buffer); … … 685 758 } 686 759 760 /** 761 * Removes any possible data stored againt the trace and releases any data. 762 * This will not destroy a reusable good malloc'd buffer (TRACE_CTRL_PACKET) 763 * use trace_destroy_packet() for those diabolical purposes. 764 */ 765 void trace_fin_packet(libtrace_packet_t *packet); 766 void trace_fin_packet(libtrace_packet_t *packet) { 767 if (packet) 768 { 769 if (packet->trace && packet->trace->format->fin_packet) { 770 packet->trace->format->fin_packet(packet); 771 //gettimeofday(&tv, NULL); 772 //printf ("%d.%06d DESTROYED #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 773 } 774 775 // No matter what we remove the header and link pointers 776 packet->trace = NULL; 777 packet->header = NULL; 778 packet->payload = NULL; 779 780 if (packet->buf_control != TRACE_CTRL_PACKET) 781 { 782 //packet->buf_control = 0; // Invalid value this should be fixed 783 packet->buffer = NULL; 784 } 785 786 packet->trace = NULL; 787 packet->hash = 0; 788 packet->order = 0; 789 trace_clear_cache(packet); 790 } 791 } 792 687 793 /* Read one packet from the trace into buffer. Note that this function will 688 794 * block until a packet is read (or EOF is reached). … … 707 813 } 708 814 assert(packet); 709 710 /* Store the trace we are reading from into the packet opaque711 * structure */712 packet->trace = libtrace;713 714 /* Finalise the packet, freeing any resources the format module715 * may have allocated it716 */717 if (libtrace->format->fin_packet) {718 libtrace->format->fin_packet(packet);719 }720 721 815 722 816 if (libtrace->format->read_packet) { 723 817 do { 724 818 size_t ret; 725 /* Clear the packet cache */ 726 trace_clear_cache(packet); 819 /* Finalise the packet, freeing any resources the format module 820 * may have allocated it and zeroing all data associated with it. 
821 */ 822 trace_fin_packet(packet); 823 /* Store the trace we are reading from into the packet opaque 824 * structure */ 825 packet->trace = libtrace; 727 826 ret=libtrace->format->read_packet(libtrace,packet); 728 827 if (ret==(size_t)-1 || ret==0) { … … 743 842 libtrace->snaplen); 744 843 } 844 trace_packet_set_order(packet, libtrace->accepted_packets); 745 845 ++libtrace->accepted_packets; 746 846 return ret; … … 946 1046 } 947 1047 948 1048 return tv; 949 1049 } 950 1050 … … 1199 1299 libtrace_linktype_t linktype ) { 1200 1300 #ifdef HAVE_BPF_FILTER 1301 /* It just so happens that the underlying libs used by pthread arn't 1302 * thread safe, namely lex/flex thingys, so single threaded compile 1303 * multi threaded running should be safe. 1304 */ 1305 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 1201 1306 assert(filter); 1202 1307 … … 1220 1325 "Unknown pcap equivalent linktype"); 1221 1326 return -1; 1327 } 1328 assert (pthread_mutex_lock(&mutex) == 0); 1329 /* Make sure not one bet us to this */ 1330 if (filter->flag) { 1331 printf("Someone bet us to compile the filter\n"); 1332 assert (pthread_mutex_unlock(&mutex) == 0); 1333 return 1; 1222 1334 } 1223 1335 pcap=(pcap_t *)pcap_open_dead( … … 1233 1345 pcap_geterr(pcap)); 1234 1346 pcap_close(pcap); 1347 assert (pthread_mutex_unlock(&mutex) == 0); 1235 1348 return -1; 1236 1349 } 1237 1350 pcap_close(pcap); 1238 1351 filter->flag=1; 1352 assert (pthread_mutex_unlock(&mutex) == 0); 1239 1353 } 1240 1354 return 0; … … 1256 1370 libtrace_linktype_t linktype; 1257 1371 libtrace_packet_t *packet_copy = (libtrace_packet_t*)packet; 1372 #ifdef HAVE_LLVM 1373 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 1374 #endif 1258 1375 1259 1376 assert(filter); … … 1306 1423 * what the link type was 1307 1424 */ 1425 // Note internal mutex locking used here 1308 1426 if (trace_bpf_compile(filter,packet_copy,linkptr,linktype)==-1) { 1309 1427 if (free_packet_needed) { … … 1316 1434 #if HAVE_LLVM 1317 1435 if (!filter->jitfilter) { 1318 filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len); 1436 assert(pthread_mutex_lock(&mutex) == 0); 1437 /* Again double check here like the bpf filter */ 1438 if(filter->jitfilter) 1439 printf("Someone bet us to compile the JIT thingy\n"); 1440 else 1441 /* Looking at compile_program source this appears to be thread safe 1442 * however if this gets called twice we will leak this memory :( 1443 * as such lock here anyways */ 1444 filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len); 1445 assert(pthread_mutex_unlock(&mutex) == 0); 1319 1446 } 1320 1447 #endif -
tools/traceanon/Makefile.am
rc0a5a50 r29bbef0

-bin_PROGRAMS = traceanon
+bin_PROGRAMS = traceanon traceanon_parallel

 man_MANS = traceanon.1
…
 include ../Makefile.tools
 traceanon_SOURCES = traceanon.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h
+traceanon_parallel_SOURCES = traceanon_parallel.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h

 # rijndael.c does lots of nasty casting that is going to be a nightmare to fix
…
 # messy and hopefully isn't actually an issue.
 traceanon_CFLAGS = $(AM_CFLAGS)
+traceanon_parallel_CFLAGS = $(AM_CFLAGS)
tools/traceanon/panon.c
ra3041a4 r29bbef0

 #include "panon.h"

-static uint8_t m_key[16];
-static uint8_t m_pad[16];
+static __thread uint8_t m_key[16];
+static __thread uint8_t m_pad[16];

 #define CACHEBITS 20
…
 //static uint32_t enc_cache[CACHESIZE];

-static uint32_t *enc_cache = 0;
-static uint32_t fullcache[2][2];
+static __thread uint32_t *enc_cache = 0; // Should be ok shared across multiple
+static __thread uint32_t fullcache[2][2]; // Needs to be against on thread

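The __thread storage class added above gives every thread of traceanon_parallel its own copy of the anonymisation key, pad and cache, so the existing panon code needs no extra locking. A minimal self-contained illustration of GCC/Clang's __thread extension (compile with -pthread):

    #include <pthread.h>
    #include <stdio.h>

    static __thread int tls_counter = 0;    /* one private instance per thread */

    static void *worker(void *arg)
    {
            int i;
            (void) arg;
            for (i = 0; i < 1000; i++)
                    tls_counter++;          /* no locking: each thread increments
                                             * its own copy */
            printf("this thread counted %d\n", tls_counter);
            return NULL;
    }

    int main(void)
    {
            pthread_t a, b;
            pthread_create(&a, NULL, worker, NULL);
            pthread_create(&b, NULL, worker, NULL);
            pthread_join(a, NULL);
            pthread_join(b, NULL);
            /* Both threads print 1000; a plain static would be shared and racy. */
            return 0;
    }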
tools/tracertstats/Makefile.am
r530bcf0 r29bbef0

-bin_PROGRAMS = tracertstats
+bin_PROGRAMS = tracertstats tracertstats_parallel
 man_MANS = tracertstats.1
 EXTRA_DIST = $(man_MANS)
…
 tracertstats_SOURCES = tracertstats.c output.h output.c $(OUTPUT_MODULES)
 tracertstats_LDADD = -ltrace $(OUTPUT_PNG_LD)
+tracertstats_parallel_SOURCES = tracertstats_parallel.c output.h output.c $(OUTPUT_MODULES)
+tracertstats_parallel_LDADD = -ltrace $(OUTPUT_PNG_LD)
tools/tracestats/Makefile.am
r3b8a5ef r29bbef0

-bin_PROGRAMS = tracestats
+bin_PROGRAMS = tracestats tracestats_parallel
 bin_SCRIPTS = tracesummary

…
 include ../Makefile.tools
 tracestats_SOURCES = tracestats.c
+tracestats_parallel_SOURCES = tracestats_parallel.c