Changeset 82facc5 for lib/trace_parallel.c
- Timestamp:
- 06/04/14 02:28:58 (8 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- b13b939
- Parents:
- fac8c46
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/trace_parallel.c
rfac8c46 r82facc5 256 256 message.code = MESSAGE_STARTED; 257 257 message.sender = t; 258 message.additional = NULL;259 258 260 259 // Let the per_packet function know we have started … … 330 329 // Let the per_packet function know we have stopped 331 330 message.code = MESSAGE_STOPPED; 332 message.sender = message.additional = NULL; 331 message.sender = NULL; 332 message.additional.uint64 = 0; 333 333 (*trace->per_pkt)(trace, NULL, &message, t); 334 334 … … 340 340 // Notify only after we've defiantly set the state to finished 341 341 message.code = MESSAGE_PERPKT_ENDED; 342 message.additional = NULL;342 message.additional.uint64 = 0; 343 343 trace_send_message_to_reducer(trace, &message); 344 344 … … 451 451 // Notify only after we've defiantly set the state to finished 452 452 message.code = MESSAGE_PERPKT_ENDED; 453 message.additional = NULL;453 message.additional.uint64 = 0; 454 454 trace_send_message_to_reducer(trace, &message); 455 455 … … 854 854 libtrace_message_t mesg = {0}; 855 855 mesg.code = MESSAGE_FIRST_PACKET; 856 mesg.additional = NULL;857 856 trace_send_message_to_reducer(libtrace, &mesg); 858 857 t->recorded_first = true; … … 874 873 *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet; 875 874 *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv; 876 if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {875 if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) { 877 876 ret = 1; 878 877 } else { … … 908 907 } 909 908 909 /** Similar to delay_tracetime but send messages to all threads periodically */ 910 static void* keepalive_entry(void *data) { 911 struct timeval prev, next; 912 libtrace_message_t message = {0}; 913 libtrace_t *trace = (libtrace_t *)data; 914 int delay_usec = 1000000; // ! second hard coded !! 915 uint64_t next_release; 916 fprintf(stderr, "keepalive thread is starting\n"); 917 // TODO mark this thread as running against the libtrace object 918 gettimeofday(&prev, NULL); 919 message.code = MESSAGE_TICK; 920 while (trace->state != STATE_FINSHED) { 921 fd_set rfds; 922 next_release = tv_to_usec(&prev) + delay_usec; 923 gettimeofday(&next, NULL); 924 if (next_release > tv_to_usec(&next)) { 925 next = usec_to_tv(next_release - tv_to_usec(&next)); 926 // Wait for timeout or a message 927 FD_ZERO(&rfds); 928 FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds); 929 if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) { 930 libtrace_message_t msg; 931 libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg); 932 assert(msg.code == MESSAGE_DO_STOP); 933 goto done; 934 } 935 } 936 prev = usec_to_tv(next_release); 937 if (trace->state == STATE_RUNNING) { 938 message.additional.uint64 = tv_to_usec(&prev); 939 trace_send_message_to_perpkts(trace, &message); 940 } 941 } 942 done: 943 fprintf(stderr, "keepalive thread is finishing\n"); 944 trace->keepalive_thread.state = THREAD_FINISHED; 945 return NULL; 946 } 910 947 911 948 /** … … 1074 1111 assert (libtrace->perpkts_pausing != 0); 1075 1112 fprintf(stderr, "Restarting trace\n"); 1076 int err;1113 UNUSED int err; 1077 1114 if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 1078 1115 printf("This format has direct support for p's\n"); … … 1099 1136 libtrace->reducer = reducer; 1100 1137 libtrace->perpkts_finishing = 0; 1101 // libtrace->hasher = &rand_hash; /* Hasher now set via option */1102 1138 1103 1139 assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0); … … 1197 1233 threads_started = trace_start_perpkt_threads(libtrace); 1198 1234 1235 libtrace->keepalive_thread.type = THREAD_KEEPALIVE; 1236 libtrace->keepalive_thread.state = THREAD_RUNNING; 1237 libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t)); 1238 assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0); 1199 1239 1200 1240 // Revert back - Allow signals again … … 1248 1288 libtrace_message_t message = {0}; 1249 1289 message.code = MESSAGE_DO_PAUSE; 1250 message.additional = NULL;1251 1290 trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message); 1252 1291 // Wait for it to pause … … 1265 1304 libtrace_message_t message = {0}; 1266 1305 message.code = MESSAGE_DO_PAUSE; 1267 message.additional = NULL;1268 1306 trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message); 1269 1307 if(trace_has_dedicated_hasher(libtrace)) { … … 1348 1386 1349 1387 message.code = MESSAGE_DO_STOP; 1350 message.additional = NULL;1351 1388 trace_send_message_to_perpkts(libtrace, &message); 1352 1389 if (trace_has_dedicated_hasher(libtrace)) … … 1405 1442 return 0; 1406 1443 case HASHER_BIDIRECTIONAL: 1407 trace->hasher = &toeplitz_hash_packet;1444 trace->hasher = (fn_hasher) toeplitz_hash_packet; 1408 1445 trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t)); 1409 1446 toeplitz_init_config(trace->hasher_data, 1); 1410 1447 return 0; 1411 1448 case HASHER_UNIDIRECTIONAL: 1412 trace->hasher = &toeplitz_hash_packet;1449 trace->hasher = (fn_hasher) toeplitz_hash_packet; 1413 1450 trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t)); 1414 1451 toeplitz_init_config(trace->hasher_data, 0); … … 1472 1509 // Cannot destroy vector yet, this happens with trace_destroy 1473 1510 } 1511 libtrace->state = STATE_FINSHED; 1512 // Wait for the ticker thread 1513 1514 if (libtrace->keepalive_thread.state != THREAD_FINISHED) { 1515 libtrace_message_t msg = {0}; 1516 msg.code = MESSAGE_DO_STOP; 1517 fprintf(stderr, "Waiting to join with the keepalive\n"); 1518 trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg); 1519 pthread_join(libtrace->keepalive_thread.tid, NULL); 1520 } 1521 fprintf(stderr, "Joined with with the keepalive\n"); 1522 1474 1523 1475 1524 // Lets mark this as done for now 1476 1525 libtrace->joined = true; 1477 }1478 1479 // Don't use extra overhead = :( directly place in storage structure using1480 // post1481 DLLEXPORT libtrace_result_t *trace_create_result()1482 {1483 libtrace_result_t *result = malloc(sizeof(libtrace_result_t));1484 assert(result);1485 result->key = 0;1486 result->value = NULL;1487 // TODO automatically back with a free list!!1488 return result;1489 1526 } 1490 1527 … … 1517 1554 libtrace_message_t message = {0}; 1518 1555 message.code = MESSAGE_POST_REDUCE; 1519 message.additional = NULL;1520 1556 message.sender = get_thread_descriptor(libtrace); 1521 1557 return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message); … … 1610 1646 1611 1647 /** 1612 * Note: This function grabs a lock and expects trace_update_inprogress_result1613 * to be called to release the lock.1614 *1615 * Expected to be used in trace-time situations to allow a result to be pending1616 * a publish that can be taken by the reducer before publish if it wants to1617 * publish a result. Such as publish a result every second but a queue hasn't1618 * processed a packet (or is overloaded) and hasn't published yet.1619 *1620 * Currently this only supports a single temporary result,1621 * as such if a key is different to the current temporary result the existing1622 * one will be published and NULL returned.1623 */1624 DLLEXPORT void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key)1625 {1626 int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?1627 libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];1628 1629 assert (pthread_spin_lock(&t->tmp_spinlock) == 0);1630 if (t->tmp_key != key) {1631 if (t->tmp_data) {1632 //printf("publising data key=%"PRIu64"\n", t->tmp_key);1633 trace_publish_result(libtrace, t->tmp_key, t->tmp_data);1634 }1635 t->tmp_data = NULL;1636 t->tmp_key = key;1637 }1638 return t->tmp_data;1639 }1640 1641 /**1642 * Updates a temporary result and releases the lock previously grabbed by trace_retrive_inprogress_result1643 */1644 DLLEXPORT void trace_update_inprogress_result(libtrace_t *libtrace, uint64_t key, void * value)1645 {1646 int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?1647 libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];1648 if (t->tmp_key != key) {1649 if (t->tmp_data) {1650 printf("BAD publihsing data key=%"PRIu64"\n", t->tmp_key);1651 trace_publish_result(libtrace, t->tmp_key, t->tmp_data);1652 }1653 t->tmp_key = key;1654 }1655 t->tmp_data = value;1656 assert (pthread_spin_unlock(&t->tmp_spinlock) == 0);1657 }1658 1659 /**1660 1648 * Publish to the reduce queue, return 1661 1649 */ … … 1667 1655 libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread]; 1668 1656 // Now put it into my table 1669 static __thread int count = 0;1657 UNUSED static __thread int count = 0; 1670 1658 1671 1659 … … 1704 1692 libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread]; 1705 1693 // Now put it into my table 1706 static __thread int count = 0;1694 UNUSED static __thread int count = 0; 1707 1695 1708 1696 res.is_packet = 1; … … 1744 1732 else 1745 1733 return 1; 1746 }1747 1748 /* Retrieves all results with the key requested from the temporary result1749 * holding zone.1750 */1751 DLLEXPORT int trace_get_results_check_temp(libtrace_t *libtrace, libtrace_vector_t *results, uint64_t key)1752 {1753 int i;1754 1755 libtrace_vector_empty(results);1756 // Check all of the temp queues1757 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {1758 libtrace_result_t r = {0,0};1759 assert (pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);1760 if (libtrace->perpkt_threads[i].tmp_key == key) {1761 libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);1762 libtrace->perpkt_threads[i].tmp_data = NULL;1763 printf("Found in temp queue\n");1764 }1765 assert (pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);1766 if (libtrace_result_get_value(&r)) {1767 // Got a result still in temporary1768 printf("Publishing from temp queue\n");1769 libtrace_vector_push_back(results, &r);1770 } else {1771 // This might be waiting on the actual queue1772 libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;1773 if (libtrace_deque_peek_front(v, (void *) &r) &&1774 libtrace_result_get_value(&r)) {1775 assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);1776 printf("Found in real queue\n");1777 libtrace_vector_push_back(results, &r);1778 } // else no data (probably means no packets)1779 else {1780 printf("Result missing in real queue\n");1781 }1782 }1783 }1784 //printf("Loop done yo, that means we've got #%d results to print fool!\n", libtrace_vector_get_size(results));1785 return libtrace_vector_get_size(results);1786 1734 } 1787 1735 … … 1908 1856 DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value) 1909 1857 { 1910 int ret = -1;1858 UNUSED int ret = -1; 1911 1859 switch (option) { 1912 1860 case TRACE_OPTION_SET_HASHER:
Note: See TracChangeset
for help on using the changeset viewer.