Changeset 5ef19d8 for lib/format_etsilive.c
- Timestamp:
- 05/18/18 11:39:59 (3 years ago)
- Branches:
- cachetimestamps, develop, etsilive, master, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- 25c305d
- Parents:
- b94478f
- git-author:
- Shane Alcock <salcock@…> (03/06/18 17:36:45)
- git-committer:
- Shane Alcock <salcock@…> (05/18/18 11:39:59)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_etsilive.c
r00365c6 r5ef19d8 90 90 } newsend_message_t; 91 91 92 static int send_etsili_keepalive_response(int fd, int64_t seqno); 92 93 93 94 static void *etsi_listener(void *tdata) { … … 292 293 etsisocket_t src = receiver->sources[i]; 293 294 libtrace_scb_destroy(&(src.recvbuffer)); 294 close(src.sock); 295 if (src.srcaddr) { 296 free(src.srcaddr); 297 } 298 if (src.sock != -1) { 299 close(src.sock); 300 } 295 301 } 296 302 wandder_free_etsili_decoder(receiver->etsidec); … … 393 399 } 394 400 401 static inline void inspect_next_packet(etsisocket_t *sock, 402 etsisocket_t **earliestsock, uint64_t *earliesttime, 403 wandder_etsispec_t *dec) { 404 405 406 struct timeval tv; 407 uint32_t available; 408 uint8_t *ptr = NULL; 409 uint32_t reclen = 0; 410 uint64_t current; 411 412 if (sock->sock == -1) { 413 return; 414 } 415 /* Have we already successfully decoded this? Cool, 416 * just use whatever we cached last time. 417 */ 418 if (sock->cached.timestamp != 0) { 419 current = sock->cached.timestamp; 420 421 if (*earliesttime == 0 || *earliesttime > current) { 422 *earliesttime = current; 423 *earliestsock = sock; 424 } 425 return; 426 } 427 428 ptr = libtrace_scb_get_read(&(sock->recvbuffer), &available); 429 430 if (available == 0 || ptr == NULL) { 431 return; 432 } 433 434 wandder_attach_etsili_buffer(dec, ptr, available, false); 435 if (sock->cached.length != 0) { 436 reclen = sock->cached.length; 437 } else { 438 reclen = wandder_etsili_get_pdu_length(dec); 439 440 if (reclen == 0) { 441 return; 442 } 443 } 444 445 if (available < reclen) { 446 /* Don't have the whole PDU yet */ 447 return; 448 } 449 450 if (wandder_etsili_is_keepalive(dec)) { 451 int64_t kaseq = wandder_etsili_get_sequence_number(dec); 452 if (kaseq < 0) { 453 fprintf(stderr, "bogus sequence number in ETSILI keep alive.\n"); 454 close(sock->sock); 455 sock->sock = -1; 456 return; 457 } 458 /* Send keep alive response */ 459 if (send_etsili_keepalive_response(sock->sock, kaseq) < 0) { 460 fprintf(stderr, "error sending response to ETSILI keep alive: %s.\n", strerror(errno)); 461 close(sock->sock); 462 sock->sock = -1; 463 return; 464 } 465 /* Skip past KA */ 466 libtrace_scb_advance_read(&(sock->recvbuffer), reclen); 467 return; 468 } 469 470 /* Get the timestamp */ 471 472 tv = wandder_etsili_get_header_timestamp(dec); 473 if (tv.tv_sec == 0) { 474 return; 475 } 476 current = ((((uint64_t)tv.tv_sec) << 32) + 477 (((uint64_t)tv.tv_usec << 32)/1000000)); 478 479 /* Success, cache everything we used so we don't have to 480 * decode this packet again. 481 */ 482 sock->cached.timestamp = current; 483 sock->cached.length = reclen; 484 485 486 /* Don't forget to update earliest and esock... */ 487 if (current < *earliesttime || *earliesttime == 0) { 488 *earliestsock = sock; 489 *earliesttime = current; 490 } 491 } 492 395 493 static etsisocket_t *select_next_packet(etsithread_t *et, libtrace_t *libtrace) { 396 494 … … 398 496 etsisocket_t *esock = NULL; 399 497 uint64_t earliest = 0; 400 uint64_t current;401 struct timeval tv;402 uint32_t available;403 uint8_t *ptr = NULL;404 wandder_decoder_t dec;405 uint32_t reclen = 0;406 498 407 499 for (i = 0; i < et->sourcecount; i++) { 408 if (et->sources[i].sock == -1) { 409 continue; 410 } 411 /* Have we already successfully decoded this? Cool, 412 * just use whatever we cached last time. 413 */ 414 if (et->sources[i].cached.timestamp != 0) { 415 current = et->sources[i].cached.timestamp; 416 417 if (earliest == 0 || earliest > current) { 418 earliest = current; 419 esock = &(et->sources[i]); 420 } 421 continue; 422 } 423 424 ptr = libtrace_scb_get_read(&(et->sources[i].recvbuffer), 425 &available); 426 427 if (available == 0 || ptr == NULL) { 428 continue; 429 } 430 431 wandder_attach_etsili_buffer(et->etsidec, ptr, available, false); 432 if (et->sources[i].cached.length != 0) { 433 reclen = et->sources[i].cached.length; 434 } else { 435 reclen = wandder_etsili_get_pdu_length(et->etsidec); 436 437 if (reclen == 0) { 438 continue; 439 } 440 } 441 442 if (available < reclen) { 443 /* Don't have the whole PDU yet */ 444 continue; 445 } 446 447 /* Get the timestamp */ 448 449 tv = wandder_etsili_get_header_timestamp(et->etsidec); 450 if (tv.tv_sec == 0) { 451 continue; 452 } 453 current = ((((uint64_t)tv.tv_sec) << 32) + 454 (((uint64_t)tv.tv_usec << 32)/1000000)); 455 456 /* Success, cache everything we used so we don't have to 457 * decode this packet again. 458 */ 459 et->sources[i].cached.timestamp = current; 460 et->sources[i].cached.length = reclen; 461 462 463 /* Don't forget to update earliest and esock... */ 464 if (current < earliest || earliest == 0) { 465 esock = &(et->sources[i]); 466 earliest = current; 467 } 468 500 inspect_next_packet(&(et->sources[i]), &esock, &earliest, 501 et->etsidec); 469 502 } 470 503 return esock; … … 604 637 etsilive_init_input, /* init_input */ 605 638 NULL, /* config_input */ 606 etsilive_start_input, /* sta etsilive_input */639 etsilive_start_input, /* start_input */ 607 640 etsilive_pause_input, /* pause */ 608 641 NULL, /* init_output */ 609 642 NULL, /* config_output */ 610 NULL, /* sta etsilive_output */643 NULL, /* start_output */ 611 644 etsilive_fin_input, /* fin_input */ 612 645 NULL, /* fin_output */ … … 644 677 register_format(&etsilive); 645 678 } 679 680 681 #define ENC_USEQUENCE(enc) wandder_encode_next(enc, WANDDER_TAG_SEQUENCE, \ 682 WANDDER_CLASS_UNIVERSAL_CONSTRUCT, WANDDER_TAG_SEQUENCE, NULL, 0) 683 684 #define ENC_CSEQUENCE(enc, x) wandder_encode_next(enc, WANDDER_TAG_SEQUENCE, \ 685 WANDDER_CLASS_CONTEXT_CONSTRUCT, x, NULL, 0) 686 687 #define LT_ETSI_LIID "none" 688 #define LT_ETSI_NA "NA" 689 #define LT_ETSI_OPERATOR "libtrace" 690 691 static int send_etsili_keepalive_response(int fd, int64_t seqno) { 692 693 wandder_encoder_t *encoder; 694 uint8_t *tosend; 695 uint32_t tosendlen; 696 int ret = 0; 697 uint64_t zero = 0; 698 struct timeval tv; 699 700 encoder = init_wandder_encoder(); 701 702 ENC_USEQUENCE(encoder); // starts outermost sequence 703 704 ENC_CSEQUENCE(encoder, 1); 705 wandder_encode_next(encoder, WANDDER_TAG_OID, 706 WANDDER_CLASS_CONTEXT_PRIMITIVE, 0, 707 WANDDER_ETSILI_PSDOMAINID, 708 sizeof(WANDDER_ETSILI_PSDOMAINID)); 709 wandder_encode_next(encoder, WANDDER_TAG_OCTETSTRING, 710 WANDDER_CLASS_CONTEXT_PRIMITIVE, 1, LT_ETSI_LIID, 711 strlen(LT_ETSI_LIID)); 712 wandder_encode_next(encoder, WANDDER_TAG_PRINTABLE, 713 WANDDER_CLASS_CONTEXT_PRIMITIVE, 2, LT_ETSI_NA, 714 strlen(LT_ETSI_NA)); 715 716 ENC_CSEQUENCE(encoder, 3); 717 718 ENC_CSEQUENCE(encoder, 0); 719 wandder_encode_next(encoder, WANDDER_TAG_OCTETSTRING, 720 WANDDER_CLASS_CONTEXT_PRIMITIVE, 0, LT_ETSI_OPERATOR, 721 strlen(LT_ETSI_OPERATOR)); 722 723 wandder_encode_next(encoder, WANDDER_TAG_OCTETSTRING, 724 WANDDER_CLASS_CONTEXT_PRIMITIVE, 1, LT_ETSI_OPERATOR, 725 strlen(LT_ETSI_OPERATOR)); 726 wandder_encode_endseq(encoder); 727 728 wandder_encode_next(encoder, WANDDER_TAG_INTEGER, 729 WANDDER_CLASS_CONTEXT_PRIMITIVE, 1, &(zero), 730 sizeof(zero)); 731 wandder_encode_next(encoder, WANDDER_TAG_PRINTABLE, 732 WANDDER_CLASS_CONTEXT_PRIMITIVE, 2, LT_ETSI_NA, 733 strlen(LT_ETSI_NA)); 734 wandder_encode_endseq(encoder); 735 736 wandder_encode_next(encoder, WANDDER_TAG_INTEGER, 737 WANDDER_CLASS_CONTEXT_PRIMITIVE, 4, &(seqno), 738 sizeof(seqno)); 739 740 gettimeofday(&tv, NULL); 741 wandder_encode_next(encoder, WANDDER_TAG_GENERALTIME, 742 WANDDER_CLASS_CONTEXT_PRIMITIVE, 5, &tv, 743 sizeof(struct timeval)); 744 745 wandder_encode_endseq(encoder); 746 747 ENC_CSEQUENCE(encoder, 2); // Payload 748 ENC_CSEQUENCE(encoder, 2); // TRIPayload 749 wandder_encode_next(encoder, WANDDER_TAG_NULL, 750 WANDDER_CLASS_CONTEXT_PRIMITIVE, 4, NULL, 0); 751 wandder_encode_endseq(encoder); // End TRIPayload 752 wandder_encode_endseq(encoder); // End Payload 753 wandder_encode_endseq(encoder); // End Outermost Sequence 754 755 tosend = wandder_encode_finish(encoder, &tosendlen); 756 757 if (tosend != NULL) { 758 /* Will block, but hopefully we shouldn't be doing much 759 * sending. 760 */ 761 ret = send(fd, tosend, tosendlen, 0); 762 } 763 764 free_wandder_encoder(encoder); 765 return ret; 766 } 767 768
Note: See TracChangeset
for help on using the changeset viewer.