- Timestamp:
- 11/19/18 11:26:13 (2 years ago)
- Branches:
- develop
- Children:
- ffae0a5
- Parents:
- 1e6d795
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_etsilive.c
r2044185 rd24b1df 72 72 etsisocket_t *sources; 73 73 uint16_t sourcecount; 74 uint16_t sourcealloc; 75 uint16_t activesources; 74 76 int threadindex; 75 77 wandder_etsispec_t *etsidec; … … 269 271 270 272 FORMAT_DATA->receivers[i].sources = NULL; 273 FORMAT_DATA->receivers[i].sourcealloc = 0; 271 274 FORMAT_DATA->receivers[i].sourcecount = 0; 275 FORMAT_DATA->receivers[i].activesources = 0; 272 276 FORMAT_DATA->receivers[i].threadindex = i; 273 277 FORMAT_DATA->receivers[i].etsidec = … … 326 330 != LIBTRACE_MQ_FAILED) { 327 331 etsisocket_t *esock = NULL; 332 int i; 328 333 329 334 if (et->sourcecount == 0) { 330 335 et->sources = (etsisocket_t *)malloc( 331 336 sizeof(etsisocket_t) * 10); 332 } else if ((et->sourcecount % 10) == 0) { 337 et->sourcealloc = 10; 338 339 for (i = 0; i < et->sourcealloc; i++) { 340 et->sources[i].sock = -1; 341 et->sources[i].srcaddr = NULL; 342 } 343 344 esock = &(et->sources[0]); 345 et->sourcecount = 1; 346 } else { 347 for (i = 0; i < et->sourcealloc; i++) { 348 if (et->sources[i].sock == -1) { 349 esock = &(et->sources[i]); 350 break; 351 } 352 } 353 } 354 355 if (esock == NULL) { 333 356 et->sources = (etsisocket_t *)realloc(et->sources, 334 sizeof(etsisocket_t) * (et->sourcecount + 10)); 335 } 336 337 esock = &(et->sources[et->sourcecount]); 357 sizeof(etsisocket_t) * (et->sourcealloc + 10)); 358 359 esock = &(et->sources[et->sourcealloc]); 360 et->sourcealloc += 10; 361 et->sourcecount += 1; 362 363 } 364 338 365 esock->sock = msg.recvsock; 339 366 esock->srcaddr = msg.recvaddr; … … 343 370 esock->cached.length = 0; 344 371 345 et-> sourcecount+= 1;372 et->activesources += 1; 346 373 347 374 fprintf(stderr, "Thread %d is now handling %u sources.\n", 348 et->threadindex, et-> sourcecount);375 et->threadindex, et->activesources); 349 376 } 350 377 return 1; 351 378 } 352 379 353 static void receive_from_single_socket(etsisocket_t *esock ) {380 static void receive_from_single_socket(etsisocket_t *esock, etsithread_t *et) { 354 381 355 382 int ret = 0; … … 361 388 ret = libtrace_scb_recv_sock(&(esock->recvbuffer), esock->sock, 362 389 MSG_DONTWAIT); 363 if (ret == -1) {390 if (ret < 0) { 364 391 if (errno == EAGAIN || errno == EWOULDBLOCK) { 365 392 /* Would have blocked, nothing available */ … … 370 397 close(esock->sock); 371 398 esock->sock = -1; 399 et->activesources -= 1; 372 400 } 373 401 … … 376 404 close(esock->sock); 377 405 esock->sock = -1; 406 et->activesources -= 1; 378 407 } 379 408 … … 394 423 } 395 424 396 if (et-> sourcecount== 0) {425 if (et->activesources == 0) { 397 426 return 1; 398 427 } 399 428 400 429 for (i = 0; i < et->sourcecount; i++) { 401 receive_from_single_socket(&(et->sources[i]) );430 receive_from_single_socket(&(et->sources[i]), et); 402 431 } 403 432 return 1; … … 407 436 static inline void inspect_next_packet(etsisocket_t *sock, 408 437 etsisocket_t **earliestsock, uint64_t *earliesttime, 409 wandder_etsispec_t *dec ) {438 wandder_etsispec_t *dec, etsithread_t *et) { 410 439 411 440 … … 460 489 close(sock->sock); 461 490 sock->sock = -1; 491 et->activesources -= 1; 462 492 return; 463 493 } … … 467 497 close(sock->sock); 468 498 sock->sock = -1; 499 et->activesources -= 1; 469 500 return; 470 501 } … … 505 536 for (i = 0; i < et->sourcecount; i++) { 506 537 inspect_next_packet(&(et->sources[i]), &esock, &earliest, 507 et->etsidec );538 et->etsidec, et); 508 539 } 509 540 return esock;
Note: See TracChangeset
for help on using the changeset viewer.