- Timestamp:
- 02/13/15 09:47:42 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 4631115
- Parents:
- 1b59edf (diff), 6cf3ca0 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - Location:
- lib
- Files:
-
- 6 added
- 1 deleted
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/Makefile.am
r9e429e8 r6cf3ca0 6 6 7 7 extra_DIST = format_template.c 8 NATIVEFORMATS=format_linux .c8 NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h 9 9 BPFFORMATS=format_bpf.c 10 10 … … 21 21 22 22 if HAVE_LLVM 23 BPFJITSOURCE=bpf-jit/bpf-jit.cc 23 BPFJITSOURCE=bpf-jit/bpf-jit.cc 24 24 else 25 25 BPFJITSOURCE= … … 29 29 NATIVEFORMATS+= format_dpdk.c 30 30 # So we also make libtrace.mk in dpdk otherwise automake tries to expand 31 # it to early which I cannot seem to stop unless we use a path that31 # it too early which I cannot seem to stop unless we use a path that 32 32 # doesn't exist currently 33 33 export RTE_SDK=@RTE_SDK@ … … 57 57 $(BPFJITSOURCE) \ 58 58 libtrace_arphrd.h \ 59 data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \ 60 data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \ 61 hash_toeplitz.c combiner_ordered.c combiner_sorted.c combiner_unordered.c 59 data-struct/ring_buffer.c data-struct/vector.c \ 60 data-struct/message_queue.c data-struct/deque.c \ 61 data-struct/sliding_window.c data-struct/object_cache.c \ 62 data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \ 63 combiner_sorted.c combiner_unordered.c 62 64 63 65 if DAG2_4 -
lib/format_dag25.c
r18bf317 rcb39d35 79 79 */ 80 80 81 82 81 #define DATA(x) ((struct dag_format_data_t *)x->format_data) 83 82 #define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data) 84 #define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))83 #define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data) 85 84 86 85 #define FORMAT_DATA DATA(libtrace) … … 88 87 89 88 #define DUCK FORMAT_DATA->duck 89 90 #define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head 91 #define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data) 92 90 93 static struct libtrace_format_t dag; 91 94 … … 117 120 }; 118 121 119 /* Data that is stored for each libtrace_thread_t*/120 struct dag_per_ thread_t {122 /* Data that is stored against each input stream */ 123 struct dag_per_stream_t { 121 124 /* DAG device */ 122 125 struct dag_dev_t *device; 123 /* Stream number */124 uint16_t stream;126 /* DAG stream number */ 127 uint16_t dagstream; 125 128 /* Pointer to the last unread byte in the DAG memory */ 126 129 uint8_t *top; … … 129 132 /* Amount of data processed from the bottom pointer */ 130 133 uint32_t processed; 131 /* Number of packets seen by the thread*/134 /* Number of packets seen by the stream */ 132 135 uint64_t pkt_count; 133 /* Drop count for this particular thread*/136 /* Drop count for this particular stream */ 134 137 uint64_t drops; 138 /* Boolean values to indicate if a particular interface has been seen 139 * or not. This is limited to four interfaces, which is enough to 140 * support all current DAG cards */ 141 uint8_t seeninterface[4]; 135 142 }; 136 143 … … 138 145 struct dag_format_data_t { 139 146 /* Data required for regular DUCK reporting */ 147 /* TODO: This doesn't work with the 10X2S card! I don't know how 148 * DUCK stuff works and don't know how to fix it */ 140 149 struct { 141 150 /* Timestamp of the last DUCK report */ … … 150 159 } duck; 151 160 152 /* The DAG device that we are reading from */ 153 struct dag_dev_t *device; 154 /* The DAG stream that we are reading from */ 155 unsigned int dagstream; 156 /* Boolean flag indicating whether the stream is currently attached */ 161 /* Boolean flag indicating whether the trace is currently attached */ 157 162 int stream_attached; 158 /* Pointer to the first unread byte in the DAG memory hole */ 159 uint8_t *bottom; 160 /* Pointer to the last unread byte in the DAG memory hole */ 161 uint8_t *top; 162 /* The amount of data processed thus far from the bottom pointer */ 163 uint32_t processed; 164 /* The number of packets that have been dropped */ 165 uint64_t drops; 166 /* When running in parallel mode this is malloc'd with an 167 * array of thread structures. Most of the stuff above doesn't 168 * get used in parallel mode. */ 169 struct dag_per_thread_t *per_thread; 170 171 uint8_t seeninterface[4]; 163 164 /* Data stored against each DAG input stream */ 165 libtrace_list_t *per_stream; 172 166 }; 173 167 … … 240 234 static void dag_init_format_data(libtrace_t *libtrace) 241 235 { 236 struct dag_per_stream_t stream_data; 237 242 238 libtrace->format_data = (struct dag_format_data_t *) 243 239 malloc(sizeof(struct dag_format_data_t)); … … 246 242 DUCK.last_pkt = 0; 247 243 DUCK.dummy_duck = NULL; 248 FORMAT_DATA->stream_attached = 0; 249 FORMAT_DATA-> drops = 0;250 FORMAT_DATA->device = NULL;251 FORMAT_DATA->dagstream = 0;252 FORMAT_DATA->processed = 0; 253 FORMAT_DATA->bottom = NULL;254 FORMAT_DATA->top = NULL;255 memset( FORMAT_DATA->seeninterface, 0,256 sizeof(FORMAT_DATA->seeninterface));244 245 FORMAT_DATA->per_stream = 246 libtrace_list_init(sizeof(stream_data)); 247 assert(FORMAT_DATA->per_stream != NULL); 248 249 /* We'll start with just one instance of stream_data, and we'll 250 * add more later if we need them */ 251 memset(&stream_data, 0, sizeof(stream_data)); 252 libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data); 257 253 } 258 254 … … 480 476 char *dag_dev_name = NULL; 481 477 char *scan = NULL; 482 int stream = 0 , thread_count = 1;478 int stream = 0; 483 479 struct dag_dev_t *dag_device = NULL; 484 480 … … 505 501 } 506 502 507 FORMAT_DATA ->dagstream = stream;503 FORMAT_DATA_FIRST->dagstream = stream; 508 504 509 505 /* See if our DAG device is already open */ … … 528 524 } 529 525 530 FORMAT_DATA ->device = dag_device;526 FORMAT_DATA_FIRST->device = dag_device; 531 527 532 528 /* See Config_Status_API_Programming_Guide.pdf from the Endace … … 564 560 /* Tell the card our new snap length */ 565 561 snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data); 566 if (dag_configure(FORMAT_DATA ->device->fd,562 if (dag_configure(FORMAT_DATA_FIRST->device->fd, 567 563 conf_str) != 0) { 568 564 trace_set_err(libtrace, errno, "Failed to configure " … … 622 618 struct timeval zero, nopoll; 623 619 uint8_t *top, *bottom, *starttop; 624 uint64_t diff = 0;625 620 top = bottom = NULL; 626 621 … … 630 625 631 626 /* Attach and start the DAG stream */ 632 if (dag_attach_stream(FORMAT_DATA ->device->fd,633 FORMAT_DATA ->dagstream, 0, 0) < 0) {627 if (dag_attach_stream(FORMAT_DATA_FIRST->device->fd, 628 FORMAT_DATA_FIRST->dagstream, 0, 0) < 0) { 634 629 trace_set_err(libtrace, errno, "Cannot attach DAG stream"); 635 630 return -1; 636 631 } 637 632 638 if (dag_start_stream(FORMAT_DATA ->device->fd,639 FORMAT_DATA ->dagstream) < 0) {633 if (dag_start_stream(FORMAT_DATA_FIRST->device->fd, 634 FORMAT_DATA_FIRST->dagstream) < 0) { 640 635 trace_set_err(libtrace, errno, "Cannot start DAG stream"); 641 636 return -1; … … 644 639 645 640 /* We don't want the dag card to do any sleeping */ 646 dag_set_stream_poll(FORMAT_DATA ->device->fd,647 FORMAT_DATA ->dagstream, 0, &zero,641 dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd, 642 FORMAT_DATA_FIRST->dagstream, 0, &zero, 648 643 &nopoll); 649 644 650 starttop = dag_advance_stream(FORMAT_DATA ->device->fd,651 FORMAT_DATA ->dagstream,645 starttop = dag_advance_stream(FORMAT_DATA_FIRST->device->fd, 646 FORMAT_DATA_FIRST->dagstream, 652 647 &bottom); 653 648 … … 656 651 while (starttop - bottom > 0) { 657 652 bottom += (starttop - bottom); 658 top = dag_advance_stream(FORMAT_DATA ->device->fd,659 FORMAT_DATA ->dagstream,653 top = dag_advance_stream(FORMAT_DATA_FIRST->device->fd, 654 FORMAT_DATA_FIRST->dagstream, 660 655 &bottom); 661 656 } 662 FORMAT_DATA ->top = top;663 FORMAT_DATA ->bottom = bottom;664 FORMAT_DATA ->processed = 0;665 FORMAT_DATA ->drops = 0;657 FORMAT_DATA_FIRST->top = top; 658 FORMAT_DATA_FIRST->bottom = bottom; 659 FORMAT_DATA_FIRST->processed = 0; 660 FORMAT_DATA_FIRST->drops = 0; 666 661 667 662 return 0; 663 } 664 665 static int dag_pstart_input(libtrace_t *libtrace) 666 { 667 char *scan, *tok; 668 uint16_t stream_count = 0, max_streams; 669 int iserror = 0; 670 struct dag_per_stream_t stream_data; 671 672 /* Check we aren't trying to create more threads than the DAG card can 673 * handle */ 674 max_streams = dag_rx_get_stream_count(FORMAT_DATA_FIRST->device->fd); 675 if (libtrace->perpkt_thread_count > max_streams) { 676 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 677 "trying to create too many threads (max is %u)", 678 max_streams); 679 iserror = 1; 680 goto cleanup; 681 } 682 683 /* Get the stream names from the uri */ 684 if ((scan = strchr(libtrace->uridata, ',')) == NULL) { 685 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 686 "format uri doesn't specify the DAG streams"); 687 iserror = 1; 688 goto cleanup; 689 } 690 691 scan++; 692 693 tok = strtok(scan, ","); 694 while (tok != NULL) { 695 /* Ensure we haven't specified too many streams */ 696 if (stream_count >= libtrace->perpkt_thread_count) { 697 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 698 "format uri specifies too many streams. " 699 "Max is %u", max_streams); 700 iserror = 1; 701 goto cleanup; 702 } 703 704 /* Save the stream details */ 705 if (stream_count == 0) { 706 /* Special case where we update the existing stream 707 * data structure */ 708 FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok); 709 } else { 710 memset(&stream_data, 0, sizeof(stream_data)); 711 stream_data.device = FORMAT_DATA_FIRST->device; 712 stream_data.dagstream = (uint16_t)atoi(tok); 713 libtrace_list_push_back(FORMAT_DATA->per_stream, 714 &stream_data); 715 } 716 717 stream_count++; 718 tok = strtok(NULL, ","); 719 } 720 721 FORMAT_DATA->stream_attached = 1; 722 723 cleanup: 724 if (iserror) { 725 return -1; 726 } else { 727 return 0; 728 } 668 729 } 669 730 … … 690 751 static int dag_pause_input(libtrace_t *libtrace) 691 752 { 692 /* Stop and detach the stream */ 693 if (dag_stop_stream(FORMAT_DATA->device->fd, 694 FORMAT_DATA->dagstream) < 0) { 695 trace_set_err(libtrace, errno, "Could not stop DAG stream"); 696 return -1; 697 } 698 if (dag_detach_stream(FORMAT_DATA->device->fd, 699 FORMAT_DATA->dagstream) < 0) { 700 trace_set_err(libtrace, errno, "Could not detach DAG stream"); 701 return -1; 702 } 753 libtrace_list_node_t *tmp = FORMAT_DATA_HEAD; 754 755 /* Stop and detach each stream */ 756 while (tmp != NULL) { 757 if (dag_stop_stream(STREAM_DATA(tmp)->device->fd, 758 STREAM_DATA(tmp)->dagstream) < 0) { 759 trace_set_err(libtrace, errno, 760 "Could not stop DAG stream"); 761 printf("Count not stop DAG stream\n"); 762 return -1; 763 } 764 if (dag_detach_stream(STREAM_DATA(tmp)->device->fd, 765 STREAM_DATA(tmp)->dagstream) < 0) { 766 trace_set_err(libtrace, errno, 767 "Could not detach DAG stream"); 768 printf("Count not detach DAG stream\n"); 769 return -1; 770 } 771 772 tmp = tmp->next; 773 } 774 703 775 FORMAT_DATA->stream_attached = 0; 704 776 return 0; 705 777 } 706 778 779 780 707 781 /* Closes a DAG input trace */ 708 782 static int dag_fin_input(libtrace_t *libtrace) 709 783 { 784 libtrace_list_node_t *tmp = FORMAT_DATA_HEAD; 785 710 786 /* Need the lock, since we're going to be handling the device list */ 711 787 pthread_mutex_lock(&open_dag_mutex); … … 714 790 if (FORMAT_DATA->stream_attached) 715 791 dag_pause_input(libtrace); 716 FORMAT_DATA->device->ref_count --; 717 718 /* Close the DAG device if there are no more references to it */ 719 if (FORMAT_DATA->device->ref_count == 0) 720 dag_close_device(FORMAT_DATA->device); 792 793 /* Close any dag devices that have no more references */ 794 while (tmp != NULL) { 795 STREAM_DATA(tmp)->device->ref_count--; 796 if (STREAM_DATA(tmp)->device->ref_count == 0) 797 dag_close_device(STREAM_DATA(tmp)->device); 798 799 tmp = tmp->next; 800 } 801 721 802 if (DUCK.dummy_duck) 722 803 trace_destroy_dead(DUCK.dummy_duck); 804 805 /* Clear the list */ 806 libtrace_list_deinit(FORMAT_DATA->per_stream); 807 723 808 free(libtrace->format_data); 724 809 pthread_mutex_unlock(&open_dag_mutex); … … 761 846 } 762 847 848 /* DUCK reporting is broken at the moment! */ 849 #if 0 763 850 /* Extracts DUCK information from the DAG card and produces a DUCK packet */ 764 851 static int dag_get_duckinfo(libtrace_t *libtrace, libtrace_packet_t *packet) … … 797 884 packet->trace = DUCK.dummy_duck; 798 885 return sizeof(duckinf_t); 799 } 886 887 return 0; 888 } 889 #endif 800 890 801 891 /* Determines the amount of data available to read from the DAG card */ 802 static int dag_available(libtrace_t *libtrace) 803 { 804 uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom; 892 static int dag_available(libtrace_t *libtrace, 893 struct dag_per_stream_t *stream_data) 894 { 895 uint32_t diff = stream_data->top - stream_data->bottom; 805 896 806 897 /* If we've processed more than 4MB of data since we last called 807 898 * dag_advance_stream, then we should call it again to allow the 808 899 * space occupied by that 4MB to be released */ 809 if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)900 if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024) 810 901 return diff; 811 902 812 903 /* Update the top and bottom pointers */ 813 FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,814 FORMAT_DATA->dagstream,815 &( FORMAT_DATA->bottom));816 817 if ( FORMAT_DATA->top == NULL) {904 stream_data->top = dag_advance_stream(stream_data->device->fd, 905 stream_data->dagstream, 906 &(stream_data->bottom)); 907 908 if (stream_data->top == NULL) { 818 909 trace_set_err(libtrace, errno, "dag_advance_stream failed!"); 819 910 return -1; 820 911 } 821 FORMAT_DATA->processed = 0;822 diff = FORMAT_DATA->top - FORMAT_DATA->bottom;912 stream_data->processed = 0; 913 diff = stream_data->top - stream_data->bottom; 823 914 return diff; 824 915 } 825 916 826 917 /* Returns a pointer to the start of the next complete ERF record */ 827 static dag_record_t *dag_get_record( libtrace_t *libtrace)918 static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data) 828 919 { 829 920 dag_record_t *erfptr = NULL; 830 921 uint16_t size; 831 922 832 erfptr = (dag_record_t *) FORMAT_DATA->bottom;923 erfptr = (dag_record_t *)stream_data->bottom; 833 924 if (!erfptr) 834 925 return NULL; … … 838 929 839 930 /* Make certain we have the full packet available */ 840 if (size > ( FORMAT_DATA->top - FORMAT_DATA->bottom))931 if (size > (stream_data->top - stream_data->bottom)) 841 932 return NULL; 842 933 843 FORMAT_DATA->bottom += size;844 FORMAT_DATA->processed += size;934 stream_data->bottom += size; 935 stream_data->processed += size; 845 936 return erfptr; 846 937 } … … 848 939 /* Converts a buffer containing a recently read DAG packet record into a 849 940 * libtrace packet */ 850 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet, 851 void *buffer, libtrace_rt_types_t rt_type, 852 uint32_t flags) 941 static int dag_prepare_packet_real(libtrace_t *libtrace, 942 struct dag_per_stream_t *stream_data, 943 libtrace_packet_t *packet, 944 void *buffer, libtrace_rt_types_t rt_type, 945 uint32_t flags) 853 946 { 854 947 dag_record_t *erfptr; 855 libtrace_thread_t *t;856 948 857 949 /* If the packet previously owned a buffer that is not the buffer … … 859 951 * old one to avoid memory leaks */ 860 952 if (packet->buffer != buffer && 861 953 packet->buf_control == TRACE_CTRL_PACKET) { 862 954 free(packet->buffer); 863 955 } … … 896 988 } else { 897 989 /* Use the ERF loss counter */ 898 if (DATA(libtrace)->per_thread) { 899 t = get_thread_table(libtrace); 900 PERPKT_DATA(t)->drops += ntohs(erfptr->lctr); 990 if (stream_data->seeninterface[erfptr->flags.iface] 991 == 0) { 992 stream_data->seeninterface[erfptr->flags.iface] 993 = 1; 901 994 } else { 902 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] 903 == 0) { 904 FORMAT_DATA->seeninterface[erfptr->flags.iface] 905 = 1; 906 } else { 907 FORMAT_DATA->drops += ntohs(erfptr->lctr); 908 } 909 } 910 } 995 stream_data->drops += ntohs(erfptr->lctr); 996 } 997 } 998 999 packet->error = 1; 911 1000 912 1001 return 0; 1002 } 1003 1004 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet, 1005 void *buffer, libtrace_rt_types_t rt_type, 1006 uint32_t flags) 1007 { 1008 return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet, 1009 buffer, rt_type, flags); 913 1010 } 914 1011 … … 1090 1187 * If DUCK reporting is enabled, the packet returned may be a DUCK update 1091 1188 */ 1092 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 1093 { 1094 int size = 0; 1095 struct timeval tv; 1189 static int dag_read_packet_real(libtrace_t *libtrace, 1190 struct dag_per_stream_t *stream_data, 1191 libtrace_thread_t *t, /* Optional */ 1192 libtrace_packet_t *packet) 1193 { 1096 1194 dag_record_t *erfptr = NULL; 1097 1195 int numbytes = 0; 1098 1196 uint32_t flags = 0; 1099 struct timeval maxwait; 1100 struct timeval pollwait; 1197 struct timeval maxwait, pollwait; 1101 1198 1102 1199 pollwait.tv_sec = 0; … … 1105 1202 maxwait.tv_usec = 250000; 1106 1203 1107 /* Check if we're due for a DUCK report */ 1108 if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 1109 DUCK.duck_freq != 0) { 1110 size = dag_get_duckinfo(libtrace, packet); 1111 DUCK.last_duck = DUCK.last_pkt; 1112 if (size != 0) { 1113 return size; 1114 } 1115 /* No DUCK support, so don't waste our time anymore */ 1116 DUCK.duck_freq = 0; 1117 } 1204 /* TODO: Support DUCK reporting */ 1118 1205 1119 1206 /* Don't let anyone try to free our DAG memory hole! */ … … 1127 1214 } 1128 1215 1129 if (dag_set_stream_poll( FORMAT_DATA->device->fd, FORMAT_DATA->dagstream,1216 if (dag_set_stream_poll(stream_data->device->fd, stream_data->dagstream, 1130 1217 sizeof(dag_record_t), &maxwait, 1131 1218 &pollwait) == -1) { … … 1134 1221 } 1135 1222 1136 1137 1223 /* Grab a full ERF record */ 1138 1224 do { 1139 numbytes = dag_available(libtrace );1225 numbytes = dag_available(libtrace, stream_data); 1140 1226 if (numbytes < 0) 1141 1227 return numbytes; 1142 1228 if (numbytes < dag_record_size) { 1229 /* Check the message queue if we have one to check */ 1230 if (t != NULL && 1231 libtrace_message_queue_count(&t->messages) > 0) 1232 return -2; 1233 1143 1234 if (libtrace_halt) 1144 1235 return 0; … … 1146 1237 continue; 1147 1238 } 1148 erfptr = dag_get_record( libtrace);1239 erfptr = dag_get_record(stream_data); 1149 1240 } while (erfptr == NULL); 1150 1241 1151 1242 /* Prepare the libtrace packet */ 1152 if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF, 1153 flags)) 1154 return -1; 1155 1156 /* Update the DUCK timer */ 1157 tv = trace_get_timeval(packet); 1158 DUCK.last_pkt = tv.tv_sec; 1243 if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr, 1244 TRACE_RT_DATA_ERF, flags)) 1245 return -1; 1159 1246 1160 1247 return packet->payload ? htons(erfptr->rlen) : 1161 1248 erf_get_framing_length(packet); 1249 } 1250 1251 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) 1252 { 1253 return dag_read_packet_real(libtrace, FORMAT_DATA_FIRST, NULL, packet); 1254 } 1255 1256 static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, 1257 libtrace_packet_t **packets, size_t nb_packets) 1258 { 1259 int ret; 1260 size_t read_packets = 0; 1261 int numbytes = 0; 1262 1263 struct dag_per_stream_t *stream_data = 1264 (struct dag_per_stream_t *)t->format_data; 1265 1266 /* Read as many packets as we can, but read atleast one packet */ 1267 do { 1268 ret = dag_read_packet_real(libtrace, stream_data, t, 1269 packets[read_packets]); 1270 if (ret < 0) 1271 return ret; 1272 1273 read_packets++; 1274 1275 /* Make sure we don't read too many packets..! */ 1276 if (read_packets >= nb_packets) 1277 break; 1278 1279 numbytes = dag_available(libtrace, stream_data); 1280 } while (numbytes >= dag_record_size); 1281 1282 return read_packets; 1162 1283 } 1163 1284 … … 1178 1299 minwait.tv_usec = 10000; 1179 1300 1180 if (dag_set_stream_poll(FORMAT_DATA ->device->fd,1181 FORMAT_DATA ->dagstream, 0, &minwait,1301 if (dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd, 1302 FORMAT_DATA_FIRST->dagstream, 0, &minwait, 1182 1303 &minwait) == -1) { 1183 1304 trace_set_err(libtrace, errno, "dag_set_stream_poll"); … … 1192 1313 /* Need to call dag_available so that the top pointer will get 1193 1314 * updated, otherwise we'll never see any data! */ 1194 numbytes = dag_available(libtrace );1315 numbytes = dag_available(libtrace, FORMAT_DATA_FIRST); 1195 1316 1196 1317 /* May as well not bother calling dag_get_record if 1197 1318 * dag_available suggests that there's no data */ 1198 1319 if (numbytes != 0) 1199 erfptr = dag_get_record( libtrace);1320 erfptr = dag_get_record(FORMAT_DATA_FIRST); 1200 1321 if (erfptr == NULL) { 1201 1322 /* No packet available - sleep for a very short time */ … … 1208 1329 break; 1209 1330 } 1210 if (dag_prepare_packet (libtrace, packet, erfptr,1211 1331 if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet, 1332 erfptr, TRACE_RT_DATA_ERF, flags)) { 1212 1333 event.type = TRACE_EVENT_TERMINATE; 1213 1334 break; … … 1259 1380 1260 1381 /* Gets the number of dropped packets */ 1261 static uint64_t dag_get_dropped_packets(libtrace_t * trace)1382 static uint64_t dag_get_dropped_packets(libtrace_t *libtrace) 1262 1383 { 1263 1384 uint64_t sum = 0; 1264 int i, tot; 1265 1266 if (trace->format_data == NULL) 1267 return (uint64_t) - 1; 1268 1269 if (DATA(trace)->per_thread) { 1270 tot = trace->perpkt_thread_count; 1271 1272 for (i = 0; i < tot; i++) { 1273 printf("t%d: drops %" PRIu64 "\n", 1274 DATA(trace)->per_thread[i].drops); 1275 sum += DATA(trace)->per_thread[i].drops; 1276 } 1277 } 1278 1279 sum += DATA(trace)->drops; 1385 libtrace_list_node_t *tmp = FORMAT_DATA_HEAD; 1386 1387 /* Sum the drop counter for all the packets */ 1388 while (tmp != NULL) { 1389 sum += STREAM_DATA(tmp)->drops; 1390 tmp = tmp->next; 1391 } 1280 1392 1281 1393 return sum; … … 1295 1407 } 1296 1408 1297 static int dag_pstart_input(libtrace_t *libtrace) 1298 { 1299 char *scan, *tok; 1300 uint16_t stream_count = 0, max_streams; 1301 /* We keep our own pointer to per_thread as the system will free 1302 * up FORMAT_DATA without freeing this if something goes wrong */ 1303 struct dag_per_thread_t *per_thread = NULL; 1304 int iserror = 0; 1305 1306 /* Check we aren't trying to create more threads than the DAG card can 1307 * handle */ 1308 max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd); 1309 if (libtrace->perpkt_thread_count > max_streams) { 1310 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 1311 "trying to create too many threads (max is %u)", 1312 max_streams); 1313 iserror = 1; 1314 goto cleanup; 1315 } 1316 1317 /* Create the thread structures */ 1318 per_thread = calloc(libtrace->perpkt_thread_count, 1319 sizeof(struct dag_per_thread_t)); 1320 FORMAT_DATA->per_thread = per_thread; 1321 1322 /* Get the stream names from the uri */ 1323 if ((scan = strchr(libtrace->uridata, ',')) == NULL) { 1324 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 1325 "format uri doesn't specify the DAG streams"); 1326 iserror = 1; 1327 goto cleanup; 1328 } 1329 1330 scan++; 1331 1332 tok = strtok(scan, ","); 1333 while (tok != NULL) { 1334 /* Ensure we haven't specified too many streams */ 1335 if (stream_count >= libtrace->perpkt_thread_count) { 1336 trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 1337 "format uri specifies too many streams. " 1338 "Max is %u", max_streams); 1339 iserror = 1; 1340 goto cleanup; 1341 } 1342 1343 /* Save the stream details */ 1344 per_thread[stream_count].device = FORMAT_DATA->device; 1345 per_thread[stream_count++].stream = (uint16_t)atoi(tok); 1346 1347 tok = strtok(NULL, ","); 1348 } 1349 1350 cleanup: 1351 if (iserror) { 1352 /* Free the per_thread memory */ 1353 free(per_thread); 1354 return -1; 1355 } else { 1356 return 0; 1357 } 1358 } 1359 1360 1361 1362 /* TODO: Fold this into dag_available */ 1363 static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) 1364 { 1365 uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom; 1366 1367 /* If we've processed more than 4MB of data since we last called 1368 * dag_advance_stream, then we should call it again to allow the 1369 * space occupied by that 4MB to be released */ 1370 if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 1371 4*1024*1024) 1372 return diff; 1373 1374 /* Update top and bottom pointers */ 1375 PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd, 1376 PERPKT_DATA(t)->stream, 1377 &(PERPKT_DATA(t)->bottom)); 1378 1379 if (PERPKT_DATA(t)->top == NULL) { 1380 trace_set_err(libtrace, errno, "dag_advance_stream failed!"); 1381 return -1; 1382 } 1383 1384 PERPKT_DATA(t)->processed = 0; 1385 diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom; 1386 return diff; 1387 } 1388 1389 /* TODO: Fold this into dag_get_record */ 1390 static dag_record_t *dag_pget_record(libtrace_t *libtrace, 1391 libtrace_thread_t *t) 1392 { 1393 dag_record_t *erfptr = NULL; 1394 uint16_t size; 1395 1396 erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom; 1397 if (!erfptr) 1398 return NULL; 1399 1400 /* Ensure we have a whole record */ 1401 size = ntohs(erfptr->rlen); 1402 assert(size >= dag_record_size); 1403 if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom)) 1404 return NULL; 1405 1406 /* Advance the buffer pointers */ 1407 PERPKT_DATA(t)->bottom += size; 1408 PERPKT_DATA(t)->processed += size; 1409 1410 return erfptr; 1411 } 1412 1413 static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, 1414 libtrace_packet_t *packet) 1415 { 1416 dag_record_t *erfptr = NULL; 1417 int numbytes = 0; 1418 uint32_t flags = 0; 1419 struct timeval maxwait, pollwait; 1420 1421 pollwait.tv_sec = 0; 1422 pollwait.tv_usec = 10000; 1423 maxwait.tv_sec = 0; 1424 maxwait.tv_usec = 250000; 1425 1426 /* TODO: Support DUCK reporting */ 1427 1428 /* Don't let anyone try to free our DAG memory hole! */ 1429 flags |= TRACE_PREP_DO_NOT_OWN_BUFFER; 1430 1431 /* If the packet buffer is currently owned by libtrace, free it so 1432 * that we can set the packet to point into the DAG memory hole */ 1433 if (packet->buf_control == TRACE_CTRL_PACKET) { 1434 free(packet->buffer); 1435 packet->buffer = 0; 1436 } 1437 1438 /* Configure DAG card stream polling */ 1439 if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, 1440 PERPKT_DATA(t)->stream, sizeof(dag_record_t), 1441 &maxwait, &pollwait) < 0) { 1442 trace_set_err(libtrace, errno, "dag_set_stream_poll"); 1443 return -1; 1444 } 1445 1446 /* Grab an ERF record */ 1447 do { 1448 numbytes = dag_pavailable(libtrace, t); 1449 if (numbytes < 0) 1450 return numbytes; 1451 if (numbytes < dag_record_size) { 1452 if (libtrace_halt) 1453 return 0; 1454 1455 /* Check message queue to see if we should 1456 * abort early */ 1457 if (libtrace_message_queue_count(&t->messages) > 0) 1458 return -2; 1459 1460 /* Keep trying until we see a packet */ 1461 continue; 1462 } 1463 1464 erfptr = dag_pget_record(libtrace, t); 1465 } while (erfptr == NULL); 1466 1467 /* Prepare the libtrace packet */ 1468 if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF, 1469 flags)) 1470 return -1; 1471 1472 PERPKT_DATA(t)->pkt_count++; 1473 1474 return packet->payload ? htons(erfptr->rlen) : 1475 erf_get_framing_length(packet); 1476 } 1477 1478 static int dag_ppause_input(libtrace_t *libtrace) 1479 { 1480 int i, tot = libtrace->perpkt_thread_count; 1481 struct dag_per_thread_t *t_data; 1482 1483 /* Stop and detach all the streams */ 1484 printf("Stopping and detaching all streams\n"); 1485 for (i = 0; i < tot; i++) { 1486 t_data = &FORMAT_DATA->per_thread[i]; 1487 1488 if (dag_stop_stream(t_data->device->fd, 1489 t_data->stream) < 0) { 1490 trace_set_err(libtrace, errno, 1491 "can't stop DAG stream #%u", 1492 t_data->stream); 1493 return -1; 1494 } 1495 1496 if (dag_detach_stream(t_data->device->fd, 1497 t_data->stream) < 0) { 1498 trace_set_err(libtrace, errno, 1499 "can't detach DAG stream #%u", 1500 t_data->stream); 1501 return -1; 1502 } 1503 } 1504 1505 /* Free up the per_thread array */ 1506 free(FORMAT_DATA->per_thread); 1507 FORMAT_DATA->per_thread = NULL; 1508 1509 return 0; 1510 } 1511 1512 static int dag_pconfig_input(libtrace_t *libtrace, 1513 trace_parallel_option_t option, void *value) 1409 static int dag_pconfig_input(UNUSED libtrace_t *libtrace, 1410 trace_parallel_option_t option, UNUSED void *value) 1514 1411 { 1515 1412 /* We don't support any of these! Normally you configure the DAG card … … 1530 1427 } 1531 1428 1429 /* TODO: Should possibly make a more generic dag_start_input, as there's a 1430 * fair bit of code duplication between that and this */ 1532 1431 static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, 1533 1432 bool reader) 1534 1433 { 1535 /* XXX: This function gets run sequentially for all1536 * threads. Should investigate making it parallel as draining the1537 * memory could be needlessly time consuming.1538 */1539 uint8_t *top, *bottom;1540 /* XXX: Investigate this type, as I would assume the value1541 * could be larger than 255 */1542 uint8_t diff = 0;1543 1434 struct timeval zero, nopoll; 1544 1435 uint8_t *top, *bottom, *starttop; 1436 struct dag_per_stream_t *stream_data; 1545 1437 top = bottom = NULL; 1546 1438 … … 1552 1444 if (reader) { 1553 1445 if (t->type == THREAD_PERPKT) { 1446 stream_data = 1447 (struct dag_per_stream_t *) 1448 libtrace_list_get_index(FORMAT_DATA->per_stream, 1449 t->perpkt_num)->data; 1450 1554 1451 /* Pass the per thread data to the thread */ 1555 t->format_data = 1556 &FORMAT_DATA->per_thread[t->perpkt_num]; 1452 t->format_data = stream_data; 1557 1453 1558 1454 /* Attach and start the DAG stream */ 1559 1455 printf("t%u: starting and attaching stream #%u\n", 1560 t->perpkt_num, PERPKT_DATA(t)->stream);1561 if (dag_attach_stream( PERPKT_DATA(t)->device->fd,1562 PERPKT_DATA(t)->stream, 0,1456 t->perpkt_num, stream_data->dagstream); 1457 if (dag_attach_stream(stream_data->device->fd, 1458 stream_data->dagstream, 0, 1563 1459 0) < 0) { 1460 printf("can't attach DAG stream #%u\n", 1461 stream_data->dagstream); 1564 1462 trace_set_err(libtrace, errno, 1565 1463 "can't attach DAG stream #%u", 1566 PERPKT_DATA(t)->stream);1464 stream_data->dagstream); 1567 1465 return -1; 1568 1466 } 1569 if (dag_start_stream( PERPKT_DATA(t)->device->fd,1570 PERPKT_DATA(t)->stream) < 0) {1467 if (dag_start_stream(stream_data->device->fd, 1468 stream_data->dagstream) < 0) { 1571 1469 trace_set_err(libtrace, errno, 1572 1470 "can't start DAG stream #%u", 1573 PERPKT_DATA(t)->stream); 1471 stream_data->dagstream); 1472 printf("can't start DAG stream #%u\n", 1473 stream_data->dagstream); 1574 1474 return -1; 1575 1475 } 1576 1476 1577 /* Ensure that dag_advance_stream will return without blocking */ 1578 if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd, 1579 PERPKT_DATA(t)->stream, 0, &zero, 1477 /* Ensure that dag_advance_stream will return without 1478 * blocking */ 1479 if(dag_set_stream_poll(stream_data->device->fd, 1480 stream_data->dagstream, 0, &zero, 1580 1481 &nopoll) < 0) { 1581 1482 trace_set_err(libtrace, errno, … … 1585 1486 1586 1487 /* Clear all the data from the memory hole */ 1587 do { 1588 top = dag_advance_stream(PERPKT_DATA(t)-> 1488 starttop = dag_advance_stream(stream_data-> 1489 device->fd, 1490 stream_data->dagstream, 1491 &bottom); 1492 1493 top = starttop; 1494 while (starttop - bottom > 0) { 1495 bottom += (starttop - bottom); 1496 top = dag_advance_stream(stream_data-> 1589 1497 device->fd, 1590 PERPKT_DATA(t)->stream,1498 stream_data->dagstream, 1591 1499 &bottom); 1592 1593 assert(top && bottom); 1594 diff = top - bottom; 1595 bottom -= diff; 1596 } while (diff != 0); 1597 1598 PERPKT_DATA(t)->top = NULL; 1599 PERPKT_DATA(t)->bottom = NULL; 1600 PERPKT_DATA(t)->pkt_count = 0; 1601 PERPKT_DATA(t)->drops = 0; 1500 } 1501 stream_data->top = top; 1502 stream_data->bottom = bottom; 1503 stream_data->pkt_count = 0; 1504 stream_data->drops = 0; 1602 1505 } else { 1603 /* TODO: Figure out why we need this */ 1604 t->format_data = &FORMAT_DATA->per_thread[0]; 1506 /* TODO: Figure out why t->type != THREAD_PERPKT in 1507 * order to figure out what this line does */ 1508 t->format_data = FORMAT_DATA_FIRST; 1605 1509 } 1606 1510 } … … 1652 1556 dag_help, /* help */ 1653 1557 NULL, /* next pointer */ 1654 1655 1656 dag_pread_packet,1657 dag_ppause_input,1658 1659 1660 1661 1558 {true, 0}, /* live packet capture, thread limit TBD */ 1559 dag_pstart_input, 1560 dag_pread_packets, 1561 dag_pause_input, 1562 NULL, 1563 dag_pconfig_input, 1564 dag_pregister_thread, 1565 NULL 1662 1566 }; 1663 1567 -
lib/libtrace_int.h
r12ae766 r0b01fea 153 153 #include "data-struct/message_queue.h" 154 154 #include "data-struct/deque.h" 155 #include "data-struct/linked_list.h" 155 156 #include "data-struct/sliding_window.h" 156 157 … … 1173 1174 /** Constructor for the Linux Native format module */ 1174 1175 void linuxnative_constructor(void); 1176 /** Constructor for the Linux Ring format module */ 1177 void linuxring_constructor(void); 1175 1178 /** Constructor for the PCAP format module */ 1176 1179 void pcap_constructor(void); -
lib/trace.c
r858ce90 r0b01fea 143 143 legacy_constructor(); 144 144 atmhdr_constructor(); 145 linuxring_constructor(); 145 146 linuxnative_constructor(); 146 147 #ifdef HAVE_LIBPCAP
Note: See TracChangeset
for help on using the changeset viewer.