Changeset 82facc5


Ignore:
Timestamp:
06/04/14 02:28:58 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b13b939
Parents:
fac8c46
Message:

Adds a thread keepalive that sends a messages to the perpkt threads every second(still todo make this configurable)
Updated tracertstats to use this rather than the temporary result system which has been removed
Also fixes a handful of compile warnings

Files:
7 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/message_queue.h

    rd6a56b6 r82facc5  
    1414} libtrace_message_queue_t;
    1515
    16 typedef struct libtrace_message_t {
    17         int code;
    18         void *additional;
    19         libtrace_thread_t *sender;
    20 } libtrace_message_t;
    21 
    2216inline void libtrace_message_queue_init(libtrace_message_queue_t *mq, size_t message_len);
    2317inline int libtrace_message_queue_put(libtrace_message_queue_t *mq, const void *message);
  • lib/format_linux.c

    rfac8c46 r82facc5  
    415415{       
    416416        init_input(libtrace);
    417         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_RING;
     417        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_RING;
    418418        return 0;
    419419}
     
    421421{
    422422        init_input(libtrace);
    423         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_NATIVE;
     423        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_NATIVE;
    424424        return 0;
    425425}
     
    707707        for (i = 0; i < tot; ++i)
    708708        {
    709                 if (FORMAT(libtrace->format_data)->format == TRACE_FORMAT_LINUX_NATIVE) {
     709                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_NATIVE) {
    710710                        if (linuxnative_start_input(libtrace) != 0) {
    711711                                iserror = 1;
     
    730730                }
    731731                per_thread[i].fd = FORMAT(libtrace->format_data)->fd;
    732                 if (FORMAT(libtrace->format_data)->format == TRACE_FORMAT_LINUX_RING) {
     732                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_RING) {
    733733                        per_thread[i].rxring_offset = FORMAT(libtrace->format_data)->rxring_offset;
    734734                        per_thread[i].rx_ring = FORMAT(libtrace->format_data)->rx_ring;
  • lib/libtrace.h.in

    rfac8c46 r82facc5  
    31143114typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
    31153115typedef void* (*fn_reducer)(libtrace_t* trace, void* global_blob);
    3116 typedef uint64_t (*fn_hasher)(libtrace_packet_t* packet, void *data);
     3116typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
    31173117
    31183118DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer);
     
    31223122DLLEXPORT void trace_join(libtrace_t * trace);
    31233123DLLEXPORT void print_contention_stats (libtrace_t *libtrace);
    3124 DLLEXPORT libtrace_result_t *trace_create_result();
    31253124
    31263125DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
     
    31373136DLLEXPORT void * trace_get_tls(libtrace_thread_t *t);
    31383137
     3138
    31393139DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value);
    31403140DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet);
     
    31423142DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results);
    31433143
    3144 DLLEXPORT libtrace_result_t *trace_create_result();
     3144DLLEXPORT int trace_post_reduce(libtrace_t *libtrace);
    31453145DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
    31463146DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     
    31483148DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message);
    31493149DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
     3150DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
    31503151DLLEXPORT int trace_finished(libtrace_t * libtrace);
    31513152DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
     
    31533154DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
    31543155DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
     3156DLLEXPORT inline uint64_t tv_to_usec(struct timeval *tv);
    31553157
    31563158DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
     3159
     3160union libtrace_64byte_things {
     3161        void *ptr;
     3162        int64_t sint64;
     3163        uint64_t uint64;
     3164        uint32_t uint32s[2];
     3165        int32_t sint32s[2];
     3166        uint32_t uint32;
     3167        int32_t sint32;
     3168        int sint;
     3169        unsigned int uint;
     3170        char schars[8];
     3171        char uchars[8];
     3172};
     3173
     3174typedef struct libtrace_message_t {
     3175        int code;
     3176        union libtrace_64byte_things additional;
     3177        libtrace_thread_t *sender;
     3178} libtrace_message_t;
    31573179
    31583180typedef enum {
     
    32323254        MESSAGE_POST_REDUCE,
    32333255        MESSAGE_POST_RANGE,
     3256        MESSAGE_TICK,
    32343257        MESSAGE_USER
    32353258};
  • lib/libtrace_int.h

    rfac8c46 r82facc5  
    176176        THREAD_HASHER,
    177177        THREAD_PERPKT,
    178         THREAD_REDUCER
     178        THREAD_REDUCER,
     179        THREAD_KEEPALIVE
    179180};
    180181
     
    336337        libtrace_thread_t hasher_thread;
    337338        libtrace_thread_t reducer_thread;
     339        libtrace_thread_t keepalive_thread;
    338340        int perpkt_thread_count;
    339341        libtrace_thread_t * perpkt_threads; // All our perpkt threads
     
    345347};
    346348
     349void trace_fin_packet(libtrace_packet_t *packet);
    347350inline void libtrace_zero_thread(libtrace_thread_t * t);
    348351inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
  • lib/trace.c

    rfac8c46 r82facc5  
    279279        libtrace_zero_thread(&libtrace->hasher_thread);
    280280        libtrace_zero_thread(&libtrace->reducer_thread);
     281        libtrace_zero_thread(&libtrace->keepalive_thread);
    281282        libtrace_zero_slidingwindow(&libtrace->sliding_window);
    282283        libtrace->reducer_thread.type = THREAD_EMPTY;
     
    399400        libtrace_zero_thread(&libtrace->hasher_thread);
    400401        libtrace_zero_thread(&libtrace->reducer_thread);
     402        libtrace_zero_thread(&libtrace->keepalive_thread);
    401403        libtrace_zero_slidingwindow(&libtrace->sliding_window);
    402404        libtrace->reducer_thread.type = THREAD_EMPTY;
     
    770772 * use trace_destroy_packet() for those diabolical purposes.
    771773 */
    772 void trace_fin_packet(libtrace_packet_t *packet);
    773774void trace_fin_packet(libtrace_packet_t *packet) {
    774775        if (packet)
  • lib/trace_parallel.c

    rfac8c46 r82facc5  
    256256        message.code = MESSAGE_STARTED;
    257257        message.sender = t;
    258         message.additional = NULL;
    259258
    260259        // Let the per_packet function know we have started
     
    330329        // Let the per_packet function know we have stopped
    331330        message.code = MESSAGE_STOPPED;
    332         message.sender = message.additional = NULL;
     331        message.sender = NULL;
     332        message.additional.uint64 = 0;
    333333        (*trace->per_pkt)(trace, NULL, &message, t);
    334334
     
    340340        // Notify only after we've defiantly set the state to finished
    341341        message.code = MESSAGE_PERPKT_ENDED;
    342         message.additional = NULL;
     342        message.additional.uint64 = 0;
    343343        trace_send_message_to_reducer(trace, &message);
    344344
     
    451451        // Notify only after we've defiantly set the state to finished
    452452        message.code = MESSAGE_PERPKT_ENDED;
    453         message.additional = NULL;
     453        message.additional.uint64 = 0;
    454454        trace_send_message_to_reducer(trace, &message);
    455455
     
    854854                libtrace_message_t mesg = {0};
    855855                mesg.code = MESSAGE_FIRST_PACKET;
    856                 mesg.additional = NULL;
    857856                trace_send_message_to_reducer(libtrace, &mesg);
    858857                t->recorded_first = true;
     
    874873                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
    875874                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
    876                 if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {
     875                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
    877876                        ret = 1;
    878877                } else {
     
    908907}
    909908
     909/** Similar to delay_tracetime but send messages to all threads periodically */
     910static void* keepalive_entry(void *data) {
     911        struct timeval prev, next;
     912        libtrace_message_t message = {0};
     913        libtrace_t *trace = (libtrace_t *)data;
     914        int delay_usec = 1000000; // ! second hard coded !!
     915        uint64_t next_release;
     916        fprintf(stderr, "keepalive thread is starting\n");
     917        // TODO mark this thread as running against the libtrace object
     918        gettimeofday(&prev, NULL);
     919        message.code = MESSAGE_TICK;
     920        while (trace->state != STATE_FINSHED) {
     921                fd_set rfds;
     922                next_release = tv_to_usec(&prev) + delay_usec;
     923                gettimeofday(&next, NULL);
     924                if (next_release > tv_to_usec(&next)) {
     925                        next = usec_to_tv(next_release - tv_to_usec(&next));
     926                        // Wait for timeout or a message
     927                        FD_ZERO(&rfds);
     928                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
     929                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
     930                                libtrace_message_t msg;
     931                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
     932                                assert(msg.code == MESSAGE_DO_STOP);
     933                                goto done;
     934                        }
     935                }
     936                prev = usec_to_tv(next_release);
     937                if (trace->state == STATE_RUNNING) {
     938                        message.additional.uint64 = tv_to_usec(&prev);
     939                        trace_send_message_to_perpkts(trace, &message);
     940                }
     941        }
     942done:
     943        fprintf(stderr, "keepalive thread is finishing\n");
     944        trace->keepalive_thread.state = THREAD_FINISHED;
     945        return NULL;
     946}
    910947
    911948/**
     
    10741111                assert (libtrace->perpkts_pausing != 0);
    10751112                fprintf(stderr, "Restarting trace\n");
    1076                 int err;
     1113                UNUSED int err;
    10771114                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    10781115                        printf("This format has direct support for p's\n");
     
    10991136        libtrace->reducer = reducer;
    11001137        libtrace->perpkts_finishing = 0;
    1101         // libtrace->hasher = &rand_hash; /* Hasher now set via option */
    11021138
    11031139        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
     
    11971233                threads_started = trace_start_perpkt_threads(libtrace);
    11981234
     1235        libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
     1236        libtrace->keepalive_thread.state = THREAD_RUNNING;
     1237        libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
     1238        assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
    11991239
    12001240        // Revert back - Allow signals again
     
    12481288                libtrace_message_t message = {0};
    12491289                message.code = MESSAGE_DO_PAUSE;
    1250                 message.additional = NULL;
    12511290                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
    12521291                // Wait for it to pause
     
    12651304                        libtrace_message_t message = {0};
    12661305                        message.code = MESSAGE_DO_PAUSE;
    1267                         message.additional = NULL;
    12681306                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    12691307                        if(trace_has_dedicated_hasher(libtrace)) {
     
    13481386       
    13491387        message.code = MESSAGE_DO_STOP;
    1350         message.additional = NULL;
    13511388        trace_send_message_to_perpkts(libtrace, &message);
    13521389        if (trace_has_dedicated_hasher(libtrace))
     
    14051442                                        return 0;
    14061443                                case HASHER_BIDIRECTIONAL:
    1407                                         trace->hasher = &toeplitz_hash_packet;
     1444                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
    14081445                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
    14091446                                        toeplitz_init_config(trace->hasher_data, 1);
    14101447                                        return 0;
    14111448                                case HASHER_UNIDIRECTIONAL:
    1412                                         trace->hasher = &toeplitz_hash_packet;
     1449                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
    14131450                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
    14141451                                        toeplitz_init_config(trace->hasher_data, 0);
     
    14721509                // Cannot destroy vector yet, this happens with trace_destroy
    14731510        }
     1511        libtrace->state = STATE_FINSHED;
     1512        // Wait for the ticker thread
     1513       
     1514        if (libtrace->keepalive_thread.state != THREAD_FINISHED) {
     1515                libtrace_message_t msg = {0};
     1516                msg.code = MESSAGE_DO_STOP;
     1517                fprintf(stderr, "Waiting to join with the keepalive\n");
     1518                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
     1519                pthread_join(libtrace->keepalive_thread.tid, NULL);
     1520        }
     1521        fprintf(stderr, "Joined with with the keepalive\n");
     1522       
    14741523
    14751524        // Lets mark this as done for now
    14761525        libtrace->joined = true;
    1477 }
    1478 
    1479 // Don't use extra overhead = :( directly place in storage structure using
    1480 // post
    1481 DLLEXPORT libtrace_result_t *trace_create_result()
    1482 {
    1483         libtrace_result_t *result = malloc(sizeof(libtrace_result_t));
    1484         assert(result);
    1485         result->key = 0;
    1486         result->value = NULL;
    1487         // TODO automatically back with a free list!!
    1488         return result;
    14891526}
    14901527
     
    15171554        libtrace_message_t message = {0};
    15181555        message.code = MESSAGE_POST_REDUCE;
    1519         message.additional = NULL;
    15201556        message.sender = get_thread_descriptor(libtrace);
    15211557        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
     
    16101646
    16111647/**
    1612  * Note: This function grabs a lock and expects trace_update_inprogress_result
    1613  * to be called to release the lock.
    1614  *
    1615  * Expected to be used in trace-time situations to allow a result to be pending
    1616  * a publish that can be taken by the reducer before publish if it wants to
    1617  * publish a result. Such as publish a result every second but a queue hasn't
    1618  * processed a packet (or is overloaded) and hasn't published yet.
    1619  *
    1620  * Currently this only supports a single temporary result,
    1621  * as such if a key is different to the current temporary result the existing
    1622  * one will be published and NULL returned.
    1623  */
    1624 DLLEXPORT void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key)
    1625 {
    1626         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1627         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    1628 
    1629         assert (pthread_spin_lock(&t->tmp_spinlock) == 0);
    1630         if (t->tmp_key != key) {
    1631                 if (t->tmp_data) {
    1632                         //printf("publising data key=%"PRIu64"\n", t->tmp_key);
    1633                         trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
    1634                 }
    1635                 t->tmp_data = NULL;
    1636                 t->tmp_key = key;
    1637         }
    1638         return t->tmp_data;
    1639 }
    1640 
    1641 /**
    1642  * Updates a temporary result and releases the lock previously grabbed by trace_retrive_inprogress_result
    1643  */
    1644 DLLEXPORT void trace_update_inprogress_result(libtrace_t *libtrace, uint64_t key, void * value)
    1645 {
    1646         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1647         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    1648         if (t->tmp_key != key) {
    1649                 if (t->tmp_data) {
    1650                         printf("BAD publihsing data key=%"PRIu64"\n", t->tmp_key);
    1651                         trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
    1652                 }
    1653                 t->tmp_key = key;
    1654         }
    1655         t->tmp_data = value;
    1656         assert (pthread_spin_unlock(&t->tmp_spinlock) == 0);
    1657 }
    1658 
    1659 /**
    16601648 * Publish to the reduce queue, return
    16611649 */
     
    16671655        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    16681656        // Now put it into my table
    1669         static __thread int count = 0;
     1657        UNUSED static __thread int count = 0;
    16701658
    16711659
     
    17041692        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    17051693        // Now put it into my table
    1706         static __thread int count = 0;
     1694        UNUSED static __thread int count = 0;
    17071695
    17081696        res.is_packet = 1;
     
    17441732        else
    17451733                return 1;
    1746 }
    1747 
    1748 /* Retrieves all results with the key requested from the temporary result
    1749  * holding zone.
    1750  */
    1751 DLLEXPORT int trace_get_results_check_temp(libtrace_t *libtrace, libtrace_vector_t *results, uint64_t key)
    1752 {
    1753         int i;
    1754 
    1755         libtrace_vector_empty(results);
    1756         // Check all of the temp queues
    1757         for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    1758                 libtrace_result_t r = {0,0};
    1759                 assert (pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
    1760                 if (libtrace->perpkt_threads[i].tmp_key == key) {
    1761                         libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);
    1762                         libtrace->perpkt_threads[i].tmp_data = NULL;
    1763                         printf("Found in temp queue\n");
    1764                 }
    1765                 assert (pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
    1766                 if (libtrace_result_get_value(&r)) {
    1767                         // Got a result still in temporary
    1768                         printf("Publishing from temp queue\n");
    1769                         libtrace_vector_push_back(results, &r);
    1770                 } else {
    1771                         // This might be waiting on the actual queue
    1772                         libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    1773                         if (libtrace_deque_peek_front(v, (void *) &r) &&
    1774                                         libtrace_result_get_value(&r)) {
    1775                                 assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);
    1776                                 printf("Found in real queue\n");
    1777                                 libtrace_vector_push_back(results, &r);
    1778                         } // else no data (probably means no packets)
    1779                         else {
    1780                                 printf("Result missing in real queue\n");
    1781                         }
    1782                 }
    1783         }
    1784         //printf("Loop done yo, that means we've got #%d results to print fool!\n", libtrace_vector_get_size(results));
    1785         return libtrace_vector_get_size(results);
    17861734}
    17871735
     
    19081856DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
    19091857{
    1910         int ret = -1;
     1858        UNUSED int ret = -1;
    19111859        switch (option) {
    19121860                case TRACE_OPTION_SET_HASHER:
  • tools/tracertstats/tracertstats_parallel.c

    r17a3dff r82facc5  
    6565
    6666#define DEFAULT_OUTPUT_FMT "txt"
    67 #define TRACE_TIME 1
    6867
    6968struct libtrace_t *trace;
     
    190189} timestamp_sync_t;
    191190
    192 
    193 static int reduce_tracetime(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
    194 {
    195         int i,j;
    196         //uint64_t count=0, bytes=0;
    197         static uint64_t ts = 0;
    198         libtrace_vector_t results;
    199         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    200         trace_get_results_check_temp(trace, &results, *last_ts);
    201         //trace_get_results(trace, &results);
    202         //uint64_t packets;
    203        
    204         /* Get the results from each core and sum 'em up */
    205         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    206                 libtrace_result_t result;
    207                
    208                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    209                 ts = libtrace_result_get_key(&result);
    210                 if (*last_ts == 0)
    211                         *last_ts = ts;
    212                
    213                 result_t * res = libtrace_result_get_value(&result);
    214                 static result_t *  last_res = NULL;
    215                 if (res == last_res) {
    216                         printf("Hmm could be asserting but I'm not ;)\n");
    217                 }
    218                 //assert(res != last_res);
    219                 last_res = res;
    220                 //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
    221                 /*while (*last_ts < ts) {
    222                         report_results((double) *last_ts * (double) packet_interval, count, bytes);
    223                         count = 0;
    224                         bytes = 0;
    225                         for (j = 0; j < filter_count; j++)
    226                                 filters[j].count = filters[j].bytes = 0;
    227                         (*last_ts)++;
    228                 }*/
    229                
    230                 count += res->total.count;
    231                 bytes += res->total.bytes;
    232                 for (j = 0; j < filter_count; j++) {
    233                         filters[j].count += res->filters[j].count;
    234                         filters[j].bytes += res->filters[j].bytes;
    235                 }
    236                 free(res);
    237         }
    238         report_results((double) *last_ts * (double) packet_interval, count, bytes);
    239         count = 0;
    240         bytes = 0;
    241         for (j = 0; j < filter_count; j++)
    242                 filters[j].count = filters[j].bytes = 0;
    243         (*last_ts)++;
    244        
    245         // Done with these results - Free internally and externally
    246         libtrace_vector_destroy(&results);
    247        
    248         return 0;
    249 }
    250 
    251191static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    252192                                                libtrace_message_t *mesg,
     
    259199        // Unsure when we would hit this case but the old code had it, I
    260200        // guess we should keep it
    261         if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) == NULL) {
    262                
     201        if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
     202                //fprintf(stderr, "Got packet t=%x\n", t);
    263203                ts = trace_get_seconds(pkt) / packet_interval;
    264204                if (last_ts == 0)
    265205                        last_ts = ts;
    266                
     206
    267207                while (packet_interval != UINT64_MAX && last_ts<ts) {
    268208                        // Publish and make a new one new
     209                        fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    269210                        trace_publish_result(trace, (uint64_t) last_ts, results);
    270211                        trace_post_reduce(trace);
     
    296237                                break;
    297238                        case MESSAGE_STOPPED:
     239                                // Should we always post this?
    298240                                if (results->total.count) {
    299241                                        trace_publish_result(trace, (uint64_t) last_ts, results);
    300242                                        trace_post_reduce(trace);
     243                                        results = NULL;
    301244                                }
     245                                break;
     246                        case MESSAGE_TICK:
     247                        {
     248                                int64_t offset;
     249                                struct timeval *tv, tv_real;
     250                                libtrace_packet_t *first_packet = NULL;
     251                                retrive_first_packet(trace, &first_packet, &tv);
     252                                if (first_packet != NULL) {
     253                                        // So figure out our running offset
     254                                        tv_real = trace_get_timeval(first_packet);
     255                                        offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
     256                                        // Get time of day and do this stuff
     257                                        uint64_t next_update_time;
     258                                        next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
     259                                        if (next_update_time <= mesg->additional.uint64) {
     260                                                fprintf(stderr, "Got a tick and publishing early!!\n");
     261                                                trace_publish_result(trace, (uint64_t) last_ts, results);
     262                                                trace_post_reduce(trace);
     263                                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     264                                                last_ts++;
     265                                        } else {
     266                                                fprintf(stderr, "Got a tick but no publish ...\n");
     267                                        }
     268                                } else {
     269                                        fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
     270                                }
     271                        }
    302272                }
    303273        }
    304274        return pkt;
    305275}
    306 void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key);
    307 /**
    308  * A trace time version of map which will attempt to keep upto date
    309  * with the incoming data and detect cases where results are missing and
    310  * recover correctly.
    311  */
    312 static void* per_packet_tracetime(libtrace_t *trace, libtrace_packet_t *pkt,
    313                                                 libtrace_message_t *mesg,
    314                                                 libtrace_thread_t *t)
    315 {
    316         // Using first entry as total and those after for filter counts
    317         int i;
    318         static __thread uint64_t last_ts = 0, ts = 0;
    319         static __thread double debug_last = 0.0;
    320         static __thread result_t * tmp_result = NULL;
    321        
    322         if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
    323                 ts = trace_get_seconds(pkt) / packet_interval;
    324                
    325                 if (debug_last != 0.0 && debug_last > trace_get_seconds(pkt))
    326                         printf("packets out of order bitch :(\n");
    327                 debug_last = trace_get_seconds(pkt);
    328                 if (last_ts == 0)
    329                         last_ts = ts;
    330                
    331                 /*
    332                 while (packet_interval != UINT64_MAX && last_ts<ts) {
    333                         // Publish and make new
    334                         trace_publish_result(trace, (uint64_t) last_ts, results);
    335                         results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
    336                         memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    337                         last_ts++;
    338                 }*/
    339                
    340                 /* Calculate count for filters */
    341                 for(i=0;i<filter_count;++i) {
    342                         if(trace_apply_filter(filters[i].filter, pkt)) {
    343                                 tmp_result->filters[i].count = 1;
    344                                 tmp_result->filters[i].bytes = trace_get_wire_length(pkt);
    345                         } else {
    346                                 tmp_result->filters[i].count = 0;
    347                                 tmp_result->filters[i].bytes = 0;
    348                         }
    349                 }
    350                
    351                 /* Now Update the currently stored result */
    352                 result_t * results = (result_t *) trace_retrive_inprogress_result(trace, ts);
    353                
    354                 if (!results) {
    355                         results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
    356                         memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    357                 }
    358                 assert(results);
    359                 /* Now add to the current results */
    360                 results->total.count++;
    361                 results->total.bytes +=trace_get_wire_length(pkt);
    362                 /* Now add on filters */
    363                 for(i=0;i<filter_count;++i) {
    364                         results->filters[i].count += tmp_result->filters[i].count;
    365                         results->filters[i].bytes += tmp_result->filters[i].bytes;
    366                 }
    367                 /* Now release the lock and send it away place that back into the buffer */
    368                 trace_update_inprogress_result(trace, ts, (void *) results);
    369                 /*if (count >= packet_count) {
    370                         report_results(ts,count,bytes);
    371                         count=0;
    372                         bytes=0;
    373                 }*/ // Hmm what was happening here doesn't match up with any of the documentations!!!
    374         }
    375         if (mesg) {
    376                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    377                 switch (mesg->code) {
    378                         case MESSAGE_STARTED:
    379                                 tmp_result = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    380                                 break;
    381                         case MESSAGE_STOPPED:
    382                                 trace_retrive_inprogress_result(trace, 0);
    383                                 trace_update_inprogress_result(trace, 1, NULL);
    384                 }
    385         }
    386         // Done push the final results
    387         /*if (results->total.count)
    388                 trace_publish_result(trace, (uint64_t) last_ts, results);*/
    389        
    390         return pkt;
     276
     277static uint64_t bad_hash(const libtrace_packet_t * pkt, void *data) {
     278        return 0;
    391279}
    392280
     
    422310        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
    423311        trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &i);
     312        trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
    424313#if TRACE_TIME
    425314        if (trace_pstart(trace, NULL, &per_packet_tracetime, NULL)==-1) {
     
    434323        }
    435324
    436 #if TRACE_TIME
    437         // First we wait for a message telling us that a timestamp has been
    438         // published this allows us to approximately synchronize with the time
    439         libtrace_message_t message;
    440         int64_t offset;
    441         libtrace_packet_t *packet;
    442         struct timeval *tv, tv_real;
    443        
    444        
    445         do {
    446                 // TODO Put a timeout here also
    447                 libtrace_thread_get_message(trace, &message);
    448         } while (retrive_first_packet(trace, &packet, &tv) == 0);
    449         tv_real = trace_get_timeval(packet);
    450         offset = tv_to_usec(&tv_real) - tv_to_usec(tv);
    451         last_ts = trace_get_seconds(packet) / packet_interval;
    452         printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
    453         /*
    454         while (!got_first) {
    455                 // Wait for a message indicating we've got our 'first' packet, note not a 100% guarantee its our first but pretty likely
    456                
    457                
    458                
    459                 assert(pthread_mutex_lock(&lock_more) == 0);
    460                
    461                 for (i=0; i < 2; ++i) {
    462                         if (initial_stamps[i].difference_usecs) { // Hmm certainly this cannot possibly lineup 100%??
    463                                 got_first=1;
    464                                 last_ts = initial_stamps[i].first_interval_number;
    465                                 offset = initial_stamps[i].difference_usecs;
    466                                 printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
    467                         }
    468                 }
    469                 assert(pthread_mutex_unlock(&lock_more) == 0);
    470         }*/
    471         while (!trace_finished(trace)) {
    472                 struct timeval tv;
    473                 // Now try our best to read that one out
    474                
    475                 // Read messages
    476                 //libtrace_thread_get_message(trace, &message);
    477                
    478                 // We just release and do work currently, maybe if something
    479                 // interesting comes through we'd deal with that
    480                 //libtrace_thread_get_message(trace, &message);
    481                
    482                 //while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
    483                
    484                 /* Now wait for a second after we should see the results */
    485                 uint64_t next_update_time, t_usec;
    486                 next_update_time = (last_ts*packet_interval + packet_interval + 1) * 1000000 + offset;
    487                 gettimeofday(&tv, NULL);
    488                 t_usec = tv.tv_sec;
    489                 t_usec *= 1000000;
    490                 t_usec += tv.tv_usec;
    491                
    492                 //printf("Current time=%"PRIu64" Next result ready=%"PRIu64" =%f\n", t_usec, next_update_time, ((double) next_update_time - (double) t_usec) / 1000000.0);
    493                 if (next_update_time > t_usec) {
    494                         tv.tv_sec = (next_update_time - t_usec) / 1000000;
    495                         tv.tv_usec = (next_update_time - t_usec) % 1000000;
    496                         select(0, NULL, NULL, NULL, &tv);
    497                 }
    498                 reduce_tracetime(trace, NULL, &last_ts);
    499         }
    500 #else
     325
    501326        // reduce
    502327        while (!trace_finished(trace)) {
     
    511336                reduce(trace, NULL, &last_ts);
    512337        }
    513 #endif
    514338
    515339        // Wait for all threads to stop
Note: See TracChangeset for help on using the changeset viewer.