Changeset 5ef19d8


Ignore:
Timestamp:
05/18/18 11:39:59 (2 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, etsilive, master, rc-4.0.4, ringdecrementfix, ringperformance
Children:
25c305d
Parents:
b94478f
git-author:
Shane Alcock <salcock@…> (03/06/18 17:36:45)
git-committer:
Shane Alcock <salcock@…> (05/18/18 11:39:59)
Message:

Add ability to receive and respond to ETSI LI keep alives.

Also tidied up a few little potential sources of bugs, like
not checking if an fd is open before closing it or not setting
an fd to -1 once it has been closed etc.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_etsilive.c

    r00365c6 r5ef19d8  
    9090} newsend_message_t;
    9191
     92static int send_etsili_keepalive_response(int fd, int64_t seqno);
    9293
    9394static void *etsi_listener(void *tdata) {
     
    292293                etsisocket_t src = receiver->sources[i];
    293294                libtrace_scb_destroy(&(src.recvbuffer));
    294                 close(src.sock);
     295                if (src.srcaddr) {
     296                        free(src.srcaddr);
     297                }
     298                if (src.sock != -1) {
     299                        close(src.sock);
     300                }
    295301        }
    296302        wandder_free_etsili_decoder(receiver->etsidec);
     
    393399}
    394400
     401static inline void inspect_next_packet(etsisocket_t *sock,
     402                etsisocket_t **earliestsock, uint64_t *earliesttime,
     403                wandder_etsispec_t *dec) {
     404
     405
     406        struct timeval tv;
     407        uint32_t available;
     408        uint8_t *ptr = NULL;
     409        uint32_t reclen = 0;
     410        uint64_t current;
     411
     412        if (sock->sock == -1) {
     413                return;
     414        }
     415        /* Have we already successfully decoded this? Cool,
     416         * just use whatever we cached last time.
     417         */
     418        if (sock->cached.timestamp != 0) {
     419                current = sock->cached.timestamp;
     420
     421                if (*earliesttime == 0 || *earliesttime > current) {
     422                        *earliesttime = current;
     423                        *earliestsock = sock;
     424                }
     425                return;
     426        }
     427
     428        ptr = libtrace_scb_get_read(&(sock->recvbuffer), &available);
     429
     430        if (available == 0 || ptr == NULL) {
     431                return;
     432        }
     433
     434        wandder_attach_etsili_buffer(dec, ptr, available, false);
     435        if (sock->cached.length != 0) {
     436                reclen = sock->cached.length;
     437        } else {
     438                reclen = wandder_etsili_get_pdu_length(dec);
     439
     440                if (reclen == 0) {
     441                        return;
     442                }
     443        }
     444
     445        if (available < reclen) {
     446                /* Don't have the whole PDU yet */
     447                return;
     448        }
     449
     450        if (wandder_etsili_is_keepalive(dec)) {
     451                int64_t kaseq = wandder_etsili_get_sequence_number(dec);
     452                if (kaseq < 0) {
     453                        fprintf(stderr, "bogus sequence number in ETSILI keep alive.\n");
     454                        close(sock->sock);
     455                        sock->sock = -1;
     456                        return;
     457                }
     458                /* Send keep alive response */
     459                if (send_etsili_keepalive_response(sock->sock, kaseq) < 0) {
     460                        fprintf(stderr, "error sending response to ETSILI keep alive: %s.\n", strerror(errno));
     461                        close(sock->sock);
     462                        sock->sock = -1;
     463                        return;
     464                }
     465                /* Skip past KA */
     466                libtrace_scb_advance_read(&(sock->recvbuffer), reclen);
     467                return;
     468        }
     469
     470        /* Get the timestamp */
     471
     472        tv = wandder_etsili_get_header_timestamp(dec);
     473        if (tv.tv_sec == 0) {
     474                return;
     475        }
     476        current = ((((uint64_t)tv.tv_sec) << 32) +
     477                        (((uint64_t)tv.tv_usec << 32)/1000000));
     478
     479        /* Success, cache everything we used so we don't have to
     480         * decode this packet again.
     481         */
     482        sock->cached.timestamp = current;
     483        sock->cached.length = reclen;
     484
     485
     486        /* Don't forget to update earliest and esock... */
     487        if (current < *earliesttime || *earliesttime == 0) {
     488                *earliestsock = sock;
     489                *earliesttime = current;
     490        }
     491}
     492
    395493static etsisocket_t *select_next_packet(etsithread_t *et, libtrace_t *libtrace) {
    396494
     
    398496        etsisocket_t *esock = NULL;
    399497        uint64_t earliest = 0;
    400         uint64_t current;
    401         struct timeval tv;
    402         uint32_t available;
    403         uint8_t *ptr = NULL;
    404         wandder_decoder_t dec;
    405         uint32_t reclen = 0;
    406498
    407499        for (i = 0; i < et->sourcecount; i++) {
    408                 if (et->sources[i].sock == -1) {
    409                         continue;
    410                 }
    411                 /* Have we already successfully decoded this? Cool,
    412                  * just use whatever we cached last time.
    413                  */
    414                 if (et->sources[i].cached.timestamp != 0) {
    415                         current = et->sources[i].cached.timestamp;
    416 
    417                         if (earliest == 0 || earliest > current) {
    418                                 earliest = current;
    419                                 esock = &(et->sources[i]);
    420                         }
    421                         continue;
    422                 }
    423 
    424                 ptr = libtrace_scb_get_read(&(et->sources[i].recvbuffer),
    425                                 &available);
    426 
    427                 if (available == 0 || ptr == NULL) {
    428                         continue;
    429                 }
    430 
    431                 wandder_attach_etsili_buffer(et->etsidec, ptr, available, false);
    432                 if (et->sources[i].cached.length != 0) {
    433                         reclen = et->sources[i].cached.length;
    434                 } else {
    435                         reclen = wandder_etsili_get_pdu_length(et->etsidec);
    436 
    437                         if (reclen == 0) {
    438                                 continue;
    439                         }
    440                 }
    441 
    442                 if (available < reclen) {
    443                         /* Don't have the whole PDU yet */
    444                         continue;
    445                 }
    446 
    447                 /* Get the timestamp */
    448 
    449                 tv = wandder_etsili_get_header_timestamp(et->etsidec);
    450                 if (tv.tv_sec == 0) {
    451                         continue;
    452                 }
    453                 current = ((((uint64_t)tv.tv_sec) << 32) +
    454                                 (((uint64_t)tv.tv_usec << 32)/1000000));
    455 
    456                 /* Success, cache everything we used so we don't have to
    457                  * decode this packet again.
    458                  */
    459                 et->sources[i].cached.timestamp = current;
    460                 et->sources[i].cached.length = reclen;
    461 
    462 
    463                 /* Don't forget to update earliest and esock... */
    464                 if (current < earliest || earliest == 0) {
    465                         esock = &(et->sources[i]);
    466                         earliest = current;
    467                 }
    468 
     500                inspect_next_packet(&(et->sources[i]), &esock, &earliest,
     501                        et->etsidec);
    469502        }
    470503        return esock;
     
    604637        etsilive_init_input,            /* init_input */
    605638        NULL,                           /* config_input */
    606         etsilive_start_input,           /* staetsilive_input */
     639        etsilive_start_input,           /* start_input */
    607640        etsilive_pause_input,           /* pause */
    608641        NULL,                           /* init_output */
    609642        NULL,                           /* config_output */
    610         NULL,                           /* staetsilive_output */
     643        NULL,                           /* start_output */
    611644        etsilive_fin_input,             /* fin_input */
    612645        NULL,                           /* fin_output */
     
    644677        register_format(&etsilive);
    645678}
     679
     680
     681#define ENC_USEQUENCE(enc) wandder_encode_next(enc, WANDDER_TAG_SEQUENCE, \
     682        WANDDER_CLASS_UNIVERSAL_CONSTRUCT, WANDDER_TAG_SEQUENCE, NULL, 0)
     683
     684#define ENC_CSEQUENCE(enc, x) wandder_encode_next(enc, WANDDER_TAG_SEQUENCE, \
     685        WANDDER_CLASS_CONTEXT_CONSTRUCT, x, NULL, 0)
     686
     687#define LT_ETSI_LIID "none"
     688#define LT_ETSI_NA "NA"
     689#define LT_ETSI_OPERATOR "libtrace"
     690
     691static int send_etsili_keepalive_response(int fd, int64_t seqno) {
     692
     693        wandder_encoder_t *encoder;
     694        uint8_t *tosend;
     695        uint32_t tosendlen;
     696        int ret = 0;
     697        uint64_t zero = 0;
     698        struct timeval tv;
     699
     700        encoder = init_wandder_encoder();
     701
     702        ENC_USEQUENCE(encoder);             // starts outermost sequence
     703
     704        ENC_CSEQUENCE(encoder, 1);
     705        wandder_encode_next(encoder, WANDDER_TAG_OID,
     706                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 0,
     707                        WANDDER_ETSILI_PSDOMAINID,
     708                        sizeof(WANDDER_ETSILI_PSDOMAINID));
     709        wandder_encode_next(encoder, WANDDER_TAG_OCTETSTRING,
     710                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 1, LT_ETSI_LIID,
     711                        strlen(LT_ETSI_LIID));
     712        wandder_encode_next(encoder, WANDDER_TAG_PRINTABLE,
     713                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 2, LT_ETSI_NA,
     714                        strlen(LT_ETSI_NA));
     715
     716        ENC_CSEQUENCE(encoder, 3);
     717
     718        ENC_CSEQUENCE(encoder, 0);
     719        wandder_encode_next(encoder, WANDDER_TAG_OCTETSTRING,
     720                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 0, LT_ETSI_OPERATOR,
     721                        strlen(LT_ETSI_OPERATOR));
     722
     723        wandder_encode_next(encoder, WANDDER_TAG_OCTETSTRING,
     724                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 1, LT_ETSI_OPERATOR,
     725                        strlen(LT_ETSI_OPERATOR));
     726        wandder_encode_endseq(encoder);
     727
     728        wandder_encode_next(encoder, WANDDER_TAG_INTEGER,
     729                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 1, &(zero),
     730                        sizeof(zero));
     731        wandder_encode_next(encoder, WANDDER_TAG_PRINTABLE,
     732                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 2, LT_ETSI_NA,
     733                        strlen(LT_ETSI_NA));
     734        wandder_encode_endseq(encoder);
     735
     736        wandder_encode_next(encoder, WANDDER_TAG_INTEGER,
     737                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 4, &(seqno),
     738                        sizeof(seqno));
     739
     740        gettimeofday(&tv, NULL);
     741        wandder_encode_next(encoder, WANDDER_TAG_GENERALTIME,
     742                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 5, &tv,
     743                        sizeof(struct timeval));
     744
     745        wandder_encode_endseq(encoder);
     746
     747        ENC_CSEQUENCE(encoder, 2);          // Payload
     748        ENC_CSEQUENCE(encoder, 2);          // TRIPayload
     749        wandder_encode_next(encoder, WANDDER_TAG_NULL,
     750                        WANDDER_CLASS_CONTEXT_PRIMITIVE, 4, NULL, 0);
     751        wandder_encode_endseq(encoder);     // End TRIPayload
     752        wandder_encode_endseq(encoder);     // End Payload
     753        wandder_encode_endseq(encoder);     // End Outermost Sequence
     754
     755        tosend = wandder_encode_finish(encoder, &tosendlen);
     756
     757        if (tosend != NULL) {
     758                /* Will block, but hopefully we shouldn't be doing much
     759                 * sending.
     760                 */
     761                ret = send(fd, tosend, tosendlen, 0);
     762        }
     763
     764        free_wandder_encoder(encoder);
     765        return ret;
     766}
     767
     768
Note: See TracChangeset for help on using the changeset viewer.