- Timestamp:
- 11/10/17 14:37:19 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- d39cd1e
- Parents:
- 0317e3c
- Location:
- lib
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
r05b65ae r4bab977 34 34 volatile int ndag_paused = 0; 35 35 36 typedef struct monitor { 37 uint16_t monitorid; 38 uint64_t laststart; 39 } ndag_monitor_t; 40 41 36 42 typedef struct streamsource { 37 43 uint16_t monitor; … … 47 53 uint16_t port; 48 54 uint32_t expectedseq; 49 uint16_t monitorid; 50 55 ndag_monitor_t *monitorptr; 51 56 char **saved; 52 57 char *nextread; … … 64 69 libtrace_message_queue_t mqueue; 65 70 int threadindex; 71 ndag_monitor_t *knownmonitors; 72 uint16_t monitorcount; 66 73 67 74 uint64_t dropped_upstream; … … 83 90 enum { 84 91 NDAG_CLIENT_HALT = 0x01, 85 NDAG_CLIENT_RESTARTED = 0x02, 92 NDAG_CLIENT_RESTARTED = 0x02, // redundant 86 93 NDAG_CLIENT_NEWGROUP = 0x03 87 94 }; … … 101 108 return (int) (seq_a - seq_b); 102 109 } 103 return (int) (0xffffffff - ((seq_b - seq_a) - 1)); 110 111 /* -1 for the wrap and another -1 because we don't use zero */ 112 return (int) (0xffffffff - ((seq_b - seq_a) - 2)); 104 113 } 105 114 … … 329 338 ptr ++; 330 339 } 331 } else if (msgtype == NDAG_PKT_RESTARTED) {332 /* If message is a restart, push that to all active message333 * queues. */334 ndag_internal_message_t alert;335 alert.type = NDAG_CLIENT_RESTARTED;336 alert.contents.monitor = ntohs(ndaghdr->monitorid);337 alert.contents.groupaddr = NULL;338 alert.contents.localiface = NULL;339 alert.contents.port = 0;340 for (i = 0; i < libtrace->perpkt_thread_count; i++) {341 libtrace_message_queue_put(342 &(FORMAT_DATA->receivers[i].mqueue),343 (void *)&alert);344 }345 340 } else { 346 341 fprintf(stderr, … … 446 441 FORMAT_DATA->receivers[i].sources = NULL; 447 442 FORMAT_DATA->receivers[i].sourcecount = 0; 443 FORMAT_DATA->receivers[i].knownmonitors = NULL; 444 FORMAT_DATA->receivers[i].monitorcount = 0; 448 445 FORMAT_DATA->receivers[i].threadindex = i; 449 446 FORMAT_DATA->receivers[i].dropped_upstream = 0; … … 495 492 close(src.sock); 496 493 } 494 if (receiver->knownmonitors) { 495 free(receiver->knownmonitors); 496 } 497 497 498 if (receiver->sources) { 498 499 free(receiver->sources); … … 612 613 } 613 614 615 static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) { 616 617 ndag_monitor_t *mon; 618 619 if (rt->monitorcount == 0) { 620 rt->knownmonitors = (ndag_monitor_t *) 621 malloc(sizeof(ndag_monitor_t) * 5); 622 } else { 623 rt->knownmonitors = (ndag_monitor_t *) 624 realloc(rt->knownmonitors, 625 sizeof(ndag_monitor_t) * (rt->monitorcount * 5)); 626 } 627 628 mon = &(rt->knownmonitors[rt->monitorcount]); 629 mon->monitorid = monid; 630 mon->laststart = 0; 631 632 rt->monitorcount ++; 633 return mon; 634 } 635 614 636 static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { 615 637 616 638 streamsock_t *ssock = NULL; 639 ndag_monitor_t *mon = NULL; 617 640 int i; 618 641 … … 638 661 } 639 662 663 for (i = 0; i < rt->monitorcount; i++) { 664 if (rt->knownmonitors[i].monitorid == src.monitor) { 665 mon = &(rt->knownmonitors[i]); 666 break; 667 } 668 } 669 670 if (mon == NULL) { 671 mon = add_new_knownmonitor(rt, src.monitor); 672 } 673 640 674 ssock->port = src.port; 641 675 ssock->groupaddr = src.groupaddr; 642 676 ssock->expectedseq = 0; 643 ssock->monitor id = src.monitor;677 ssock->monitorptr = mon; 644 678 ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS); 645 679 ssock->startidle = 0; … … 673 707 } 674 708 break; 675 case NDAG_CLIENT_RESTARTED:676 /* TODO */677 678 break;679 709 case NDAG_CLIENT_HALT: 680 710 return 0; … … 702 732 } 703 733 734 static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) { 735 736 int i; 737 for (i = 0; i < rt->sourcecount; i++) { 738 if (rt->sources[i].monitorptr == mon) { 739 rt->sources[i].expectedseq = 0; 740 } 741 } 742 743 } 744 704 745 static int receive_from_sockets(recvstream_t *rt) { 705 746 … … 714 755 int nw; 715 756 ndag_encap_t *encaphdr; 757 ndag_monitor_t *mon; 716 758 717 759 if (rt->sources[i].sock == -1) { … … 787 829 sizeof(ndag_common_t)); 788 830 831 mon = rt->sources[i].monitorptr; 832 assert(mon); 833 834 if (mon->laststart == 0) { 835 mon->laststart = bswap_be_to_host64(encaphdr->started); 836 } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) { 837 mon->laststart = bswap_be_to_host64(encaphdr->started); 838 reset_expected_seqs(rt, mon); 839 840 /* TODO what is a good way to indicate this to clients? 841 * set the loss counter in the ERF header? a bit rude? 842 * use another bit in the ERF header? 843 * add a queryable flag to libtrace_packet_t? 844 */ 845 846 } 847 789 848 if (rt->sources[i].expectedseq != 0) { 790 849 rt->missing_records += seq_cmp( … … 793 852 } 794 853 rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1; 854 if (rt->sources[i].expectedseq == 0) { 855 rt->sources[i].expectedseq ++; 856 } 795 857 796 858 if (rt->sources[i].nextread == NULL) { -
lib/format_ndag.h
re68325b r4bab977 39 39 /* Encapsulation header -- used by both ENCAPERF and ENCAPRT records */ 40 40 typedef struct ndag_encap { 41 uint64_t started; 41 42 uint32_t seqno; 42 43 uint16_t streamid;
Note: See TracChangeset
for help on using the changeset viewer.