Changeset 4bab977


Ignore:
Timestamp:
11/10/17 14:37:19 (4 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
d39cd1e
Parents:
0317e3c
Message:

Modifications in response to changes in the nDAG protocol.

  • Updated protocol to account for new laststart field in encap. header. Use this field to recognise when the monitor has been restarted.
  • When determining sequence number gap, don't count 0 as a valid sequence number -- seqno 0 is reserved in the nDAG protocol and will never be lost.
Location:
lib
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    r05b65ae r4bab977  
    3434volatile int ndag_paused = 0;
    3535
     36typedef struct monitor {
     37        uint16_t monitorid;
     38        uint64_t laststart;
     39} ndag_monitor_t;
     40
     41
    3642typedef struct streamsource {
    3743        uint16_t monitor;
     
    4753        uint16_t port;
    4854        uint32_t expectedseq;
    49         uint16_t monitorid;
    50 
     55        ndag_monitor_t *monitorptr;
    5156        char **saved;
    5257        char *nextread;
     
    6469        libtrace_message_queue_t mqueue;
    6570        int threadindex;
     71        ndag_monitor_t *knownmonitors;
     72        uint16_t monitorcount;
    6673
    6774        uint64_t dropped_upstream;
     
    8390enum {
    8491        NDAG_CLIENT_HALT = 0x01,
    85         NDAG_CLIENT_RESTARTED = 0x02,
     92        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
    8693        NDAG_CLIENT_NEWGROUP = 0x03
    8794};
     
    101108                return (int) (seq_a - seq_b);
    102109        }
    103         return (int) (0xffffffff - ((seq_b - seq_a) - 1));
     110
     111        /* -1 for the wrap and another -1 because we don't use zero */
     112        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
    104113}
    105114
     
    329338                        ptr ++;
    330339                }
    331         } else if (msgtype == NDAG_PKT_RESTARTED) {
    332                 /* If message is a restart, push that to all active message
    333                  * queues. */
    334                 ndag_internal_message_t alert;
    335                 alert.type = NDAG_CLIENT_RESTARTED;
    336                 alert.contents.monitor = ntohs(ndaghdr->monitorid);
    337                 alert.contents.groupaddr = NULL;
    338                 alert.contents.localiface = NULL;
    339                 alert.contents.port = 0;
    340                 for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    341                         libtrace_message_queue_put(
    342                                         &(FORMAT_DATA->receivers[i].mqueue),
    343                                         (void *)&alert);
    344                 }
    345340        } else {
    346341                fprintf(stderr,
     
    446441                FORMAT_DATA->receivers[i].sources = NULL;
    447442                FORMAT_DATA->receivers[i].sourcecount = 0;
     443                FORMAT_DATA->receivers[i].knownmonitors = NULL;
     444                FORMAT_DATA->receivers[i].monitorcount = 0;
    448445                FORMAT_DATA->receivers[i].threadindex = i;
    449446                FORMAT_DATA->receivers[i].dropped_upstream = 0;
     
    495492                close(src.sock);
    496493        }
     494        if (receiver->knownmonitors) {
     495                free(receiver->knownmonitors);
     496        }
     497
    497498        if (receiver->sources) {
    498499                free(receiver->sources);
     
    612613}
    613614
     615static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
     616
     617        ndag_monitor_t *mon;
     618
     619        if (rt->monitorcount == 0) {
     620                rt->knownmonitors = (ndag_monitor_t *)
     621                                malloc(sizeof(ndag_monitor_t) * 5);
     622        } else {
     623                rt->knownmonitors = (ndag_monitor_t *)
     624                            realloc(rt->knownmonitors,
     625                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
     626        }
     627
     628        mon = &(rt->knownmonitors[rt->monitorcount]);
     629        mon->monitorid = monid;
     630        mon->laststart = 0;
     631
     632        rt->monitorcount ++;
     633        return mon;
     634}
     635
    614636static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
    615637
    616638        streamsock_t *ssock = NULL;
     639        ndag_monitor_t *mon = NULL;
    617640        int i;
    618641
     
    638661        }
    639662
     663        for (i = 0; i < rt->monitorcount; i++) {
     664                if (rt->knownmonitors[i].monitorid == src.monitor) {
     665                        mon = &(rt->knownmonitors[i]);
     666                        break;
     667                }
     668        }
     669
     670        if (mon == NULL) {
     671                mon = add_new_knownmonitor(rt, src.monitor);
     672        }
     673
    640674        ssock->port = src.port;
    641675        ssock->groupaddr = src.groupaddr;
    642676        ssock->expectedseq = 0;
    643         ssock->monitorid = src.monitor;
     677        ssock->monitorptr = mon;
    644678        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
    645679        ssock->startidle = 0;
     
    673707                                }
    674708                                break;
    675                         case NDAG_CLIENT_RESTARTED:
    676                                 /* TODO */
    677 
    678                                 break;
    679709                        case NDAG_CLIENT_HALT:
    680710                                return 0;
     
    702732}
    703733
     734static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
     735
     736        int i;
     737        for (i = 0; i < rt->sourcecount; i++) {
     738                if (rt->sources[i].monitorptr == mon) {
     739                        rt->sources[i].expectedseq = 0;
     740                }
     741        }
     742
     743}
     744
    704745static int receive_from_sockets(recvstream_t *rt) {
    705746
     
    714755                int nw;
    715756                ndag_encap_t *encaphdr;
     757                ndag_monitor_t *mon;
    716758
    717759                if (rt->sources[i].sock == -1) {
     
    787829                                sizeof(ndag_common_t));
    788830
     831                mon = rt->sources[i].monitorptr;
     832                assert(mon);
     833
     834                if (mon->laststart == 0) {
     835                        mon->laststart = bswap_be_to_host64(encaphdr->started);
     836                } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
     837                        mon->laststart = bswap_be_to_host64(encaphdr->started);
     838                        reset_expected_seqs(rt, mon);
     839
     840                        /* TODO what is a good way to indicate this to clients?
     841                         * set the loss counter in the ERF header? a bit rude?
     842                         * use another bit in the ERF header?
     843                         * add a queryable flag to libtrace_packet_t?
     844                         */
     845
     846                }
     847
    789848                if (rt->sources[i].expectedseq != 0) {
    790849                        rt->missing_records += seq_cmp(
     
    793852                }
    794853                rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1;
     854                if (rt->sources[i].expectedseq == 0) {
     855                        rt->sources[i].expectedseq ++;
     856                }
    795857
    796858                if (rt->sources[i].nextread == NULL) {
  • lib/format_ndag.h

    re68325b r4bab977  
    3939/* Encapsulation header -- used by both ENCAPERF and ENCAPRT records */
    4040typedef struct ndag_encap {
     41        uint64_t started;
    4142        uint32_t seqno;
    4243        uint16_t streamid;
Note: See TracChangeset for help on using the changeset viewer.