Changeset 5b4d121 for lib/trace_parallel.c
- Timestamp:
- 08/19/14 13:59:33 (7 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- f0e8bd6
- Parents:
- 957a72a
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/trace_parallel.c
rf051c1b r5b4d121 101 101 102 102 103 #define VERBOSE_DEBBUGING 1104 105 106 103 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets); 107 104 … … 215 212 } 216 213 217 #if VERBOSE_DEBBUGING 218 fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,219 prev_state, t->state);220 #endif 214 if (trace->config.debug_state) 215 fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid, 216 prev_state, t->state); 217 221 218 if (need_lock) 222 219 pthread_mutex_unlock(&trace->libtrace_lock); … … 240 237 prev_state = trace->state; 241 238 trace->state = new_state; 242 #if VERBOSE_DEBBUGING 243 fprintf(stderr, "Trace(%s) state changed from %s to %s\n", 244 trace->uridata, get_trace_state_name(prev_state), 245 get_trace_state_name(trace->state)); 246 #endif 239 240 if (trace->config.debug_state) 241 fprintf(stderr, "Trace(%s) state changed from %s to %s\n", 242 trace->uridata, get_trace_state_name(prev_state), 243 get_trace_state_name(trace->state)); 244 247 245 if (need_lock) 248 246 pthread_mutex_unlock(&trace->libtrace_lock); … … 342 340 res->value = (void *)dup; 343 341 trace_destroy_packet(oldpkt); 344 fprintf(stderr, "Made a packet safe!!\n");345 342 } 346 343 } … … 375 372 376 373 374 375 /** 376 * Dispatches packets to their correct place and applies any translations 377 * as needed 378 * @param trace 379 * @param t 380 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse 381 * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal 382 */ 383 static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets, 384 size_t nb_packets) { 385 libtrace_message_t message; 386 size_t i; 387 for (i = 0; i < nb_packets; ++i) { 388 if (packets[i]->error > 0) { 389 packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t); 390 } else if (packets[i]->error == READ_TICK) { 391 message.code = MESSAGE_TICK; 392 message.additional.uint64 = trace_packet_get_order(packets[i]); 393 message.sender = t; 394 (*trace->per_pkt)(trace, NULL, &message, t); 395 } else if (packets[i]->error != READ_MESSAGE) { 396 // An error this should be the last packet we read 397 size_t z; 398 // We could have an eof or error and a message such as pause 399 for (z = i ; z < nb_packets; ++z) { 400 fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error); 401 assert (packets[z]->error <= 0); 402 } 403 return -1; 404 } 405 // -2 is a message its not worth checking now just finish this lot and we'll check 406 // when we loop next 407 } 408 return 0; 409 } 410 377 411 /** 378 412 * The is the entry point for our packet processing threads. … … 417 451 message.sender = t; 418 452 (*trace->per_pkt)(trace, NULL, &message, t); 419 // If a hasher thread is running empty input queues so we don't lo ose data453 // If a hasher thread is running empty input queues so we don't lose data 420 454 if (trace_has_dedicated_hasher(trace)) { 421 455 fprintf(stderr, "Trace is using a hasher thread emptying queues\n"); … … 423 457 while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 424 458 ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1); 425 if (packets[0]->error > 0) 426 packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t); 427 else if (packets[0]->error != -2) { 459 if (dispatch_packets(trace, t, packets, 1) == -1) { 428 460 // EOF or error, either way we'll stop 429 461 while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) { … … 460 492 } 461 493 // Loop through the packets we just read 462 for (i = 0; i < nb_packets; ++i) { 463 464 if (packets[i]->error > 0) { 465 packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t); 466 } else if (packets[i]->error != -2) { 467 // An error this should be the last packet we read 468 size_t z; 469 // We could have an eof or error and a message such as pause 470 for (z = i ; z < nb_packets; ++z) { 471 fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error); 472 assert (packets[z]->error < 1); 473 } 474 goto stop; 475 } 476 // -2 is a message its not worth checking now just finish this lot and we'll check 477 // when we loop next 478 } 494 if (dispatch_packets(trace, t, packets, nb_packets) == -1) 495 break; 479 496 } 480 497 … … 523 540 * core to process it. 524 541 */ 525 static void* hasher_ start(void *data) {542 static void* hasher_entry(void *data) { 526 543 libtrace_t *trace = (libtrace_t *)data; 527 544 libtrace_thread_t * t; … … 569 586 assert(trace->started == false); 570 587 assert(trace->state == STATE_FINSHED); 588 break; 571 589 default: 572 590 fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code); … … 585 603 /* Blocking write to the correct queue - I'm the only writer */ 586 604 if (trace->perpkt_threads[thread].state != THREAD_FINISHED) { 605 uint64_t order = trace_packet_get_order(packet); 587 606 libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet); 607 if (trace->config.tick_count && order % trace->config.tick_count == 0) { 608 // Write ticks to everyone else 609 libtrace_packet_t * pkts[trace->perpkt_thread_count]; 610 memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count); 611 libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count); 612 for (i = 0; i < trace->perpkt_thread_count; i++) { 613 pkts[i]->error = READ_TICK; 614 trace_packet_set_order(pkts[i], order); 615 libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]); 616 } 617 } 588 618 pkt_skipped = 0; 589 619 } else { … … 705 735 { 706 736 size_t i = 0; 737 bool tick_hit = false; 707 738 708 739 nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets); … … 712 743 for (i = 0; i < nb_packets; ++i) { 713 744 packets[i]->error = trace_read_packet(libtrace, packets[i]); 714 // Doing this inside the lock ensures the first packet is always715 // recorded first716 745 if (packets[i]->error <= 0) { 717 746 ++i; 718 747 break; 719 748 } 720 } 749 /* 750 if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) { 751 tick_hit = true; 752 }*/ 753 } 754 // Doing this inside the lock ensures the first packet is always 755 // recorded first 721 756 if (packets[0]->error > 0) { 722 757 store_first_packet(libtrace, packets[0], t); 723 758 } 724 759 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 760 /* XXX TODO this needs to be inband with packets, or we don't bother in this case 761 if (tick_hit) { 762 libtrace_message_t tick; 763 tick.additional.uint64 = trace_packet_get_order(packets[i]); 764 tick.code = MESSAGE_TICK; 765 trace_send_message_to_perpkts(libtrace, &tick); 766 } */ 725 767 return i; 726 768 } … … 739 781 libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1); 740 782 packets[0] = libtrace_ringbuffer_read(&t->rbuffer); 741 742 743 if (packets[0] == NULL) {744 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packets, 1, 1);745 packets[0]->error = -2;746 }747 783 748 784 if (packets[0]->error < 0) … … 756 792 break; 757 793 } 758 // Message wating 759 if (packets[i] == NULL) { 760 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[i], 1, 1); 761 packets[i]->error = -2; 762 ++i; 794 // These are typically urgent 795 if (packets[i]->error < 0) 763 796 break; 764 }765 797 } 766 798 767 799 return i; 768 /*if (*packet) {769 return (*packet)->error;770 } else {771 // This is how we do a notify, we send a message before hand to note that the trace is over etc.772 // And this will notify the perpkt thread to read that message, this is easiest773 // since cases like pause can also be dealt with this way without actually774 // having to be the end of the stream.775 fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");776 return -2;777 }*/778 800 } 779 801 … … 1166 1188 1167 1189 while (!trace_finished(trace)) { 1168 1169 //while ( != LIBTRACE_MQ_FAILED) { } 1170 libtrace_message_queue_get(&t->messages, &message); 1171 1190 if (trace->config.reporter_polling) { 1191 if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED) 1192 message.code = MESSAGE_POST_REPORTER; 1193 } else { 1194 libtrace_message_queue_get(&t->messages, &message); 1195 } 1172 1196 switch (message.code) { 1173 1197 // Check for results … … 1198 1222 (*trace->reporter)(trace, &result, NULL); 1199 1223 } 1224 libtrace_vector_destroy(&results); 1200 1225 1201 1226 // GOODBYE … … 1320 1345 packet->trace = libtrace; 1321 1346 ret=libtrace->format->pread_packet(libtrace, t, packet); 1322 if (ret ==(size_t)-1 || ret==(size_t)-2 || ret==0) {1347 if (ret <= 0) { 1323 1348 return ret; 1324 1349 } … … 1499 1524 if (libtrace->config.packet_thread_cache_size <= 0) 1500 1525 libtrace->config.packet_thread_cache_size = 20; 1501 if (libtrace->config.packet_ global_cache_size <= 0)1502 libtrace->config.packet_ global_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;1503 1504 if (libtrace->config.packet_ global_cache_size <1526 if (libtrace->config.packet_cache_size <= 0) 1527 libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count; 1528 1529 if (libtrace->config.packet_cache_size < 1505 1530 (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count) 1506 1531 fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n"); … … 1523 1548 t->state = THREAD_RUNNING; 1524 1549 libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t)); 1525 ASSERT_RET(pthread_create(&t->tid, NULL, hasher_ start, (void *) libtrace), == 0);1550 ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0); 1526 1551 snprintf(name, sizeof(name), "hasher-thread"); 1527 1552 pthread_setname_np(t->tid, name); … … 1534 1559 (void (*)(void *))trace_destroy_packet, 1535 1560 libtrace->config.packet_thread_cache_size, 1536 libtrace->config.packet_ global_cache_size * 4,1561 libtrace->config.packet_cache_size * 4, 1537 1562 libtrace->config.fixed_packet_count); 1538 1563 // Unused slidingwindow code … … 1562 1587 t->perpkt_num = i; 1563 1588 if (libtrace->hasher) 1564 libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, LIBTRACE_RINGBUFFER_POLLING); 1589 libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, 1590 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0); 1565 1591 // Depending on the mode vector or deque might be chosen 1566 1592 libtrace_vector_init(&t->vector, sizeof(libtrace_result_t)); … … 1654 1680 // Special case handle the hasher thread case 1655 1681 if (trace_has_dedicated_hasher(libtrace)) { 1656 fprintf(stderr, "Hasher thread running we deal with this special!\n"); 1682 if (libtrace->config.debug_state) 1683 fprintf(stderr, "Hasher thread is running, asking it to pause ..."); 1657 1684 libtrace_message_t message = {0}; 1658 1685 message.code = MESSAGE_DO_PAUSE; … … 1664 1691 } 1665 1692 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1666 } 1667 1668 fprintf(stderr, "Sending messages \n"); 1693 if (libtrace->config.debug_state) 1694 fprintf(stderr, " DONE\n"); 1695 } 1696 1697 if (libtrace->config.debug_state) 1698 fprintf(stderr, "Asking perpkt threads to pause ..."); 1669 1699 // Stop threads, skip this one if it's a perpkt 1670 1700 for (i = 0; i < libtrace->perpkt_thread_count; i++) { … … 1676 1706 // The hasher has stopped and other threads have messages waiting therefore 1677 1707 // If the queues are empty the other threads would have no data 1678 // So send some NULL packets to simply ask the threads to check there message queues1708 // So send some message packets to simply ask the threads to check 1679 1709 // We are the only writer since hasher has paused 1680 libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL); 1710 libtrace_packet_t *pkt; 1711 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1); 1712 pkt->error = READ_MESSAGE; 1713 libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt); 1681 1714 } 1682 1715 } else { … … 1685 1718 } 1686 1719 1720 if (t) { 1721 // A perpkt is doing the pausing, interesting, fake an extra thread paused 1722 // We rely on the user to *not* return before starting the trace again 1723 thread_change_state(libtrace, t, THREAD_PAUSED, true); 1724 } 1725 1726 // Wait for all threads to pause 1727 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1728 while(libtrace->perpkt_thread_states[THREAD_RUNNING]) { 1729 ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0); 1730 } 1731 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1732 1733 if (libtrace->config.debug_state) 1734 fprintf(stderr, " DONE\n"); 1735 1687 1736 // Deal with the reporter 1688 1737 if (trace_has_dedicated_reporter(libtrace)) { 1689 fprintf(stderr, "Reporter thread running we deal with this special!\n"); 1738 if (libtrace->config.debug_state) 1739 fprintf(stderr, "Reporter thread is running, asking it to pause ..."); 1690 1740 libtrace_message_t message = {0}; 1691 1741 message.code = MESSAGE_DO_PAUSE; … … 1697 1747 } 1698 1748 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1699 } 1700 1701 // Formats must support native message handling if a message is ready 1702 // Approach per Perry's suggestion is a non-blocking read 1703 // followed by a blocking read. XXX STRIP THIS OUT 1704 1705 if (t) { 1706 // A perpkt is doing the pausing, interesting, fake an extra thread paused 1707 // We rely on the user to *not* return before starting the trace again 1708 thread_change_state(libtrace, t, THREAD_PAUSED, true); 1709 } 1710 1711 fprintf(stderr, "Asking threads to pause\n"); 1712 1713 // Wait for all threads to pause 1714 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1715 while(libtrace->perpkt_thread_states[THREAD_RUNNING]) { 1716 ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0); 1717 } 1718 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1719 1720 fprintf(stderr, "Threads have paused\n"); 1749 if (libtrace->config.debug_state) 1750 fprintf(stderr, " DONE\n"); 1751 } 1721 1752 1722 1753 if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { … … 1866 1897 1867 1898 /* Now the hasher */ 1868 // XXX signal it to stop if it hasn't already we should never be in this situation!!1869 1899 if (trace_has_dedicated_hasher(libtrace)) { 1870 fprintf(stderr, "Waiting to join with the hasher\n");1871 1900 pthread_join(libtrace->hasher_thread.tid, NULL); 1872 fprintf(stderr, "Joined with the hasher\n");1873 1901 assert(libtrace->hasher_thread.state == THREAD_FINISHED); 1874 1902 } … … 1892 1920 1893 1921 if (trace_has_dedicated_reporter(libtrace)) { 1894 fprintf(stderr, "Waiting to join with the reporter\n");1895 1922 pthread_join(libtrace->reporter_thread.tid, NULL); 1896 fprintf(stderr, "Joined with the reporter\n");1897 1923 assert(libtrace->reporter_thread.state == THREAD_FINISHED); 1898 1924 } … … 1902 1928 libtrace_message_t msg = {0}; 1903 1929 msg.code = MESSAGE_DO_STOP; 1904 fprintf(stderr, "Waiting to join with the keepalive\n");1905 1930 trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg); 1906 1931 pthread_join(libtrace->keepalive_thread.tid, NULL); 1907 fprintf(stderr, "Joined with with the keepalive\n");1908 1932 } 1909 1933 … … 2226 2250 libtrace->tracetime = 0; 2227 2251 return 0; 2252 case TRACE_OPTION_SET_CONFIG: 2253 libtrace->config = *((struct user_configuration *) value); 2254 case TRACE_OPTION_GET_CONFIG: 2255 *((struct user_configuration *) value) = libtrace->config; 2228 2256 } 2229 2257 return 0; 2258 } 2259 2260 static bool config_bool_parse(char *value, size_t nvalue) { 2261 if (strncmp(value, "true", nvalue) == 0) 2262 return true; 2263 else if (strncmp(value, "false", nvalue) == 0) 2264 return false; 2265 else 2266 return strtoll(value, NULL, 10) != 0; 2267 } 2268 2269 static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) { 2270 assert(key); 2271 assert(value); 2272 assert(uc); 2273 if (strncmp(key, "packet_cache_size", nkey) == 0 2274 || strncmp(key, "pcs", nkey) == 0) { 2275 uc->packet_cache_size = strtoll(value, NULL, 10); 2276 } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0 2277 || strncmp(key, "ptcs", nkey) == 0) { 2278 uc->packet_thread_cache_size = strtoll(value, NULL, 10); 2279 } else if (strncmp(key, "fixed_packet_count", nkey) == 0 2280 || strncmp(key, "fpc", nkey) == 0) { 2281 uc->fixed_packet_count = config_bool_parse(value, nvalue); 2282 } else if (strncmp(key, "burst_size", nkey) == 0 2283 || strncmp(key, "bs", nkey) == 0) { 2284 uc->burst_size = strtoll(value, NULL, 10); 2285 } else if (strncmp(key, "tick_interval", nkey) == 0 2286 || strncmp(key, "ti", nkey) == 0) { 2287 uc->tick_interval = strtoll(value, NULL, 10); 2288 } else if (strncmp(key, "tick_count", nkey) == 0 2289 || strncmp(key, "tc", nkey) == 0) { 2290 uc->tick_count = strtoll(value, NULL, 10); 2291 } else if (strncmp(key, "perpkt_threads", nkey) == 0 2292 || strncmp(key, "pt", nkey) == 0) { 2293 uc->perpkt_threads = strtoll(value, NULL, 10); 2294 } else if (strncmp(key, "hasher_queue_size", nkey) == 0 2295 || strncmp(key, "hqs", nkey) == 0) { 2296 uc->hasher_queue_size = strtoll(value, NULL, 10); 2297 } else if (strncmp(key, "hasher_polling", nkey) == 0 2298 || strncmp(key, "hp", nkey) == 0) { 2299 uc->hasher_polling = config_bool_parse(value, nvalue); 2300 } else if (strncmp(key, "reporter_polling", nkey) == 0 2301 || strncmp(key, "rp", nkey) == 0) { 2302 uc->reporter_polling = config_bool_parse(value, nvalue); 2303 } else if (strncmp(key, "reporter_thold", nkey) == 0 2304 || strncmp(key, "rt", nkey) == 0) { 2305 uc->reporter_thold = strtoll(value, NULL, 10); 2306 } else if (strncmp(key, "debug_state", nkey) == 0 2307 || strncmp(key, "ds", nkey) == 0) { 2308 uc->debug_state = config_bool_parse(value, nvalue); 2309 } else { 2310 fprintf(stderr, "No matching value %s(=%s)\n", key, value); 2311 } 2312 } 2313 2314 DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) { 2315 char *pch; 2316 char key[100]; 2317 char value[100]; 2318 assert(str); 2319 assert(uc); 2320 printf ("Splitting string \"%s\" into tokens:\n",str); 2321 pch = strtok (str," ,.-"); 2322 while (pch != NULL) 2323 { 2324 if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) { 2325 config_string(uc, key, sizeof(key), value, sizeof(value)); 2326 } else { 2327 fprintf(stderr, "Error parsing %s\n", pch); 2328 } 2329 pch = strtok (NULL," ,.-"); 2330 } 2331 } 2332 2333 DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) { 2334 char line[1024]; 2335 while (fgets(line, sizeof(line), file) != NULL) 2336 { 2337 parse_user_config(uc, line); 2338 } 2230 2339 } 2231 2340
Note: See TracChangeset
for help on using the changeset viewer.