Ignore:
Timestamp:
06/04/14 02:28:58 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b13b939
Parents:
fac8c46
Message:

Adds a thread keepalive that sends a messages to the perpkt threads every second(still todo make this configurable)
Updated tracertstats to use this rather than the temporary result system which has been removed
Also fixes a handful of compile warnings

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    rfac8c46 r82facc5  
    256256        message.code = MESSAGE_STARTED;
    257257        message.sender = t;
    258         message.additional = NULL;
    259258
    260259        // Let the per_packet function know we have started
     
    330329        // Let the per_packet function know we have stopped
    331330        message.code = MESSAGE_STOPPED;
    332         message.sender = message.additional = NULL;
     331        message.sender = NULL;
     332        message.additional.uint64 = 0;
    333333        (*trace->per_pkt)(trace, NULL, &message, t);
    334334
     
    340340        // Notify only after we've defiantly set the state to finished
    341341        message.code = MESSAGE_PERPKT_ENDED;
    342         message.additional = NULL;
     342        message.additional.uint64 = 0;
    343343        trace_send_message_to_reducer(trace, &message);
    344344
     
    451451        // Notify only after we've defiantly set the state to finished
    452452        message.code = MESSAGE_PERPKT_ENDED;
    453         message.additional = NULL;
     453        message.additional.uint64 = 0;
    454454        trace_send_message_to_reducer(trace, &message);
    455455
     
    854854                libtrace_message_t mesg = {0};
    855855                mesg.code = MESSAGE_FIRST_PACKET;
    856                 mesg.additional = NULL;
    857856                trace_send_message_to_reducer(libtrace, &mesg);
    858857                t->recorded_first = true;
     
    874873                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
    875874                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
    876                 if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {
     875                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
    877876                        ret = 1;
    878877                } else {
     
    908907}
    909908
     909/** Similar to delay_tracetime but send messages to all threads periodically */
     910static void* keepalive_entry(void *data) {
     911        struct timeval prev, next;
     912        libtrace_message_t message = {0};
     913        libtrace_t *trace = (libtrace_t *)data;
     914        int delay_usec = 1000000; // ! second hard coded !!
     915        uint64_t next_release;
     916        fprintf(stderr, "keepalive thread is starting\n");
     917        // TODO mark this thread as running against the libtrace object
     918        gettimeofday(&prev, NULL);
     919        message.code = MESSAGE_TICK;
     920        while (trace->state != STATE_FINSHED) {
     921                fd_set rfds;
     922                next_release = tv_to_usec(&prev) + delay_usec;
     923                gettimeofday(&next, NULL);
     924                if (next_release > tv_to_usec(&next)) {
     925                        next = usec_to_tv(next_release - tv_to_usec(&next));
     926                        // Wait for timeout or a message
     927                        FD_ZERO(&rfds);
     928                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
     929                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
     930                                libtrace_message_t msg;
     931                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
     932                                assert(msg.code == MESSAGE_DO_STOP);
     933                                goto done;
     934                        }
     935                }
     936                prev = usec_to_tv(next_release);
     937                if (trace->state == STATE_RUNNING) {
     938                        message.additional.uint64 = tv_to_usec(&prev);
     939                        trace_send_message_to_perpkts(trace, &message);
     940                }
     941        }
     942done:
     943        fprintf(stderr, "keepalive thread is finishing\n");
     944        trace->keepalive_thread.state = THREAD_FINISHED;
     945        return NULL;
     946}
    910947
    911948/**
     
    10741111                assert (libtrace->perpkts_pausing != 0);
    10751112                fprintf(stderr, "Restarting trace\n");
    1076                 int err;
     1113                UNUSED int err;
    10771114                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    10781115                        printf("This format has direct support for p's\n");
     
    10991136        libtrace->reducer = reducer;
    11001137        libtrace->perpkts_finishing = 0;
    1101         // libtrace->hasher = &rand_hash; /* Hasher now set via option */
    11021138
    11031139        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
     
    11971233                threads_started = trace_start_perpkt_threads(libtrace);
    11981234
     1235        libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
     1236        libtrace->keepalive_thread.state = THREAD_RUNNING;
     1237        libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
     1238        assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
    11991239
    12001240        // Revert back - Allow signals again
     
    12481288                libtrace_message_t message = {0};
    12491289                message.code = MESSAGE_DO_PAUSE;
    1250                 message.additional = NULL;
    12511290                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
    12521291                // Wait for it to pause
     
    12651304                        libtrace_message_t message = {0};
    12661305                        message.code = MESSAGE_DO_PAUSE;
    1267                         message.additional = NULL;
    12681306                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    12691307                        if(trace_has_dedicated_hasher(libtrace)) {
     
    13481386       
    13491387        message.code = MESSAGE_DO_STOP;
    1350         message.additional = NULL;
    13511388        trace_send_message_to_perpkts(libtrace, &message);
    13521389        if (trace_has_dedicated_hasher(libtrace))
     
    14051442                                        return 0;
    14061443                                case HASHER_BIDIRECTIONAL:
    1407                                         trace->hasher = &toeplitz_hash_packet;
     1444                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
    14081445                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
    14091446                                        toeplitz_init_config(trace->hasher_data, 1);
    14101447                                        return 0;
    14111448                                case HASHER_UNIDIRECTIONAL:
    1412                                         trace->hasher = &toeplitz_hash_packet;
     1449                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
    14131450                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
    14141451                                        toeplitz_init_config(trace->hasher_data, 0);
     
    14721509                // Cannot destroy vector yet, this happens with trace_destroy
    14731510        }
     1511        libtrace->state = STATE_FINSHED;
     1512        // Wait for the ticker thread
     1513       
     1514        if (libtrace->keepalive_thread.state != THREAD_FINISHED) {
     1515                libtrace_message_t msg = {0};
     1516                msg.code = MESSAGE_DO_STOP;
     1517                fprintf(stderr, "Waiting to join with the keepalive\n");
     1518                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
     1519                pthread_join(libtrace->keepalive_thread.tid, NULL);
     1520        }
     1521        fprintf(stderr, "Joined with with the keepalive\n");
     1522       
    14741523
    14751524        // Lets mark this as done for now
    14761525        libtrace->joined = true;
    1477 }
    1478 
    1479 // Don't use extra overhead = :( directly place in storage structure using
    1480 // post
    1481 DLLEXPORT libtrace_result_t *trace_create_result()
    1482 {
    1483         libtrace_result_t *result = malloc(sizeof(libtrace_result_t));
    1484         assert(result);
    1485         result->key = 0;
    1486         result->value = NULL;
    1487         // TODO automatically back with a free list!!
    1488         return result;
    14891526}
    14901527
     
    15171554        libtrace_message_t message = {0};
    15181555        message.code = MESSAGE_POST_REDUCE;
    1519         message.additional = NULL;
    15201556        message.sender = get_thread_descriptor(libtrace);
    15211557        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
     
    16101646
    16111647/**
    1612  * Note: This function grabs a lock and expects trace_update_inprogress_result
    1613  * to be called to release the lock.
    1614  *
    1615  * Expected to be used in trace-time situations to allow a result to be pending
    1616  * a publish that can be taken by the reducer before publish if it wants to
    1617  * publish a result. Such as publish a result every second but a queue hasn't
    1618  * processed a packet (or is overloaded) and hasn't published yet.
    1619  *
    1620  * Currently this only supports a single temporary result,
    1621  * as such if a key is different to the current temporary result the existing
    1622  * one will be published and NULL returned.
    1623  */
    1624 DLLEXPORT void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key)
    1625 {
    1626         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1627         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    1628 
    1629         assert (pthread_spin_lock(&t->tmp_spinlock) == 0);
    1630         if (t->tmp_key != key) {
    1631                 if (t->tmp_data) {
    1632                         //printf("publising data key=%"PRIu64"\n", t->tmp_key);
    1633                         trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
    1634                 }
    1635                 t->tmp_data = NULL;
    1636                 t->tmp_key = key;
    1637         }
    1638         return t->tmp_data;
    1639 }
    1640 
    1641 /**
    1642  * Updates a temporary result and releases the lock previously grabbed by trace_retrive_inprogress_result
    1643  */
    1644 DLLEXPORT void trace_update_inprogress_result(libtrace_t *libtrace, uint64_t key, void * value)
    1645 {
    1646         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1647         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    1648         if (t->tmp_key != key) {
    1649                 if (t->tmp_data) {
    1650                         printf("BAD publihsing data key=%"PRIu64"\n", t->tmp_key);
    1651                         trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
    1652                 }
    1653                 t->tmp_key = key;
    1654         }
    1655         t->tmp_data = value;
    1656         assert (pthread_spin_unlock(&t->tmp_spinlock) == 0);
    1657 }
    1658 
    1659 /**
    16601648 * Publish to the reduce queue, return
    16611649 */
     
    16671655        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    16681656        // Now put it into my table
    1669         static __thread int count = 0;
     1657        UNUSED static __thread int count = 0;
    16701658
    16711659
     
    17041692        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    17051693        // Now put it into my table
    1706         static __thread int count = 0;
     1694        UNUSED static __thread int count = 0;
    17071695
    17081696        res.is_packet = 1;
     
    17441732        else
    17451733                return 1;
    1746 }
    1747 
    1748 /* Retrieves all results with the key requested from the temporary result
    1749  * holding zone.
    1750  */
    1751 DLLEXPORT int trace_get_results_check_temp(libtrace_t *libtrace, libtrace_vector_t *results, uint64_t key)
    1752 {
    1753         int i;
    1754 
    1755         libtrace_vector_empty(results);
    1756         // Check all of the temp queues
    1757         for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    1758                 libtrace_result_t r = {0,0};
    1759                 assert (pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
    1760                 if (libtrace->perpkt_threads[i].tmp_key == key) {
    1761                         libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);
    1762                         libtrace->perpkt_threads[i].tmp_data = NULL;
    1763                         printf("Found in temp queue\n");
    1764                 }
    1765                 assert (pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
    1766                 if (libtrace_result_get_value(&r)) {
    1767                         // Got a result still in temporary
    1768                         printf("Publishing from temp queue\n");
    1769                         libtrace_vector_push_back(results, &r);
    1770                 } else {
    1771                         // This might be waiting on the actual queue
    1772                         libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    1773                         if (libtrace_deque_peek_front(v, (void *) &r) &&
    1774                                         libtrace_result_get_value(&r)) {
    1775                                 assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);
    1776                                 printf("Found in real queue\n");
    1777                                 libtrace_vector_push_back(results, &r);
    1778                         } // else no data (probably means no packets)
    1779                         else {
    1780                                 printf("Result missing in real queue\n");
    1781                         }
    1782                 }
    1783         }
    1784         //printf("Loop done yo, that means we've got #%d results to print fool!\n", libtrace_vector_get_size(results));
    1785         return libtrace_vector_get_size(results);
    17861734}
    17871735
     
    19081856DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
    19091857{
    1910         int ret = -1;
     1858        UNUSED int ret = -1;
    19111859        switch (option) {
    19121860                case TRACE_OPTION_SET_HASHER:
Note: See TracChangeset for help on using the changeset viewer.