Changeset d24b1df for lib


Ignore:
Timestamp:
11/19/18 11:26:13 (2 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
develop
Children:
ffae0a5
Parents:
1e6d795
Message:

Tidy up source tracking in etsilive: input format

  • Attempt to re-use any "old" source entries that have been closed.
  • Fix misleading source count output when a new source connects.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_etsilive.c

    r2044185 rd24b1df  
    7272        etsisocket_t *sources;
    7373        uint16_t sourcecount;
     74        uint16_t sourcealloc;
     75        uint16_t activesources;
    7476        int threadindex;
    7577        wandder_etsispec_t *etsidec;
     
    269271
    270272                FORMAT_DATA->receivers[i].sources = NULL;
     273                FORMAT_DATA->receivers[i].sourcealloc = 0;
    271274                FORMAT_DATA->receivers[i].sourcecount = 0;
     275                FORMAT_DATA->receivers[i].activesources = 0;
    272276                FORMAT_DATA->receivers[i].threadindex = i;
    273277                FORMAT_DATA->receivers[i].etsidec =
     
    326330                        != LIBTRACE_MQ_FAILED) {
    327331                etsisocket_t *esock = NULL;
     332                int i;
    328333
    329334                if (et->sourcecount == 0) {
    330335                        et->sources = (etsisocket_t *)malloc(
    331336                                        sizeof(etsisocket_t) * 10);
    332                 } else if ((et->sourcecount % 10) == 0) {
     337                        et->sourcealloc = 10;
     338
     339                        for (i = 0; i < et->sourcealloc; i++) {
     340                                et->sources[i].sock = -1;
     341                                et->sources[i].srcaddr = NULL;
     342                        }
     343
     344                        esock = &(et->sources[0]);
     345                        et->sourcecount = 1;
     346                } else {
     347                        for (i = 0; i < et->sourcealloc; i++) {
     348                                if (et->sources[i].sock == -1) {
     349                                        esock = &(et->sources[i]);
     350                                        break;
     351                                }
     352                        }
     353                }
     354
     355                if (esock == NULL) {
    333356                        et->sources = (etsisocket_t *)realloc(et->sources,
    334                                 sizeof(etsisocket_t) * (et->sourcecount + 10));
    335                 }
    336 
    337                 esock = &(et->sources[et->sourcecount]);
     357                                sizeof(etsisocket_t) * (et->sourcealloc + 10));
     358
     359                        esock = &(et->sources[et->sourcealloc]);
     360                        et->sourcealloc += 10;
     361                        et->sourcecount += 1;
     362
     363                }
     364
    338365                esock->sock = msg.recvsock;
    339366                esock->srcaddr = msg.recvaddr;
     
    343370                esock->cached.length = 0;
    344371
    345                 et->sourcecount += 1;
     372                et->activesources += 1;
    346373
    347374                fprintf(stderr, "Thread %d is now handling %u sources.\n",
    348                                 et->threadindex, et->sourcecount);
     375                                et->threadindex, et->activesources);
    349376        }
    350377        return 1;
    351378}
    352379
    353 static void receive_from_single_socket(etsisocket_t *esock) {
     380static void receive_from_single_socket(etsisocket_t *esock, etsithread_t *et) {
    354381
    355382        int ret = 0;
     
    361388        ret = libtrace_scb_recv_sock(&(esock->recvbuffer), esock->sock,
    362389                        MSG_DONTWAIT);
    363         if (ret == -1) {
     390        if (ret < 0) {
    364391                if (errno == EAGAIN || errno == EWOULDBLOCK) {
    365392                        /* Would have blocked, nothing available */
     
    370397                close(esock->sock);
    371398                esock->sock = -1;
     399                et->activesources -= 1;
    372400        }
    373401
     
    376404                close(esock->sock);
    377405                esock->sock = -1;
     406                et->activesources -= 1;
    378407        }
    379408
     
    394423        }
    395424
    396         if (et->sourcecount == 0) {
     425        if (et->activesources == 0) {
    397426                return 1;
    398427        }
    399428
    400429        for (i = 0; i < et->sourcecount; i++) {
    401                 receive_from_single_socket(&(et->sources[i]));
     430                receive_from_single_socket(&(et->sources[i]), et);
    402431        }
    403432        return 1;
     
    407436static inline void inspect_next_packet(etsisocket_t *sock,
    408437                etsisocket_t **earliestsock, uint64_t *earliesttime,
    409                 wandder_etsispec_t *dec) {
     438                wandder_etsispec_t *dec, etsithread_t *et) {
    410439
    411440
     
    460489                        close(sock->sock);
    461490                        sock->sock = -1;
     491                        et->activesources -= 1;
    462492                        return;
    463493                }
     
    467497                        close(sock->sock);
    468498                        sock->sock = -1;
     499                        et->activesources -= 1;
    469500                        return;
    470501                }
     
    505536        for (i = 0; i < et->sourcecount; i++) {
    506537                inspect_next_packet(&(et->sources[i]), &esock, &earliest,
    507                         et->etsidec);
     538                        et->etsidec, et);
    508539        }
    509540        return esock;
Note: See TracChangeset for help on using the changeset viewer.