Changeset f0b87a7
- Timestamp:
- 03/23/09 12:27:39 (13 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 5f4bef4
- Parents:
- 013de36e
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_dag25.c
r91b72d3 rf0b87a7 67 67 #define DATA(x) ((struct dag_format_data_t *)x->format_data) 68 68 #define FORMAT_DATA DATA(libtrace) 69 #define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data) 70 #define FORMAT_DATA_OUT DATA_OUT(libtrace) 71 69 72 #define DUCK FORMAT_DATA->duck 70 73 static struct libtrace_format_t dag; … … 77 80 struct dag_dev_t *prev; 78 81 struct dag_dev_t *next; 79 }; 80 81 struct dag_format_data_t { 82 struct { 83 uint32_t last_duck; 84 uint32_t duck_freq; 85 uint32_t last_pkt; 86 libtrace_t *dummy_duck; 87 } duck; 88 82 }; 83 84 struct dag_format_data_out_t { 89 85 struct dag_dev_t *device; 90 86 unsigned int dagstream; … … 96 92 }; 97 93 94 struct dag_format_data_t { 95 struct { 96 uint32_t last_duck; 97 uint32_t duck_freq; 98 uint32_t last_pkt; 99 libtrace_t *dummy_duck; 100 } duck; 101 102 struct dag_dev_t *device; 103 unsigned int dagstream; 104 int stream_attached; 105 uint8_t *bottom; 106 uint8_t *top; 107 uint32_t processed; 108 uint64_t drops; 109 }; 110 98 111 pthread_mutex_t open_dag_mutex; 99 112 struct dag_dev_t *open_dags = NULL; 100 113 101 static void dag_probe_filename(const char *filename) 114 static void dag_probe_filename(const char *filename) 102 115 { 103 116 struct stat statbuf; … … 114 127 } 115 128 129 static void dag_init_format_out_data(libtrace_out_t *libtrace) { 130 libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t)); 131 // no DUCK on output 132 FORMAT_DATA_OUT->stream_attached = 0; 133 FORMAT_DATA_OUT->drops = 0; 134 FORMAT_DATA_OUT->device = NULL; 135 FORMAT_DATA_OUT->dagstream = 0; 136 FORMAT_DATA_OUT->processed = 0; 137 FORMAT_DATA_OUT->bottom = NULL; 138 FORMAT_DATA_OUT->top = NULL; 139 140 } 141 116 142 static void dag_init_format_data(libtrace_t *libtrace) { 117 143 libtrace->format_data = (struct dag_format_data_t *) 118 144 malloc(sizeof(struct dag_format_data_t)); 119 145 DUCK.last_duck = 0; 120 146 DUCK.duck_freq = 0; … … 133 159 static struct dag_dev_t *dag_find_open_device(char *dev_name) { 134 160 struct dag_dev_t *dag_dev; 135 161 136 162 dag_dev = open_dags; 137 163 … … 142 168 dag_dev->ref_count ++; 143 169 return dag_dev; 144 170 145 171 } 146 172 dag_dev = dag_dev->next; 147 173 } 148 174 return NULL; 149 150 175 176 151 177 } 152 178 … … 154 180 static void dag_close_device(struct dag_dev_t *dev) { 155 181 /* Need to remove from the device list */ 156 182 157 183 assert(dev->ref_count == 0); 158 184 159 185 if (dev->prev == NULL) { 160 186 open_dags = dev->next; … … 168 194 169 195 dag_close(dev->fd); 196 if (dev->dev_name) 170 197 free(dev->dev_name); 171 198 free(dev); 172 199 } 200 201 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) { 202 struct stat buf; 203 int fd; 204 struct dag_dev_t *new_dev; 205 206 if (stat(dev_name, &buf) == -1) { 207 trace_set_err_out(libtrace,errno,"stat(%s)",dev_name); 208 return NULL; 209 } 210 211 if (S_ISCHR(buf.st_mode)) { 212 if((fd = dag_open(dev_name)) < 0) { 213 trace_set_err_out(libtrace,errno,"Cannot open DAG %s", 214 dev_name); 215 return NULL; 216 } 217 } else { 218 trace_set_err_out(libtrace,errno,"Not a valid dag device: %s", 219 dev_name); 220 return NULL; 221 } 222 223 new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t)); 224 new_dev->fd = fd; 225 new_dev->dev_name = dev_name; 226 new_dev->ref_count = 1; 227 228 new_dev->prev = NULL; 229 new_dev->next = open_dags; 230 if (open_dags) 231 open_dags->prev = new_dev; 232 233 open_dags = new_dev; 234 235 return new_dev; 173 236 } 174 237 … … 178 241 int fd; 179 242 struct dag_dev_t *new_dev; 180 243 181 244 if (stat(dev_name, &buf) == -1) { 182 245 trace_set_err(libtrace,errno,"stat(%s)",dev_name); 183 246 return NULL; 184 247 } 185 248 186 249 if (S_ISCHR(buf.st_mode)) { 187 250 if((fd = dag_open(dev_name)) < 0) { … … 195 258 return NULL; 196 259 } 197 260 198 261 new_dev = (struct dag_dev_t *)malloc(sizeof(struct dag_dev_t)); 199 262 new_dev->fd = fd; 200 263 new_dev->dev_name = dev_name; 201 264 new_dev->ref_count = 1; 202 265 203 266 new_dev->prev = NULL; 204 267 new_dev->next = open_dags; 205 268 if (open_dags) 206 269 open_dags->prev = new_dev; 207 270 208 271 open_dags = new_dev; 209 272 210 273 return new_dev; 211 274 } 212 275 276 static int dag_init_output(libtrace_out_t *libtrace) { 277 char *dag_dev_name = NULL; 278 char *scan = NULL; 279 struct dag_dev_t *dag_device = NULL; 280 int stream = 1; 281 282 dag_init_format_out_data(libtrace); 283 pthread_mutex_lock(&open_dag_mutex); 284 if ((scan = strchr(libtrace->uridata,',')) == NULL) { 285 dag_dev_name = strdup(libtrace->uridata); 286 } else { 287 dag_dev_name = (char *)strndup(libtrace->uridata, 288 (size_t)(scan - libtrace->uridata)); 289 stream = atoi(++scan); 290 } 291 FORMAT_DATA->dagstream = stream; 292 293 dag_device = dag_find_open_device(dag_dev_name); 294 295 if (dag_device == NULL) { 296 /* Device not yet opened - open it ourselves */ 297 dag_device = dag_open_output_device(libtrace, dag_dev_name); 298 } else { 299 free(dag_dev_name); 300 dag_dev_name = NULL; 301 } 302 303 if (dag_device == NULL) { 304 if (dag_dev_name) 305 free(dag_dev_name); 306 return -1; 307 } 308 309 FORMAT_DATA->device = dag_device; 310 pthread_mutex_unlock(&open_dag_mutex); 311 return 0; 312 } 213 313 214 314 static int dag_init_input(libtrace_t *libtrace) { … … 217 317 int stream = 0; 218 318 struct dag_dev_t *dag_device = NULL; 219 319 220 320 dag_init_format_data(libtrace); 221 321 pthread_mutex_lock(&open_dag_mutex); … … 227 327 stream = atoi(++scan); 228 328 } 229 230 231 232 /* For now, we don't offer the ability to select the stream */ 329 233 330 FORMAT_DATA->dagstream = stream; 234 331 … … 246 343 if (dag_dev_name) 247 344 free(dag_dev_name); 345 dag_dev_name = NULL; 248 346 return -1; 249 347 } 250 348 251 349 FORMAT_DATA->device = dag_device; 252 350 253 351 pthread_mutex_unlock(&open_dag_mutex); 254 352 return 0; 255 353 } 256 354 257 355 static int dag_config_input(libtrace_t *libtrace, trace_option_t option, 258 356 void *data) { … … 264 362 case TRACE_OPTION_SNAPLEN: 265 363 snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data); 266 if (dag_configure(FORMAT_DATA->device->fd, 364 if (dag_configure(FORMAT_DATA->device->fd, 267 365 conf_str) != 0) { 268 366 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata); … … 275 373 case TRACE_OPTION_FILTER: 276 374 return -1; 277 case TRACE_OPTION_EVENT_REALTIME: 375 case TRACE_OPTION_EVENT_REALTIME: 278 376 return -1; 279 377 } 280 378 return -1; 379 } 380 static int dag_start_output(libtrace_out_t *libtrace) { 381 struct timeval zero, nopoll; 382 uint8_t *top, *bottom; 383 uint8_t diff = 0; 384 top = bottom = NULL; 385 386 zero.tv_sec = 0; 387 zero.tv_usec = 0; 388 nopoll = zero; 389 390 if (dag_attach_stream(FORMAT_DATA->device->fd, 391 FORMAT_DATA->dagstream, 0, 1048576) < 0) { 392 trace_set_err_out(libtrace, errno, "Cannot attach DAG stream"); 393 return -1; 394 } 395 396 if (dag_start_stream(FORMAT_DATA->device->fd, 397 FORMAT_DATA->dagstream) < 0) { 398 trace_set_err_out(libtrace, errno, "Cannot start DAG stream"); 399 return -1; 400 } 401 FORMAT_DATA->stream_attached = 1; 402 403 /* We don't want the dag card to do any sleeping */ 404 dag_set_stream_poll(FORMAT_DATA->device->fd, 405 FORMAT_DATA->dagstream, 0, &zero, 406 &nopoll); 407 408 return 0; 281 409 } 282 410 … … 292 420 293 421 294 295 if (dag_attach_stream(FORMAT_DATA->device->fd, 422 423 if (dag_attach_stream(FORMAT_DATA->device->fd, 296 424 FORMAT_DATA->dagstream, 0, 0) < 0) { 297 425 trace_set_err(libtrace, errno, "Cannot attach DAG stream"); … … 299 427 } 300 428 301 if (dag_start_stream(FORMAT_DATA->device->fd, 429 if (dag_start_stream(FORMAT_DATA->device->fd, 302 430 FORMAT_DATA->dagstream) < 0) { 303 431 trace_set_err(libtrace, errno, "Cannot start DAG stream"); … … 306 434 FORMAT_DATA->stream_attached = 1; 307 435 /* We don't want the dag card to do any sleeping */ 308 dag_set_stream_poll(FORMAT_DATA->device->fd, 309 FORMAT_DATA->dagstream, 0, &zero, 436 dag_set_stream_poll(FORMAT_DATA->device->fd, 437 FORMAT_DATA->dagstream, 0, &zero, 310 438 &nopoll); 311 439 312 440 /* Should probably flush the memory hole now */ 313 441 314 442 do { 315 443 top = dag_advance_stream(FORMAT_DATA->device->fd, … … 324 452 FORMAT_DATA->processed = 0; 325 453 FORMAT_DATA->drops = 0; 326 454 327 455 return 0; 328 456 } 329 457 458 static int dag_pause_output(libtrace_out_t *libtrace) { 459 if (dag_stop_stream(FORMAT_DATA->device->fd, 460 FORMAT_DATA->dagstream) < 0) { 461 trace_set_err_out(libtrace, errno, "Could not stop DAG stream"); 462 return -1; 463 } 464 if (dag_detach_stream(FORMAT_DATA->device->fd, 465 FORMAT_DATA->dagstream) < 0) { 466 trace_set_err_out(libtrace, errno, "Could not detach DAG stream"); 467 return -1; 468 } 469 FORMAT_DATA->stream_attached = 0; 470 return 0; 471 } 472 330 473 static int dag_pause_input(libtrace_t *libtrace) { 331 if (dag_stop_stream(FORMAT_DATA->device->fd, 474 if (dag_stop_stream(FORMAT_DATA->device->fd, 332 475 FORMAT_DATA->dagstream) < 0) { 333 476 trace_set_err(libtrace, errno, "Could not stop DAG stream"); 334 477 return -1; 335 478 } 336 if (dag_detach_stream(FORMAT_DATA->device->fd, 479 if (dag_detach_stream(FORMAT_DATA->device->fd, 337 480 FORMAT_DATA->dagstream) < 0) { 338 481 trace_set_err(libtrace, errno, "Could not detach DAG stream"); … … 348 491 dag_pause_input(libtrace); 349 492 FORMAT_DATA->device->ref_count --; 350 493 351 494 if (FORMAT_DATA->device->ref_count == 0) 352 495 dag_close_device(FORMAT_DATA->device); … … 356 499 pthread_mutex_unlock(&open_dag_mutex); 357 500 return 0; /* success */ 501 } 502 503 static int dag_fin_output(libtrace_out_t *libtrace) { 504 pthread_mutex_lock(&open_dag_mutex); 505 if (FORMAT_DATA->stream_attached) 506 dag_pause_output(libtrace); 507 FORMAT_DATA->device->ref_count --; 508 509 if (FORMAT_DATA->device->ref_count == 0) 510 dag_close_device(FORMAT_DATA->device); 511 free(libtrace->format_data); 512 pthread_mutex_unlock(&open_dag_mutex); 513 return 0; /* success */ 358 514 } 359 515 … … 376 532 /* No need to check if we can get DUCK or not - we're modern 377 533 * enough */ 378 if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK, 534 if ((ioctl(FORMAT_DATA->device->fd, DAGIOCDUCK, 379 535 (duckinf_t *)packet->payload) < 0)) { 380 536 trace_set_err(libtrace, errno, "Error using DAGIOCDUCK"); … … 392 548 uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom; 393 549 /* If we've processed more than 4MB of data since we last called 394 * dag_advance_stream, then we should call it again to allow the 550 * dag_advance_stream, then we should call it again to allow the 395 551 * space occupied by that 4MB to be released */ 396 552 if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024) 397 553 return diff; 398 FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd, 399 FORMAT_DATA->dagstream, 554 FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd, 555 FORMAT_DATA->dagstream, 400 556 &(FORMAT_DATA->bottom)); 401 557 if (FORMAT_DATA->top == NULL) { … … 426 582 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet, 427 583 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) { 428 584 429 585 dag_record_t *erfptr; 430 431 if (packet->buffer != buffer && 586 587 if (packet->buffer != buffer && 432 588 packet->buf_control == TRACE_CTRL_PACKET) { 433 589 free(packet->buffer); 434 590 } 435 591 436 592 if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) { 437 593 packet->buf_control = TRACE_CTRL_PACKET; 438 594 } else 439 595 packet->buf_control = TRACE_CTRL_EXTERNAL; 440 596 441 597 erfptr = (dag_record_t *)buffer; 442 598 packet->buffer = erfptr; 443 599 packet->header = erfptr; 444 600 packet->type = rt_type; 445 601 446 602 if (erfptr->flags.rxerror == 1) { 447 603 /* rxerror means the payload is corrupt - drop it … … 453 609 + erf_get_framing_length(packet); 454 610 } 455 611 456 612 if (libtrace->format_data == NULL) { 457 dag_init_format_data(libtrace); 458 } 459 460 /* No loss counter for DSM coloured records - have to use 613 dag_init_format_data(libtrace); 614 } 615 616 /* No loss counter for DSM coloured records - have to use 461 617 * some other API */ 462 618 if (erfptr->type == TYPE_DSM_COLOR_ETH) { 463 619 464 620 } else { 465 621 DATA(libtrace)->drops += ntohs(erfptr->lctr); … … 469 625 } 470 626 627 628 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) { 629 int err; 630 void *record; 631 int size = trace_get_capture_length(packet); 632 size = size + (8 - (size % 8)); 633 634 err = dag_tx_stream_copy_bytes(FORMAT_DATA->device->fd, FORMAT_DATA->dagstream, 635 packet->buffer, size); 636 637 if (err == NULL) 638 trace_set_err(libtrace, errno, "dag_tx_stream_copy_bytes failed!"); 639 640 return 0; 641 } 471 642 472 643 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { … … 476 647 int numbytes = 0; 477 648 uint32_t flags = 0; 478 649 479 650 if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 480 651 DUCK.duck_freq != 0) { … … 489 660 490 661 flags |= TRACE_PREP_DO_NOT_OWN_BUFFER; 491 662 492 663 if (packet->buf_control == TRACE_CTRL_PACKET) { 493 664 free(packet->buffer); … … 511 682 tv = trace_get_timeval(packet); 512 683 DUCK.last_pkt = tv.tv_sec; 513 514 return packet->payload ? htons(erfptr->rlen) : 684 685 return packet->payload ? htons(erfptr->rlen) : 515 686 erf_get_framing_length(packet); 516 687 } … … 522 693 int numbytes; 523 694 uint32_t flags = 0; 524 695 525 696 /* Need to call dag_available so that the top pointer will get 526 697 * updated, otherwise we'll never see any data! */ … … 538 709 } 539 710 //dag_form_packet(erfptr, packet); 540 if (dag_prepare_packet(trace, packet, erfptr, TRACE_RT_DATA_ERF, 711 if (dag_prepare_packet(trace, packet, erfptr, TRACE_RT_DATA_ERF, 541 712 flags)) { 542 713 event.type = TRACE_EVENT_TERMINATE; 543 714 return event; 544 715 } 545 546 716 717 547 718 event.size = trace_get_capture_length(packet) + trace_get_framing_length(packet); 548 719 if (trace->filter) { … … 593 764 dag_start_input, /* start_input */ 594 765 dag_pause_input, /* pause_input */ 595 NULL, /* init_output*/766 dag_init_output, /* init_output */ /* done */ 596 767 NULL, /* config_output */ 597 NULL, /* start_output*/768 dag_start_output, /* start_output */ /* done */ 598 769 dag_fin_input, /* fin_input */ 599 NULL, /* fin_output*/770 dag_fin_output, /* fin_output */ /* done */ 600 771 dag_read_packet, /* read_packet */ 601 772 dag_prepare_packet, /* prepare_packet */ 602 773 NULL, /* fin_packet */ 603 NULL, /* write_packet*/774 dag_write_packet, /* write_packet */ /* todo */ 604 775 erf_get_link_type, /* get_link_type */ 605 776 erf_get_direction, /* get_direction */
Note: See TracChangeset
for help on using the changeset viewer.