Ignore:
Timestamp:
03/12/15 17:14:42 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
a978dec
Parents:
b54e2da
Message:

More documentation, including some renaming and modifications to behaviour

  • Removes accessor functions for libtrace_result_t, instead directly access the structure
  • Documentation for most functions
  • Split tick into interval and count messages for the two modes of operation
  • Normalise interval and packet order to use the erf timestamp format
  • Rename trace_send_message_to_XXX to trace trace_message_XXX
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    rb54e2da r6a6e6a8  
    163163}
    164164
    165 /**
     165/*
    166166 * This can be used once the hasher thread has been started and internally after
    167167 * verfiy_configuration.
    168  *
    169  * @return true if the trace has dedicated hasher thread otherwise false.
    170  */
    171 inline bool trace_has_dedicated_hasher(libtrace_t * libtrace)
     168 */
     169DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
    172170{
    173171        return libtrace->hasher_thread.type == THREAD_HASHER;
    174172}
    175173
    176 /**
    177  * True if the trace has dedicated hasher thread otherwise false,
    178  * to be used after the trace is running
    179  */
    180 static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
     174DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
    181175{
    182176        assert(libtrace->state != STATE_NEW);
     
    194188DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
    195189        return t->perpkt_thread_count;
     190}
     191
     192/**
     193 * Changes the overall traces state and signals the condition.
     194 *
     195 * @param trace A pointer to the trace
     196 * @param new_state The new state of the trace
     197 * @param need_lock Set to true if libtrace_lock is not held, otherwise
     198 *        false in the case the lock is currently held by this thread.
     199 */
     200static inline void libtrace_change_state(libtrace_t *trace,
     201        const enum trace_state new_state, const bool need_lock)
     202{
     203        UNUSED enum trace_state prev_state;
     204        if (need_lock)
     205                pthread_mutex_lock(&trace->libtrace_lock);
     206        prev_state = trace->state;
     207        trace->state = new_state;
     208
     209        if (trace->config.debug_state)
     210                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
     211                        trace->uridata, get_trace_state_name(prev_state),
     212                        get_trace_state_name(trace->state));
     213
     214        pthread_cond_broadcast(&trace->perpkt_cond);
     215        if (need_lock)
     216                pthread_mutex_unlock(&trace->libtrace_lock);
    196217}
    197218
     
    225246                        (int) t->tid, prev_state, t->state);
    226247
    227         pthread_cond_broadcast(&trace->perpkt_cond);
    228         if (need_lock)
    229                 pthread_mutex_unlock(&trace->libtrace_lock);
    230 }
    231 
    232 /**
    233  * Changes the overall traces state and signals the condition.
    234  *
    235  * @param trace A pointer to the trace
    236  * @param new_state The new state of the trace
    237  * @param need_lock Set to true if libtrace_lock is not held, otherwise
    238  *        false in the case the lock is currently held by this thread.
    239  */
    240 static inline void libtrace_change_state(libtrace_t *trace,
    241         const enum trace_state new_state, const bool need_lock)
    242 {
    243         UNUSED enum trace_state prev_state;
    244         if (need_lock)
    245                 pthread_mutex_lock(&trace->libtrace_lock);
    246         prev_state = trace->state;
    247         trace->state = new_state;
    248 
    249         if (trace->config.debug_state)
    250                 fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
    251                         trace->uridata, get_trace_state_name(prev_state),
    252                         get_trace_state_name(trace->state));
     248        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
     249                libtrace_change_state(trace, STATE_FINSHED, false);
    253250
    254251        pthread_cond_broadcast(&trace->perpkt_cond);
     
    324321}
    325322
    326 /** Makes a packet safe, a packet may become invaild after a
    327  * pause (or stop/destroy) of a trace. This copies a packet
    328  * in such a way that it will be able to survive a pause.
    329  *
    330  * However this will not allow the packet to be used after
    331  * the format is destroyed. Or while the trace is still paused.
    332  */
    333323DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
    334324        // Duplicate the packet in standard malloc'd memory and free the
     
    399389                assert((*packet)->error == READ_TICK);
    400390                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
    401                 (*trace->per_pkt)(trace, t, MESSAGE_TICK, data, t);
     391                (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);
    402392        }
    403393        return 0;
     
    567557                                              packets, nb_packets, &empty, &offset);
    568558                                        if (ret == READ_EOF) {
    569                                                 fprintf(stderr, "PAUSE stop eof!!\n");
    570559                                                goto eof;
    571560                                        } else if (ret == READ_ERROR) {
    572                                                 fprintf(stderr, "PAUSE stop error!!\n");
    573561                                                goto error;
    574562                                        }
     
    579567                                        goto eof;
    580568                        }
    581                         (*trace->per_pkt)(trace, t, message.code, message.additional, message.sender);
     569                        (*trace->per_pkt)(trace, t, message.code, message.data, message.sender);
    582570                        /* Continue and the empty messages out before packets */
    583571                        continue;
     
    619607                        switch (nb_packets) {
    620608                        case READ_EOF:
    621                                 fprintf(stderr, "EOF stop %d!!\n", nb_packets);
    622609                                goto eof;
    623610                        case READ_ERROR:
    624                                 fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
    625611                                goto error;
    626612                        case READ_MESSAGE:
     
    636622
    637623error:
    638         fprintf(stderr, "An error occured in trace\n");
    639624        message.code = MESSAGE_DO_STOP;
    640625        message.sender = t;
    641         message.additional.uint64 = 0;
    642         trace_send_message_to_perpkts(trace, &message);
     626        message.data.uint64 = 0;
     627        trace_message_perpkts(trace, &message);
    643628eof:
    644         fprintf(stderr, "An eof occured in trace\n");
    645629        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
    646630
     
    659643        thread_change_state(trace, t, THREAD_FINISHED, true);
    660644
    661         // Notify only after we've defiantly set the state to finished
    662         message.code = MESSAGE_PERPKT_ENDED;
    663         message.additional.uint64 = 0;
    664         trace_send_message_to_reporter(trace, &message);
     645        /* Make sure the reporter sees we have finished */
     646        if (trace_has_reporter(trace))
     647                trace_post_reporter(trace);
    665648
    666649        // Release all ocache memory before unregistering with the format
     
    703686        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    704687
     688        /* We are reading but it is not the parallel API */
    705689        if (trace->format->pregister_thread) {
    706690                trace->format->pregister_thread(trace, t, true);
     
    789773                        // Unlock early otherwise we could deadlock
    790774                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
    791                         ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    792775                } else {
    793776                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
    794                         ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    795                 }
     777                }
     778                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    796779        }
    797780
     
    799782        thread_change_state(trace, t, THREAD_FINISHED, true);
    800783
    801         // Notify only after we've defiantly set the state to finished
    802         message.code = MESSAGE_PERPKT_ENDED;
    803         message.additional.uint64 = 0;
    804         trace_send_message_to_reporter(trace, &message);
    805784        libtrace_ocache_unregister_thread(&trace->packet_freelist);
    806785        if (trace->format->punregister_thread) {
     
    811790        // TODO remove from TTABLE t sometime
    812791        pthread_exit(NULL);
    813 };
    814 
    815 /**
    816  * Moves src into dest(Complete copy) and copies the memory buffer and
    817  * its flags from dest into src ready for reuse without needing extra mallocs.
    818  */
    819 static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
    820         // Save the passed in buffer status
    821         assert(dest->trace == NULL); // Must be a empty packet
    822         void * temp_buf = dest->buffer;
    823         buf_control_t temp_buf_control = dest->buf_control;
    824         // Completely copy StoredPacket into packet
    825         memcpy(dest, src, sizeof(libtrace_packet_t));
    826         // Set the buffer settings on the returned packet
    827         src->buffer = temp_buf;
    828         src->buf_control = temp_buf_control;
    829         src->trace = NULL;
    830792}
    831793
     
    943905{
    944906        if (!t->recorded_first) {
     907                libtrace_message_t mesg = {0};
    945908                struct timeval tv;
    946909                libtrace_packet_t * dup;
    947                 // For what it's worth we can call these outside of the lock
     910
     911                /* We mark system time against a copy of the packet */
    948912                gettimeofday(&tv, NULL);
    949913                dup = trace_copy_packet(packet);
     914
    950915                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    951916                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
    952                 //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
    953917                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
    954                 // Now update the first
    955918                libtrace->first_packets.count++;
     919
     920                /* Now update the first */
    956921                if (libtrace->first_packets.count == 1) {
    957                         // We the first entry hence also the first known packet
     922                        /* We the first entry hence also the first known packet */
    958923                        libtrace->first_packets.first = t->perpkt_num;
    959924                } else {
    960                         // Check if we are newer than the previous 'first' packet
     925                        /* Check if we are newer than the previous 'first' packet */
    961926                        size_t first = libtrace->first_packets.first;
    962927                        if (trace_get_seconds(dup) <
     
    965930                }
    966931                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
    967                 libtrace_message_t mesg = {0};
     932
    968933                mesg.code = MESSAGE_FIRST_PACKET;
    969                 trace_send_message_to_reporter(libtrace, &mesg);
     934                trace_message_reporter(libtrace, &mesg);
     935                trace_message_perpkts(libtrace, &mesg);
    970936                t->recorded_first = true;
    971937        }
    972938}
    973939
    974 /**
    975  * Returns 1 if it's certain that the first packet is truly the first packet
    976  * rather than a best guess based upon threads that have published so far.
    977  * Otherwise 0 is returned.
    978  * It's recommended that this result is stored rather than calling this
    979  * function again.
    980  */
    981 DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
     940DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
     941                                     libtrace_thread_t *t,
     942                                     libtrace_packet_t **packet,
     943                                     struct timeval **tv)
    982944{
     945        void * tmp;
    983946        int ret = 0;
     947
     948        if (t) {
     949                if (t->type != THREAD_PERPKT || t->trace != libtrace)
     950                        return -1;
     951        }
     952
     953        /* Throw away these which we don't use */
     954        if (!packet)
     955                packet = (libtrace_packet_t **) &tmp;
     956        if (!tv)
     957                tv = (struct timeval **) &tmp;
     958
    984959        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    985         if (libtrace->first_packets.count) {
     960        if (t) {
     961                /* Get the requested thread */
     962                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
     963                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
     964        } else if (libtrace->first_packets.count) {
     965                /* Get the first packet across all threads */
    986966                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
    987967                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
     
    10441024        (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
    10451025
    1046         while (!trace_finished(trace)) {
     1026        while (!trace_has_finished(trace)) {
    10471027                if (trace->config.reporter_polling) {
    10481028                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
     
    10641044                                break;
    10651045                default:
    1066                         (*trace->reporter)(trace, message.code, message.additional, message.sender);
     1046                        (*trace->reporter)(trace, message.code, message.data, message.sender);
    10671047                }
    10681048        }
     
    10861066        libtrace_t *trace = (libtrace_t *)data;
    10871067        uint64_t next_release;
     1068        libtrace_thread_t *t = &trace->keepalive_thread;
     1069
    10881070        fprintf(stderr, "keepalive thread is starting\n");
    10891071
     
    10911073        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    10921074        if (trace->state == STATE_ERROR) {
    1093                 thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false);
     1075                thread_change_state(trace, t, THREAD_FINISHED, false);
    10941076                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    10951077                pthread_exit(NULL);
     
    10981080
    10991081        gettimeofday(&prev, NULL);
    1100         message.code = MESSAGE_TICK;
     1082        message.code = MESSAGE_TICK_INTERVAL;
     1083
    11011084        while (trace->state != STATE_FINSHED) {
    11021085                fd_set rfds;
     
    11071090                        // Wait for timeout or a message
    11081091                        FD_ZERO(&rfds);
    1109                         FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
    1110                         if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
     1092                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
     1093                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
    11111094                                libtrace_message_t msg;
    1112                                 libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
     1095                                libtrace_message_queue_get(&t->messages, &msg);
    11131096                                assert(msg.code == MESSAGE_DO_STOP);
    11141097                                goto done;
     
    11171100                prev = usec_to_tv(next_release);
    11181101                if (trace->state == STATE_RUNNING) {
    1119                         message.additional.uint64 = tv_to_usec(&prev);
    1120                         trace_send_message_to_perpkts(trace, &message);
     1102                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
     1103                                               (((uint64_t)prev.tv_usec << 32)/1000000));
     1104                        trace_message_perpkts(trace, &message);
    11211105                }
    11221106        }
    11231107done:
    11241108
    1125         thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
     1109        thread_change_state(trace, t, THREAD_FINISHED, true);
    11261110        return NULL;
    11271111}
     
    11461130                struct timeval *sys_tv;
    11471131                int64_t initial_offset;
    1148                 int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
     1132                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
    11491133                assert(first_pkt);
    11501134                pkt_tv = trace_get_timeval(first_pkt);
     
    16451629        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
    16461630        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
    1647                                                  sizeof(struct  __packet_storage_magic_type));
     1631                                                 sizeof(*libtrace->first_packets.packets));
    16481632        if (libtrace->first_packets.packets == NULL) {
    16491633                trace_set_err(libtrace, errno, "trace_pstart "
     
    17231707}
    17241708
    1725 /**
     1709/*
    17261710 * Pauses a trace, this should only be called by the main thread
    17271711 * 1. Set started = false
     
    17581742                libtrace_message_t message = {0};
    17591743                message.code = MESSAGE_DO_PAUSE;
    1760                 trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
     1744                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
    17611745                // Wait for it to pause
    17621746                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     
    17761760                        libtrace_message_t message = {0};
    17771761                        message.code = MESSAGE_DO_PAUSE;
    1778                         trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
     1762                        trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    17791763                        if(trace_has_dedicated_hasher(libtrace)) {
    17801764                                // The hasher has stopped and other threads have messages waiting therefore
     
    18091793
    18101794        // Deal with the reporter
    1811         if (trace_has_dedicated_reporter(libtrace)) {
     1795        if (trace_has_reporter(libtrace)) {
    18121796                if (libtrace->config.debug_state)
    18131797                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
    18141798                libtrace_message_t message = {0};
    18151799                message.code = MESSAGE_DO_PAUSE;
    1816                 trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
     1800                trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
    18171801                // Wait for it to pause
    18181802                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     
    18731857
    18741858        message.code = MESSAGE_DO_STOP;
    1875         trace_send_message_to_perpkts(libtrace, &message);
     1859        trace_message_perpkts(libtrace, &message);
    18761860        if (trace_has_dedicated_hasher(libtrace))
    1877                 trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
     1861                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
    18781862
    18791863        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    1880                 trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
     1864                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    18811865        }
    18821866
     
    19861970                // Cannot destroy vector yet, this happens with trace_destroy
    19871971        }
    1988         // TODO consider perpkt threads marking trace as finished before join is called
    1989         libtrace_change_state(libtrace, STATE_FINSHED, true);
    1990 
    1991         if (trace_has_dedicated_reporter(libtrace)) {
     1972
     1973        if (trace_has_reporter(libtrace)) {
    19921974                pthread_join(libtrace->reporter_thread.tid, NULL);
    19931975                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
     
    19981980                libtrace_message_t msg = {0};
    19991981                msg.code = MESSAGE_DO_STOP;
    2000                 trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
     1982                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
    20011983                pthread_join(libtrace->keepalive_thread.tid, NULL);
    20021984        }
     
    20061988}
    20071989
    2008 DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
     1990DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
     1991                                                libtrace_thread_t *t)
    20091992{
    2010         libtrace_thread_t * t = get_thread_descriptor(libtrace);
    2011         assert(t);
    2012         return libtrace_message_queue_count(&t->messages);
    2013 }
    2014 
    2015 DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
     1993        int ret;
     1994        if (t == NULL)
     1995                t = get_thread_descriptor(libtrace);
     1996        if (t == NULL)
     1997                return -1;
     1998        ret = libtrace_message_queue_count(&t->messages);
     1999        return ret < 0 ? 0 : ret;
     2000}
     2001
     2002DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
     2003                                          libtrace_thread_t *t,
     2004                                          libtrace_message_t * message)
    20162005{
    2017         libtrace_thread_t * t = get_thread_descriptor(libtrace);
    2018         assert(t);
    2019         return libtrace_message_queue_get(&t->messages, message);
    2020 }
    2021 
    2022 DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
     2006        int ret;
     2007        if (t == NULL)
     2008                t = get_thread_descriptor(libtrace);
     2009        if (t == NULL)
     2010                return -1;
     2011        ret = libtrace_message_queue_get(&t->messages, message);
     2012        return ret < 0 ? 0 : ret;
     2013}
     2014
     2015DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
     2016                                              libtrace_thread_t *t,
     2017                                              libtrace_message_t * message)
    20232018{
    2024         libtrace_thread_t * t = get_thread_descriptor(libtrace);
    2025         assert(t);
    2026         return libtrace_message_queue_try_get(&t->messages, message);
    2027 }
    2028 
    2029 /**
    2030  * Return backlog indicator
    2031  */
     2019        if (t == NULL)
     2020                t = get_thread_descriptor(libtrace);
     2021        if (t == NULL)
     2022                return -1;
     2023        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
     2024                return 0;
     2025        else
     2026                return -1;
     2027}
     2028
     2029DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
     2030{
     2031        int ret;
     2032        if (!message->sender)
     2033                message->sender = get_thread_descriptor(libtrace);
     2034
     2035        ret = libtrace_message_queue_put(&t->messages, message);
     2036        return ret < 0 ? 0 : ret;
     2037}
     2038
     2039DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
     2040{
     2041        if (!trace_has_reporter(libtrace) ||
     2042            !(libtrace->reporter_thread.state == THREAD_RUNNING
     2043              || libtrace->reporter_thread.state == THREAD_PAUSED))
     2044                return -1;
     2045
     2046        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
     2047}
     2048
    20322049DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
    20332050{
    20342051        libtrace_message_t message = {0};
    20352052        message.code = MESSAGE_POST_REPORTER;
    2036         message.sender = get_thread_descriptor(libtrace);
    2037         return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
    2038 }
    2039 
    2040 /**
    2041  * Return backlog indicator
    2042  */
    2043 DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
    2044 {
    2045         //printf("Sending message code=%d to reporter\n", message->code);
    2046         message->sender = get_thread_descriptor(libtrace);
    2047         return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
    2048 }
    2049 
    2050 /**
    2051  *
    2052  */
    2053 DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
    2054 {
    2055         //printf("Sending message code=%d to reporter\n", message->code);
    2056         message->sender = get_thread_descriptor(libtrace);
    2057         return libtrace_message_queue_put(&t->messages, message);
    2058 }
    2059 
    2060 DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
     2053        return trace_message_reporter(libtrace, (void *) &message);
     2054}
     2055
     2056DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
    20612057{
    20622058        int i;
    2063         message->sender = get_thread_descriptor(libtrace);
     2059        int missed;
     2060        if (message->sender == NULL)
     2061                message->sender = get_thread_descriptor(libtrace);
    20642062        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    2065                 libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
    2066         }
    2067         //printf("Sending message code=%d to reporter\n", message->code);
    2068         return 0;
    2069 }
    2070 
    2071 DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
    2072         result->key = key;
    2073 }
    2074 DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
    2075         return result->key;
    2076 }
    2077 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_t value) {
    2078         result->value = value;
    2079 }
    2080 DLLEXPORT libtrace_generic_t libtrace_result_get_value(libtrace_result_t * result) {
    2081         return result->value;
    2082 }
    2083 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_t value) {
    2084         result->key = key;
    2085         result->value = value;
    2086 }
    2087 DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
    2088         free(*result);
    2089         result = NULL;
    2090         // TODO automatically back with a free list!!
     2063                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
     2064                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
     2065                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
     2066                } else {
     2067                        missed += 1;
     2068                }
     2069        }
     2070        return -missed;
    20912071}
    20922072
     
    21582138}
    21592139
    2160 DLLEXPORT int trace_finished(libtrace_t * libtrace) {
    2161         // TODO I don't like using this so much, we could use state!!!
    2162         return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
     2140DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
     2141        return libtrace->state == STATE_FINSHED || libtrace->state == STATE_JOINED;
    21632142}
    21642143
     
    22692248}
    22702249
    2271 DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
    2272         libtrace_packet_t* result;
    2273         libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
    2274         assert(result);
    2275         swap_packets(result, packet); // Move the current packet into our copy
    2276         return result;
    2277 }
    2278 
    2279 DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    2280         // Try write back the packet
     2250DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    22812251        assert(packet);
    2282         // Always release any resources this might be holding such as a slot in a ringbuffer
     2252        /* Always release any resources this might be holding */
    22832253        trace_fin_packet(packet);
    22842254        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
Note: See TracChangeset for help on using the changeset viewer.