Changeset 5b4d121
- Timestamp: 08/19/14 13:59:33 (7 years ago)
- Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children: f0e8bd6
- Parents: 957a72a
- Files: 4 edited
lib/libtrace.h.in
--- lib/libtrace.h.in (rf051c1b)
+++ lib/libtrace.h.in (r5b4d121)
@@ -3288,5 +3288,7 @@
  * or less this is ignored.
  */
-    TRACE_OPTION_TICK_INTERVAL
+    TRACE_OPTION_TICK_INTERVAL,
+    TRACE_OPTION_GET_CONFIG,
+    TRACE_OPTION_SET_CONFIG
 } trace_parallel_option_t;
 
@@ -3382,19 +3384,49 @@
  * A unblockable error message will be printed.
  */
-    size_t packet_global_cache_size;
-    // Per thread
+    size_t packet_cache_size;
+    /**
+     * Per thread local cache size for the packet freelist
+     */
     size_t packet_thread_cache_size;
-    // Packet count limited
+    /**
+     * If true the total number of packets that can be created by a trace is limited
+     * to the packet_cache_size, otherwise once packet_cache_size is exceeded alloc
+     * and free will be used to create and free packets, this will be slower than
+     * using the freelist and could run a machine out of memory.
+     *
+     * However this does make it easier to ensure that deadlocks will not occur
+     * due to running out of packets
+     */
     bool fixed_packet_count;
-    // Bursts/Batches of packets this size are combined, used in single thread mode
+    /**
+     * When reading from a single threaded input source to reduce
+     * lock contention a 'burst' of packets is read per pkt thread
+     * this determines the bursts size.
+     */
    size_t burst_size;
    // Each perpkt thread has a queue leading into the reporter
    //size_t reporter_queue_size;
-    /** The tick interval - in milliseconds (0) */
+
+    /**
+     * The tick interval - in milliseconds
+     * When a live trace is used messages are sent at the tick
+     * interval to ensure that all perpkt threads receive data
+     * this allows results to be printed in cases flows are
+     * not being directed to a certian thread, while still
+     * maintaining order.
+     */
    size_t tick_interval;
-    // The tick interval for file based traces, in number of packets TODO implement this
+
+    /**
+     * Like the tick interval but used in the case of file format
+     * This specifies the number of packets before inserting a tick to
+     * every thread.
+     */
    size_t tick_count;
 
-    // The number of per packet threads requested
+    /**
+     * The number of per packet threads requested, 0 means use default.
+     * Default typically be the number of processor threads detected less one or two.
+     */
    size_t perpkt_threads;
 
@@ -3409,7 +3441,38 @@
    size_t hasher_queue_size;
 
-    // Reporter threashold before results are sent
+    /**
+     * If true use a polling hasher queue, that means that we will spin/or yeild
+     * when rather than blocking on a lock. This applies to both the hasher thread
+     * and perpkts reading the queues.
+     */
+    bool hasher_polling;
+
+    /**
+     * If true the reporter thread will continuously poll waiting for results
+     * if false they are only checked when a message is received, this message
+     * is controlled by reporter_thold.
+     */
+    bool reporter_polling;
+
+    /**
+     * Perpkt thread result queue size before triggering the reporter step to read results
+     */
    size_t reporter_thold;
+
+    /**
+     * Prints a line to standard error for every state change
+     * for both the trace as a whole and for each thread.
+     */
+    bool debug_state;
 };
+#include <stdio.h>
+DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
+DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
+
+#define READ_EOF 0
+#define READ_ERROR -1
+#define READ_MESSAGE -2
+// Used for inband tick message
+#define READ_TICK -3
 
 #define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration));
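The TRACE_OPTION_SET_CONFIG/TRACE_OPTION_GET_CONFIG options and the parse_user_config() helper declared above are exercised by traceanon_parallel.c later in this changeset. As a minimal sketch of the intended call pattern (the URI and the option string below are placeholders, not part of the changeset):

    #include "libtrace.h"
    #include <stdio.h>
    #include <string.h>

    int main(void) {
        struct user_configuration uc;
        /* parse_user_config() tokenises with strtok(), so the buffer must be writable */
        char opts[] = "perpkt_threads=4 hasher_polling=true debug_state=1";

        ZERO_USER_CONFIG(uc);          /* zeroed fields fall back to the library defaults */
        parse_user_config(&uc, opts);

        libtrace_t *trace = trace_create("pcapfile:example.pcap.gz");
        if (trace_is_err(trace)) {
            trace_perror(trace, "trace_create");
            return 1;
        }
        /* Hand the whole configuration block to the trace before starting it */
        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);

        /* ... set a hasher and call trace_pstart() as usual ... */
        trace_destroy(trace);
        return 0;
    }

Because the defaulting code in trace_parallel.c treats fields that are still zero or negative (packet_cache_size, packet_thread_cache_size, and so on) as "use the default", zeroing the structure with ZERO_USER_CONFIG first is the safe way to override only a few settings.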
lib/trace.c
--- lib/trace.c (rf051c1b)
+++ lib/trace.c (r5b4d121)
@@ -919,5 +919,5 @@
 DLLEXPORT int trace_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    assert(libtrace);
-   assert(packet); 
+   assert(packet);
    /* Verify the packet is valid */
    if (!libtrace->started) {
(whitespace-only change: trailing whitespace removed after the assert(packet); line)
lib/trace_parallel.c
--- lib/trace_parallel.c (rf051c1b)
+++ lib/trace_parallel.c (r5b4d121)
@@ -101,7 +101,4 @@
 
 
-#define VERBOSE_DEBBUGING 1
-
-
 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
 
@@ -215,8 +212,8 @@
    }
 
-#if VERBOSE_DEBBUGING
-   fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
-       prev_state, t->state);
-#endif
+   if (trace->config.debug_state)
+       fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
+           prev_state, t->state);
+
    if (need_lock)
        pthread_mutex_unlock(&trace->libtrace_lock);
@@ -240,9 +237,10 @@
    prev_state = trace->state;
    trace->state = new_state;
-#if VERBOSE_DEBBUGING
-   fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
-       trace->uridata, get_trace_state_name(prev_state),
-       get_trace_state_name(trace->state));
-#endif
+
+   if (trace->config.debug_state)
+       fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
+           trace->uridata, get_trace_state_name(prev_state),
+           get_trace_state_name(trace->state));
+
    if (need_lock)
        pthread_mutex_unlock(&trace->libtrace_lock);
@@ -342,5 +340,4 @@
        res->value = (void *)dup;
        trace_destroy_packet(oldpkt);
-       fprintf(stderr, "Made a packet safe!!\n");
    }
 }
@@ -375,4 +372,41 @@
 
 
+
+/**
+ * Dispatches packets to their correct place and applies any translations
+ * as needed
+ * @param trace
+ * @param t
+ * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
+ * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal
+ */
+static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
+                                   size_t nb_packets) {
+   libtrace_message_t message;
+   size_t i;
+   for (i = 0; i < nb_packets; ++i) {
+       if (packets[i]->error > 0) {
+           packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
+       } else if (packets[i]->error == READ_TICK) {
+           message.code = MESSAGE_TICK;
+           message.additional.uint64 = trace_packet_get_order(packets[i]);
+           message.sender = t;
+           (*trace->per_pkt)(trace, NULL, &message, t);
+       } else if (packets[i]->error != READ_MESSAGE) {
+           // An error this should be the last packet we read
+           size_t z;
+           // We could have an eof or error and a message such as pause
+           for (z = i ; z < nb_packets; ++z) {
+               fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
+               assert (packets[z]->error <= 0);
+           }
+           return -1;
+       }
+       // -2 is a message its not worth checking now just finish this lot and we'll check
+       // when we loop next
+   }
+   return 0;
+}
+
 /**
  * The is the entry point for our packet processing threads.
@@ -417,5 +451,5 @@
            message.sender = t;
            (*trace->per_pkt)(trace, NULL, &message, t);
-           // If a hasher thread is running empty input queues so we don't loose data
+           // If a hasher thread is running empty input queues so we don't lose data
            if (trace_has_dedicated_hasher(trace)) {
                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
@@ -423,7 +457,5 @@
                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
                    ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
-                   if (packets[0]->error > 0)
-                       packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t);
-                   else if (packets[0]->error != -2) {
+                   if (dispatch_packets(trace, t, packets, 1) == -1) {
                        // EOF or error, either way we'll stop
                        while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
@@ -460,21 +492,6 @@
        }
        // Loop through the packets we just read
-       for (i = 0; i < nb_packets; ++i) {
-
-           if (packets[i]->error > 0) {
-               packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
-           } else if (packets[i]->error != -2) {
-               // An error this should be the last packet we read
-               size_t z;
-               // We could have an eof or error and a message such as pause
-               for (z = i ; z < nb_packets; ++z) {
-                   fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
-                   assert (packets[z]->error < 1);
-               }
-               goto stop;
-           }
-           // -2 is a message its not worth checking now just finish this lot and we'll check
-           // when we loop next
-       }
+       if (dispatch_packets(trace, t, packets, nb_packets) == -1)
+           break;
    }
 
@@ -523,5 +540,5 @@
  * core to process it.
  */
-static void* hasher_start(void *data) {
+static void* hasher_entry(void *data) {
    libtrace_t *trace = (libtrace_t *)data;
    libtrace_thread_t * t;
@@ -569,4 +586,5 @@
                assert(trace->started == false);
                assert(trace->state == STATE_FINSHED);
+               break;
            default:
                fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
@@ -585,5 +603,17 @@
        /* Blocking write to the correct queue - I'm the only writer */
        if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
+           uint64_t order = trace_packet_get_order(packet);
            libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
+           if (trace->config.tick_count && order % trace->config.tick_count == 0) {
+               // Write ticks to everyone else
+               libtrace_packet_t * pkts[trace->perpkt_thread_count];
+               memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
+               libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
+               for (i = 0; i < trace->perpkt_thread_count; i++) {
+                   pkts[i]->error = READ_TICK;
+                   trace_packet_set_order(pkts[i], order);
+                   libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
+               }
+           }
            pkt_skipped = 0;
        } else {
@@ -705,4 +735,5 @@
 {
    size_t i = 0;
+   bool tick_hit = false;
 
    nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
@@ -712,14 +743,25 @@
    for (i = 0; i < nb_packets; ++i) {
        packets[i]->error = trace_read_packet(libtrace, packets[i]);
-       // Doing this inside the lock ensures the first packet is always
-       // recorded first
        if (packets[i]->error <= 0) {
            ++i;
            break;
        }
-   }
+       /*
+       if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
+           tick_hit = true;
+       }*/
+   }
+   // Doing this inside the lock ensures the first packet is always
+   // recorded first
    if (packets[0]->error > 0) {
        store_first_packet(libtrace, packets[0], t);
    }
    ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
+   /* XXX TODO this needs to be inband with packets, or we don't bother in this case
+   if (tick_hit) {
+       libtrace_message_t tick;
+       tick.additional.uint64 = trace_packet_get_order(packets[i]);
+       tick.code = MESSAGE_TICK;
+       trace_send_message_to_perpkts(libtrace, &tick);
+   } */
    return i;
@@ -739,10 +781,4 @@
    libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
    packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
-
-
-   if (packets[0] == NULL) {
-       libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packets, 1, 1);
-       packets[0]->error = -2;
-   }
 
    if (packets[0]->error < 0)
@@ -756,23 +792,9 @@
            break;
        }
-       // Message wating
-       if (packets[i] == NULL) {
-           libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
-           packets[i]->error = -2;
-           ++i;
+       // These are typically urgent
+       if (packets[i]->error < 0)
            break;
-       }
    }
 
    return i;
-   /*if (*packet) {
-       return (*packet)->error;
-   } else {
-       // This is how we do a notify, we send a message before hand to note that the trace is over etc.
-       // And this will notify the perpkt thread to read that message, this is easiest
-       // since cases like pause can also be dealt with this way without actually
-       // having to be the end of the stream.
-       fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
-       return -2;
-   }*/
 }
@@ -1166,8 +1188,10 @@
 
    while (!trace_finished(trace)) {
-
-       //while ( != LIBTRACE_MQ_FAILED) { }
-       libtrace_message_queue_get(&t->messages, &message);
-
+       if (trace->config.reporter_polling) {
+           if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
+               message.code = MESSAGE_POST_REPORTER;
+       } else {
+           libtrace_message_queue_get(&t->messages, &message);
+       }
        switch (message.code) {
            // Check for results
@@ -1198,4 +1222,5 @@
        (*trace->reporter)(trace, &result, NULL);
    }
+   libtrace_vector_destroy(&results);
 
    // GOODBYE
@@ -1320,5 +1345,5 @@
    packet->trace = libtrace;
    ret=libtrace->format->pread_packet(libtrace, t, packet);
-   if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
+   if (ret <= 0) {
        return ret;
    }
@@ -1499,8 +1524,8 @@
    if (libtrace->config.packet_thread_cache_size <= 0)
        libtrace->config.packet_thread_cache_size = 20;
-   if (libtrace->config.packet_global_cache_size <= 0)
-       libtrace->config.packet_global_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
-
-   if (libtrace->config.packet_global_cache_size <
+   if (libtrace->config.packet_cache_size <= 0)
+       libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
+
+   if (libtrace->config.packet_cache_size <
        (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
        fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
@@ -1523,5 +1548,5 @@
        t->state = THREAD_RUNNING;
        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
-       ASSERT_RET(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace), == 0);
+       ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0);
        snprintf(name, sizeof(name), "hasher-thread");
        pthread_setname_np(t->tid, name);
@@ -1534,5 +1559,5 @@
                     (void (*)(void *))trace_destroy_packet,
                     libtrace->config.packet_thread_cache_size,
-                    libtrace->config.packet_global_cache_size * 4,
+                    libtrace->config.packet_cache_size * 4,
                     libtrace->config.fixed_packet_count);
    // Unused slidingwindow code
@@ -1562,5 +1587,6 @@
        t->perpkt_num = i;
        if (libtrace->hasher)
-           libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, LIBTRACE_RINGBUFFER_POLLING);
+           libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
+                                    libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
        // Depending on the mode vector or deque might be chosen
        libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
@@ -1654,5 +1680,6 @@
    // Special case handle the hasher thread case
    if (trace_has_dedicated_hasher(libtrace)) {
-       fprintf(stderr, "Hasher thread running we deal with this special!\n");
+       if (libtrace->config.debug_state)
+           fprintf(stderr, "Hasher thread is running, asking it to pause ...");
        libtrace_message_t message = {0};
        message.code = MESSAGE_DO_PAUSE;
@@ -1664,7 +1691,10 @@
        }
        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-   }
-
-   fprintf(stderr, "Sending messages \n");
+       if (libtrace->config.debug_state)
+           fprintf(stderr, " DONE\n");
+   }
+
+   if (libtrace->config.debug_state)
+       fprintf(stderr, "Asking perpkt threads to pause ...");
    // Stop threads, skip this one if it's a perpkt
    for (i = 0; i < libtrace->perpkt_thread_count; i++) {
@@ -1676,7 +1706,10 @@
            // The hasher has stopped and other threads have messages waiting therefore
            // If the queues are empty the other threads would have no data
-           // So send some NULL packets to simply ask the threads to check there message queues
+           // So send some message packets to simply ask the threads to check
            // We are the only writer since hasher has paused
-           libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
+           libtrace_packet_t *pkt;
+           libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
+           pkt->error = READ_MESSAGE;
+           libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
        }
    } else {
@@ -1685,7 +1718,24 @@
    }
 
+   if (t) {
+       // A perpkt is doing the pausing, interesting, fake an extra thread paused
+       // We rely on the user to *not* return before starting the trace again
+       thread_change_state(libtrace, t, THREAD_PAUSED, true);
+   }
+
+   // Wait for all threads to pause
+   ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
+   while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
+       ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
+   }
+   ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
+
+   if (libtrace->config.debug_state)
+       fprintf(stderr, " DONE\n");
+
    // Deal with the reporter
    if (trace_has_dedicated_reporter(libtrace)) {
-       fprintf(stderr, "Reporter thread running we deal with this special!\n");
+       if (libtrace->config.debug_state)
+           fprintf(stderr, "Reporter thread is running, asking it to pause ...");
        libtrace_message_t message = {0};
        message.code = MESSAGE_DO_PAUSE;
@@ -1697,26 +1747,7 @@
        }
        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-   }
-
-   // Formats must support native message handling if a message is ready
-   // Approach per Perry's suggestion is a non-blocking read
-   // followed by a blocking read. XXX STRIP THIS OUT
-
-   if (t) {
-       // A perpkt is doing the pausing, interesting, fake an extra thread paused
-       // We rely on the user to *not* return before starting the trace again
-       thread_change_state(libtrace, t, THREAD_PAUSED, true);
-   }
-
-   fprintf(stderr, "Asking threads to pause\n");
-
-   // Wait for all threads to pause
-   ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
-   while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
-       ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
-   }
-   ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-
-   fprintf(stderr, "Threads have paused\n");
+       if (libtrace->config.debug_state)
+           fprintf(stderr, " DONE\n");
+   }
 
    if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
@@ -1866,9 +1897,6 @@
 
    /* Now the hasher */
-   // XXX signal it to stop if it hasn't already we should never be in this situation!!
    if (trace_has_dedicated_hasher(libtrace)) {
-       fprintf(stderr, "Waiting to join with the hasher\n");
        pthread_join(libtrace->hasher_thread.tid, NULL);
-       fprintf(stderr, "Joined with the hasher\n");
        assert(libtrace->hasher_thread.state == THREAD_FINISHED);
    }
@@ -1892,7 +1920,5 @@
 
    if (trace_has_dedicated_reporter(libtrace)) {
-       fprintf(stderr, "Waiting to join with the reporter\n");
        pthread_join(libtrace->reporter_thread.tid, NULL);
-       fprintf(stderr, "Joined with the reporter\n");
        assert(libtrace->reporter_thread.state == THREAD_FINISHED);
    }
@@ -1902,7 +1928,5 @@
        libtrace_message_t msg = {0};
        msg.code = MESSAGE_DO_STOP;
-       fprintf(stderr, "Waiting to join with the keepalive\n");
        trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
        pthread_join(libtrace->keepalive_thread.tid, NULL);
-       fprintf(stderr, "Joined with with the keepalive\n");
    }
@@ -2226,6 +2250,91 @@
        libtrace->tracetime = 0;
        return 0;
+   case TRACE_OPTION_SET_CONFIG:
+       libtrace->config = *((struct user_configuration *) value);
+   case TRACE_OPTION_GET_CONFIG:
+       *((struct user_configuration *) value) = libtrace->config;
    }
    return 0;
 }
+
+static bool config_bool_parse(char *value, size_t nvalue) {
+   if (strncmp(value, "true", nvalue) == 0)
+       return true;
+   else if (strncmp(value, "false", nvalue) == 0)
+       return false;
+   else
+       return strtoll(value, NULL, 10) != 0;
+}
+
+static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
+   assert(key);
+   assert(value);
+   assert(uc);
+   if (strncmp(key, "packet_cache_size", nkey) == 0
+       || strncmp(key, "pcs", nkey) == 0) {
+       uc->packet_cache_size = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
+              || strncmp(key, "ptcs", nkey) == 0) {
+       uc->packet_thread_cache_size = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "fixed_packet_count", nkey) == 0
+              || strncmp(key, "fpc", nkey) == 0) {
+       uc->fixed_packet_count = config_bool_parse(value, nvalue);
+   } else if (strncmp(key, "burst_size", nkey) == 0
+              || strncmp(key, "bs", nkey) == 0) {
+       uc->burst_size = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "tick_interval", nkey) == 0
+              || strncmp(key, "ti", nkey) == 0) {
+       uc->tick_interval = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "tick_count", nkey) == 0
+              || strncmp(key, "tc", nkey) == 0) {
+       uc->tick_count = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "perpkt_threads", nkey) == 0
+              || strncmp(key, "pt", nkey) == 0) {
+       uc->perpkt_threads = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "hasher_queue_size", nkey) == 0
+              || strncmp(key, "hqs", nkey) == 0) {
+       uc->hasher_queue_size = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "hasher_polling", nkey) == 0
+              || strncmp(key, "hp", nkey) == 0) {
+       uc->hasher_polling = config_bool_parse(value, nvalue);
+   } else if (strncmp(key, "reporter_polling", nkey) == 0
+              || strncmp(key, "rp", nkey) == 0) {
+       uc->reporter_polling = config_bool_parse(value, nvalue);
+   } else if (strncmp(key, "reporter_thold", nkey) == 0
+              || strncmp(key, "rt", nkey) == 0) {
+       uc->reporter_thold = strtoll(value, NULL, 10);
+   } else if (strncmp(key, "debug_state", nkey) == 0
+              || strncmp(key, "ds", nkey) == 0) {
+       uc->debug_state = config_bool_parse(value, nvalue);
+   } else {
+       fprintf(stderr, "No matching value %s(=%s)\n", key, value);
+   }
+}
+
+DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
+   char *pch;
+   char key[100];
+   char value[100];
+   assert(str);
+   assert(uc);
+   printf ("Splitting string \"%s\" into tokens:\n",str);
+   pch = strtok (str," ,.-");
+   while (pch != NULL)
+   {
+       if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
+           config_string(uc, key, sizeof(key), value, sizeof(value));
+       } else {
+           fprintf(stderr, "Error parsing %s\n", pch);
+       }
+       pch = strtok (NULL," ,.-");
+   }
+}
+
+DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
+   char line[1024];
+   while (fgets(line, sizeof(line), file) != NULL)
+   {
+       parse_user_config(uc, line);
+   }
 }
 
tools/traceanon/traceanon_parallel.c
--- tools/traceanon/traceanon_parallel.c (rf051c1b)
+++ tools/traceanon/traceanon_parallel.c (r5b4d121)
@@ -198,3 +198,6 @@
        case MESSAGE_STARTING:
            enc_init(enc_type,key);
+           break;
+       case MESSAGE_TICK:
+           trace_publish_result(trace, t, mesg->additional.uint64, NULL, RESULT_TICK);
    }
@@ -207,12 +210,19 @@
 static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
    static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
+
    if (result) {
-       libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
-       assert(libtrace_result_get_key(result) == packet_count++);
-       if (trace_write_packet(writer,packet)==-1) {
-           trace_perror_output(writer,"writer");
-           trace_interrupt();
-       }
-       trace_free_result_packet(trace, packet);
+       if (result->type == RESULT_PACKET) {
+           libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
+           assert(libtrace_result_get_key(result) == packet_count++);
+           if (trace_write_packet(writer,packet)==-1) {
+               trace_perror_output(writer,"writer");
+               trace_interrupt();
+           }
+           trace_free_result_packet(trace, packet);
+
+       } else {
+           assert(result->type == RESULT_TICK);
+           // Ignore it
+       }
    }
    return NULL;
@@ -228,5 +238,6 @@
    char *compress_type_str=NULL;
    trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
-
+   struct user_configuration uc;
+   ZERO_USER_CONFIG(uc);
 
    if (argc<2)
@@ -244,7 +255,9 @@
        { "compress-type", 1, 0, 'Z' },
        { "libtrace-help", 0, 0, 'H' },
+       { "config", 1, 0, 'u' },
+       { "config-file", 1, 0, 'U' },
        { NULL, 0, 0, 0 },
    };
 
-   int c=getopt_long(argc, argv, "Z:z:sc:f:dp:H",
+   int c=getopt_long(argc, argv, "Z:z:sc:f:dp:Hu:U:",
        long_options, &option_index);
@@ -297,4 +310,16 @@
            exit(1);
            break;
+       case 'u':
+           parse_user_config(&uc, optarg);
+           break;
+       case 'U':;
+           FILE * f = fopen(optarg, "r");
+           if (f != NULL) {
+               parse_user_config_file(&uc, f);
+           } else {
+               perror("Failed to open configuration file\n");
+               usage(argv[0]);
+           }
+           break;
        default:
            fprintf(stderr,"unknown option: %c\n",c);
@@ -391,8 +416,10 @@
 
    int i = 1;
-   trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
+   trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
    //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
-   i = 2;
+   i = 6;
    trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
+   trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
+   trace_set_hasher(trace, HASHER_CUSTOM, bad_hash, NULL);
 
    if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
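As a usage sketch (the URIs and file name are placeholders, and the rest of the command line follows the existing traceanon options), the two new switches accept the same key=value settings either inline or from a file:

    traceanon_parallel --config "pt=6,hp=true,ds=1" erf:input.erf.gz erf:output.erf.gz
    traceanon_parallel --config-file parallel.conf erf:input.erf.gz erf:output.erf.gz

The short aliases (pt, hp, ds, ...) come from config_string() in trace_parallel.c above.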