Changeset 6a6e6a8


Ignore:
Timestamp:
03/12/15 17:14:42 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
a978dec
Parents:
b54e2da
Message:

More documentation, including some renaming and modifications to behaviour

  • Removes accessor functions for libtrace_result_t, instead directly access the structure
  • Documentation for most functions
  • Split tick into interval and count messages for the two modes of operation
  • Normalise interval and packet order to use the erf timestamp format
  • Rename trace_send_message_to_XXX to trace trace_message_XXX
Files:
16 edited

Legend:

Unmodified
Added
Removed
  • lib/combiner_ordered.c

    r62b3c4e r6a6e6a8  
    4545                        live_count++;
    4646                        live[i] = true;
    47                         key[i] = libtrace_result_get_key(&r);
     47                        key[i] = r.key;
    4848                        if (i==0 || min_key > key[i]) {
    4949                                min_key = key[i];
     
    7272                {
    7373                        libtrace_deque_peek_front(&queues[min_queue], (void *) &r);
    74                         key[min_queue] = libtrace_result_get_key(&r);
     74                        key[min_queue] = r.key;
    7575                        if (key[min_queue] <= min_key) {
    7676                                // We are still the smallest, might be out of order though :(
     
    131131
    132132DLLEXPORT const libtrace_combine_t combiner_ordered = {
    133     init_combiner,      /* initialise */
     133        init_combiner,  /* initialise */
    134134        destroy,                /* destroy */
    135135        publish,                /* publish */
    136     read,                       /* read */
    137     read_final,         /* read_final */
    138     pause,                      /* pause */
    139     NULL,                       /* queues */
    140     {0}                         /* opts */
     136        read,                   /* read */
     137        read_final,             /* read_final */
     138        pause,                  /* pause */
     139        NULL,                   /* queues */
     140        {0}                             /* opts */
    141141};
  • lib/combiner_sorted.c

    r62b3c4e r6a6e6a8  
    2828static int compare_result(const void* p1, const void* p2)
    2929{
    30         if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
     30        const libtrace_result_t * r1 = p1;
     31        const libtrace_result_t * r2 = p2;
     32        if (r1->key < r2->key)
    3133                return -1;
    32         if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
     34        if (r1->key == r2->key)
    3335                return 0;
    3436        else
  • lib/data-struct/object_cache.c

    r04bf7c5 r6a6e6a8  
    230230        pthread_spin_unlock(&oc->spin);
    231231
    232         // Make sure we haven't lost too many packets
    233232        if (oc->current_allocations)
    234                 fprintf(stderr, "!!OCache closing lost, %d packets!!\n", (int) oc->current_allocations);
    235         else
    236                 /* This is clearly a bug, but I don't know what to replace it with... */
    237                 fprintf(stderr, "!!OCache closing lost, %d packets!!\n", (int) oc->current_allocations);
     233                fprintf(stderr, "OCache destroyed, leaking %d packets!!\n", (int) oc->current_allocations);
     234
    238235        libtrace_ringbuffer_destroy(&oc->rb);
    239236        pthread_spin_destroy(&oc->spin);
  • lib/format_dpdk.c

    r4ce6fca r6a6e6a8  
    20232023
    20242024        plc->ts_last_sys = cur_sys_time_ns;
    2025 
    20262025        return;
    20272026}
  • lib/libtrace_int.h

    r6a082f8 r6a6e6a8  
    225225        size_t count; // If == perpkt_thread_count threads we have all
    226226        size_t first; // Valid if count != 0
    227         struct __packet_storage_magic_type {
     227        struct {
    228228                libtrace_packet_t * packet;
    229229                struct timeval tv;
  • lib/libtrace_parallel.h

    r6a082f8 r6a6e6a8  
    4949
    5050typedef struct libtrace_result_t libtrace_result_t;
     51
    5152/**
    5253 * A collection of types for convenience used in place of a
     
    100101
    101102typedef struct libtrace_message_t {
    102         int code;
    103         libtrace_generic_t additional;
    104         libtrace_thread_t *sender;
     103        int code; /**< The message code see enum libtrace_messages */
     104        libtrace_generic_t data; /**< Additional data related to the message */
     105        libtrace_thread_t *sender; /**< The thread that sent the message */
    105106} libtrace_message_t;
    106107
     
    139140} trace_parallel_option_t;
    140141
     142/** The libtrace_messages enum
     143 * All libtrace messages are defined and documented here.
     144 *
     145 * Some messages can be sent to control the library while others
     146 * are received by the per-packet and reporter functions to inform the libtrace
     147 * application.
     148 *
     149 * If a user wishes to send there own custom messages they should use
     150 * numbers greater than MESSAGE_USER (1000).
     151 *
     152 * @note Some messages are for internal use only
     153 */
    141154enum libtrace_messages {
     155        /** A libtrace packet is ready, this will only be sent to per
     156         * packet threads.
     157         * @param data Holds the packet in data.pkt. The packet belongs to
     158         * libtrace and should either be returned from the per-packet function
     159         * if no longer needed or free'd at some later time using the XXX
     160         * function.
     161         * @param sender The sender will be set as the current thread
     162         */
    142163        MESSAGE_PACKET,
     164        /** A libtrace result is ready, this will only be sent to the reporter
     165         * thread.
     166         * @param data Holds the result in data.res. The memory holding the
     167         * result is allocated by libtrace and should not be free'd. However
     168         * note that any data stored within the result might need to be free'd.
     169         * @param sender The sender will be set as the current thread
     170         */
    143171        MESSAGE_RESULT,
     172
     173        /** A message sent to each thread when it starts. This is sent
     174         * to both the reporter and per-packet threads. This will be sent once
     175         * after trace_pstart() (assuming no errors occurs).
     176         *
     177         * This can be used to allocate resources required by each thread.
     178         *
     179         * These can be free'd when MESSAGE_STOPPING is received.
     180         *
     181         * @param data unused, do not use this
     182         * @param sender The sender will be set as the current thread
     183         */
    144184        MESSAGE_STARTING,
     185
     186        /** A message sent to each thread when it stops. This is sent
     187         * to both the reporter and per-packet threads. This will be sent once
     188         * after MESSAGE_STARTING.
     189         *
     190         * This can be used to free any resources allocated with
     191         * MESSAGE_STARTING.
     192         *
     193         * @param data unused, do not use this
     194         * @param sender The sender will be set as the current thread
     195         */
     196        MESSAGE_STOPPING,
     197
     198        /** A message sent to each thread when a thread transitions between a
     199         * paused (or unstarted) state to running state. This is sent
     200         * to both the reporter and per-packet threads. This will be sent after
     201         * MESSAGE_STARTING when a trace is first started and when a trace
     202         * is started (trace_pstart()) after a pause (trace_ppause()).
     203         *
     204         * This can be used to allocate resources.
     205         *
     206         * @param data unused, do not use this
     207         * @param sender The sender will be set as the current thread
     208         */
    145209        MESSAGE_RESUMING,
    146         MESSAGE_STOPPING,
     210
     211        /** A message sent to each thread when a thread transitions between a
     212         * paused (or unstarted) state to running state. This is sent
     213         * to both the reporter and per-packet threads. This will be sent after
     214         * MESSAGE_STARTING when a trace is first started and when a trace
     215         * is started (trace_pstart()) after a pause (trace_ppause()).
     216         *
     217         * This can be used to allocate resources.
     218         *
     219         * @param data unused, do not use this
     220         * @param sender The sender will be set as the current thread
     221         */
    147222        MESSAGE_PAUSING,
     223
     224        /** An internal message do not use this */
    148225        MESSAGE_DO_PAUSE,
     226        /** An internal message do not use this */
    149227        MESSAGE_DO_STOP,
     228
     229        /** Sent to all per-packet threads (including the sender) and the
     230         * reducer when the first packet is seen for a thread.
     231         *
     232         * @param data The first packet is stored in data.pkt. This packet is
     233         * shared by all threads receiving the message and is valid until
     234         * MESSAGE_PAUSING is received.
     235         * @param sender The per-packet thread which received the packet
     236         *
     237         * @note Upon pausing and restarting a trace this will be reset and
     238         * sent once a new packet is encountered
     239         *
     240         * @
     241         */
    150242        MESSAGE_FIRST_PACKET,
    151         MESSAGE_PERPKT_ENDED,
    152         MESSAGE_PERPKT_RESUMED,
    153         MESSAGE_PERPKT_PAUSED,
    154         MESSAGE_PERPKT_EOF,
     243
     244        /** Notify the reporter thread more data is available.
     245         *
     246         * Triggers the reporter to read as many results as possible.
     247         *
     248         * @param data unused
     249         * @param sender the sending
     250         *
     251         * @note This message should not be sent directly instead call
     252         * trace_post_reporter()
     253         *
     254         * @see trace_get_first_packet()
     255         */
    155256        MESSAGE_POST_REPORTER,
    156         MESSAGE_POST_RANGE,
    157         MESSAGE_TICK,
     257
     258        /** Sent to per-packet threads perodically after the configured time
     259         * interval has passed.
     260         *
     261         * This is sent out-of-band with respect to packets and as a result
     262         * can appear after a packet with an later timestamp, or before one
     263         * with an earlier timestamp.
     264         *
     265         * @param data data.uint64_t holds the system timestamp in the
     266         * erf format
     267         * @param sender should be ignored
     268         */
     269        MESSAGE_TICK_INTERVAL,
     270
     271        /** Sent to per-packet threads once the configured number of packets
     272         * are read from a trace.
     273         *
     274         * This are sent in-band with respect to packets such that all
     275         * threads will see it between the same packets.
     276         *
     277         * @param data The number of packets seen so far across all threads
     278         * @param sender Set to the current per-packet thread
     279         */
     280        MESSAGE_TICK_COUNT,
     281
     282        /** For specific user defined messages use codes above MESSAGE_USER. */
    158283        MESSAGE_USER = 1000
    159284};
     
    404529 * information dependent upon the mesg_code.
    405530 * @param sender The thread from which the message originated.
     531 * @return If the message type is MESSAGE_PACKET a packet can be returned back
     532 * to the library similar to trace_free_packet() otherwise this should be NULL.
    406533 *
    407534 * The values of data and sender depend upon the mesg_code. Please see the
     
    451578 * @param reporter A user supplied function called when a result is ready.
    452579 * Optional if NULL the reporter thread will not be started.
    453  * @returns 0 on success, otherwise -1 to indicate an error has occured
     580 * @return 0 on success, otherwise -1 to indicate an error has occured
    454581 *
    455582 * This can also be used to restart an existing parallel trace,
     
    465592 *
    466593 * @param libtrace The parallel trace to be paused
    467  * @returns 0 on success, otherwise -1 to indicate an error has occured
     594 * @return 0 on success, otherwise -1 to indicate an error has occured
    468595 *
    469596 */
     
    475602 *
    476603 * @param libtrace The parallel trace to be stopped
    477  * @returns 0 on success, otherwise -1 to indicate an error has occured
     604 * @return 0 on success, otherwise -1 to indicate an error has occured
    478605 *
    479606 * This should only be called by the main thread.
     
    491618 */
    492619DLLEXPORT void trace_join(libtrace_t * trace);
    493 
    494620
    495621/**
     
    555681DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);
    556682
    557 #define RESULT_NORMAL 0
    558 #define RESULT_PACKET 1
    559 #define RESULT_TICK   2
    560 
    561 DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
    562 DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
    563 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_t value);
    564 DLLEXPORT libtrace_generic_t libtrace_result_get_value(libtrace_result_t * result);
    565 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_t value);
    566 DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
    567 
    568 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type);
    569 
     683/** Types of results.
     684 * Some result types require special handling by combiners
     685 * as such making use of built-in types is important.
     686 *
     687 * User specific results should be defined as values greater than RESULT_USER(1000)
     688 *
     689 */
     690enum result_types {
     691        /**
     692         * The result is a packet in some circumstances special handling needs
     693         * to be performed. As such packets should always be published as so.
     694         *
     695         * @param key (Typically) The packets order, see trace_packet_get_order()
     696         * @param
     697         */
     698        RESULT_PACKET,
     699
     700        /** The result is a tick message
     701         *
     702         * @param key The erf timestamp of the tick
     703         */
     704        RESULT_TICK_INTERVAL,
     705
     706        /** The result is a tick message
     707         *
     708         * @param key The sequence number of the tick message
     709         */
     710        RESULT_TICK_COUNT,
     711
     712        /** Any user specific codes should be above this.
     713         *
     714         */
     715        RESULT_USER = 1000
     716
     717};
     718
     719/** Publish a result for to the combiner destined for the reporter thread
     720 *
     721 * @param libtrace[in] The parallel input trace
     722 * @param t[in] The current per-packet thread
     723 * @param key[in] The key of the result (used for sorting by the combiner)
     724 * @param value[in] The value of the result
     725 * @param type[in] The type of result see the documentation for the result_types enum
     726 */
     727DLLEXPORT void trace_publish_result(libtrace_t *libtrace,
     728                                    libtrace_thread_t *t,
     729                                    uint64_t key,
     730                                    libtrace_generic_t value,
     731                                    int type);
     732
     733/** Check if a dedicated hasher thread is being used
     734 *
     735 * @return True if the trace has dedicated hasher thread otherwise false.
     736 */
     737DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace);
     738
     739/** Checks if a trace is using a reporter
     740 *
     741 * @param[in] The parallel input trace
     742 * @return True if the trace is using a reporter otherwise false
     743 */
     744DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace);
     745
     746/** Post a message to the reporter thread requesting it to check for more
     747 * results.
     748 *
     749 * @param[in] The parallel input trace
     750 * @return -1 upon error indicating the message has not been sent otherwise a
     751 * backlog indicator (the number of messages the reporter has not yet read).
     752 */
    570753DLLEXPORT int trace_post_reporter(libtrace_t *libtrace);
    571 DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
    572 DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
    573 DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
    574 DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message);
    575 DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
    576 DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
    577 DLLEXPORT int trace_finished(libtrace_t * libtrace);
     754
     755/** Check the number of messages waiting in a queue
     756 *
     757 * @param[in] libtrace The input trace
     758 * @param[in] t The thread to check, if NULL the current thread will be used [Optional]
     759 *
     760 * @return packets in the queue otherwise -1 upon error.
     761 *
     762 * @note For best performance it is recommended to supply the thread argument
     763 * even if it is the current thread.
     764 */
     765DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
     766                                                libtrace_thread_t *t);
     767
     768/** Read a message from a thread in a blocking fashion
     769 *
     770 * @param[in] libtrace The input trace
     771 * @param[in] t The thread to check, if NULL the current thread will be used [Optional]
     772 * @param[out] message A pointer to libtrace_message_t structure which will be
     773 * filled with the retrived message.
     774 *
     775 * @return The number of messages remaining otherwise -1 upon error.
     776 *
     777 *
     778 * @note For best performance it is recommended to supply the thread argument
     779 * even if it is the current thread.
     780 */
     781DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
     782                                          libtrace_thread_t *t,
     783                                          libtrace_message_t * message);
     784
     785/** Read a message from a thread in a blocking fashion
     786 *
     787 * @param[in] libtrace The input trace
     788 * @param[in] t The thread to check, if NULL the current thread will be used [Optional]
     789 * @param[out] message A pointer to libtrace_message_t structure which will be
     790 * filled with the retrived message.
     791 *
     792 * @return 0 if successful otherwise -1 upon error or if no packets were available.
     793 *
     794 *
     795 * @note For best performance it is recommended to supply the thread argument
     796 * even if it is the current thread.
     797 */
     798DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
     799                                              libtrace_thread_t *t,
     800                                              libtrace_message_t * message);
     801
     802/** Send a message to the reporter thread
     803 *
     804 * @param[in] libtrace The parallel trace
     805 * @param[in] message The message to be sent, if sender is NULL libtrace will
     806 * attempt to fill this in. It is faster to assign this if it is known.
     807 *
     808 * @return -1 upon error indicating the message has not been sent otherwise a
     809 * backlog indicator (the number of messages the reporter has not yet read).
     810 */
     811DLLEXPORT int trace_message_reporter(libtrace_t * libtrace,
     812                                     libtrace_message_t * message);
     813
     814/** Send a message to all per-packet threads
     815 *
     816 * @param[in] libtrace The parallel trace
     817 * @param[in] message The message to be sent, if sender is NULL libtrace will
     818 * attempt to fill this in. It is faster to assign this if it is known.
     819 *
     820 * @return 0 if successful otherwise a negative number indicating the number
     821 * of per-packet threads the message was not sent to (i.e. -1 means one thread
     822 * could not be sent the message).
     823 */
     824DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace,
     825                                    libtrace_message_t * message);
     826
     827/** Send a message to a thread
     828 *
     829 * @param[in] libtrace The parallel trace
     830 * @param[in] t The thread to message
     831 * @param[in] message The message to be sent, if sender is NULL libtrace will
     832 * attempt to fill this in. It is faster to assign this if it is known.
     833 *
     834 * @return -1 upon error indicating the message has not been sent otherwise a
     835 * backlog indicator (the number of messages the thread has not yet read).
     836 */
     837DLLEXPORT int trace_message_thread(libtrace_t * libtrace,
     838                                   libtrace_thread_t *t,
     839                                   libtrace_message_t * message);
     840
     841/** Check if a parallel trace has finished reading packets
     842 *
     843 * @return True if the trace has finished reading packets (even if all results
     844 * have not yet been processed). Otherwise false.
     845 *
     846 * @note This returns true even if all results have not yet been processed.
     847 */
     848DLLEXPORT bool trace_has_finished(libtrace_t * libtrace);
     849
     850/** Returns either the sequence number or erf timestamp of a packet.
     851 *
     852 * @param[in] packet
     853 * @return A 64bit sequence number or erf timestamp.
     854 *
     855 * The returned value can be used to compare if packets come before or after
     856 * others.
     857 */
    578858DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
     859
     860/** Returns the hash of a packet.
     861 *
     862 * @param[in] packet
     863 * @return A 64-bit hash
     864 *
     865 * @note In many cases this might not be filled in, only in cases where
     866 * a custom hash is being used. You can use trace_has_dedicated_hasher()
     867 * to check if this will be valid.
     868 */
    579869DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet);
     870
     871/** Sets the order of a packet.
     872 *
     873 * @param[in] packet
     874 * @param[in] order the new order of a packet
     875 *
     876 * @note many combiners rely on this value, ensure changing this conforms to
     877 * the combiners requirements.
     878 */
    580879DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
     880
     881/** Sets the hash of a packet.
     882 *
     883 * @param[in] packet
     884 * @param[in] hash the new hash
     885 *
     886 * Once handed to the user the libtrace library has little use for this field
     887 * and as such this can essentially be used for any storage the user requires.
     888 */
    581889DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
     890
     891/** TODO WHAT TO DO WITH THIS ? */
    582892DLLEXPORT uint64_t tv_to_usec(struct timeval *tv);
    583893
    584 DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
    585 
     894
     895/** Returns the first packet of a parallel trace since it was started or
     896 * restarted.
     897 *
     898 * @param[in] libtrace the parallel input trace
     899 * @param[in] t Either a per packet thread or NULL to retrive the first packet
     900 * of across all per packet threads.
     901 * @param[out] packet A pointer to the first packet in the trace. [Optional]
     902 * @param[out] tv The system timestamp when this packet was received. [Optional]
     903 * @return 1 if we are confident this is the first packet. Otherwise 0 if this
     904 * is a best guess (this is only possible int the case t=NULL)
     905 * in which case we recommend calling this at a later time.
     906 * -1 is returned if an error occurs, such as supplied a invalid thread.
     907 *
     908 * The packet returned by this function is shared by all threads and remains
     909 * valid until MESSAGE_PAUSING is received.
     910 */
     911DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
     912                                     libtrace_thread_t *t,
     913                                     libtrace_packet_t **packet,
     914                                     struct timeval **tv);
     915
     916/** Makes a packet safe, a packet will become invaild after a
     917 * pausing a trace.
     918 *
     919 * @param pkt[in,out] The packet to make safe
     920 *
     921 * This copies a packet in such a way that it will be able to survive a pause.
     922 * However this will not allow the packet to be used after
     923 * the format is destroyed.
     924 */
    586925DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt);
     926
     927/** Makes a result safe if a result contains a packet.
     928 *
     929 * @param res[in,out] The result to make safe.
     930 *
     931 * This ensures the internal content of a result is safe to survive a pause.
     932 * See libtrace_make_packet_safe().
     933 */
    587934DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res);
    588935
    589936
    590937DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
    591 DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
    592 DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     938
     939/** In a parallel trace, free a packet back to libtrace.
     940 *
     941 * @param[in] libtrace A parallel input trace
     942 * @param[in] packet The packet to be released back to libtrace
     943 *
     944 * The packet should not be used after calling this function.
     945 *
     946 * @note All packets should be free'd before a trace is destroyed.
     947 */
     948DLLEXPORT void trace_free_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     949
     950
    593951DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace);
    594 
    595 
    596952DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
    597953DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
    598954DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t);
    599955
    600 
    601 
    602956/**
    603957 * Sets a combiner function against the trace.
  • lib/trace_parallel.c

    rb54e2da r6a6e6a8  
    163163}
    164164
    165 /**
     165/*
    166166 * This can be used once the hasher thread has been started and internally after
    167167 * verfiy_configuration.
    168  *
    169  * @return true if the trace has dedicated hasher thread otherwise false.
    170  */
    171 inline bool trace_has_dedicated_hasher(libtrace_t * libtrace)
     168 */
     169DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
    172170{
    173171        return libtrace->hasher_thread.type == THREAD_HASHER;
    174172}
    175173
    176 /**
    177  * True if the trace has dedicated hasher thread otherwise false,
    178  * to be used after the trace is running
    179  */
    180 static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
     174DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
    181175{
    182176        assert(libtrace->state != STATE_NEW);
     
    194188DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
    195189        return t->perpkt_thread_count;
     190}
     191
     192/**
     193 * Changes the overall traces state and signals the condition.
     194 *
     195 * @param trace A pointer to the trace
     196 * @param new_state The new state of the trace
     197 * @param need_lock Set to true if libtrace_lock is not held, otherwise
     198 *        false in the case the lock is currently held by this thread.
     199 */
     200static inline void libtrace_change_state(libtrace_t *trace,
     201        const enum trace_state new_state, const bool need_lock)
     202{
     203        UNUSED enum trace_state prev_state;
     204        if (need_lock)
     205                pthread_mutex_lock(&trace->libtrace_lock);
     206        prev_state = trace->state;
     207        trace->state = new_state;
     208
     209        if (trace->config.debug_state)
     210                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
     211                        trace->uridata, get_trace_state_name(prev_state),
     212                        get_trace_state_name(trace->state));
     213
     214        pthread_cond_broadcast(&trace->perpkt_cond);
     215        if (need_lock)
     216                pthread_mutex_unlock(&trace->libtrace_lock);
    196217}
    197218
     
    225246                        (int) t->tid, prev_state, t->state);
    226247
    227         pthread_cond_broadcast(&trace->perpkt_cond);
    228         if (need_lock)
    229                 pthread_mutex_unlock(&trace->libtrace_lock);
    230 }
    231 
    232 /**
    233  * Changes the overall traces state and signals the condition.
    234  *
    235  * @param trace A pointer to the trace
    236  * @param new_state The new state of the trace
    237  * @param need_lock Set to true if libtrace_lock is not held, otherwise
    238  *        false in the case the lock is currently held by this thread.
    239  */
    240 static inline void libtrace_change_state(libtrace_t *trace,
    241         const enum trace_state new_state, const bool need_lock)
    242 {
    243         UNUSED enum trace_state prev_state;
    244         if (need_lock)
    245                 pthread_mutex_lock(&trace->libtrace_lock);
    246         prev_state = trace->state;
    247         trace->state = new_state;
    248 
    249         if (trace->config.debug_state)
    250                 fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
    251                         trace->uridata, get_trace_state_name(prev_state),
    252                         get_trace_state_name(trace->state));
     248        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
     249                libtrace_change_state(trace, STATE_FINSHED, false);
    253250
    254251        pthread_cond_broadcast(&trace->perpkt_cond);
     
    324321}
    325322
    326 /** Makes a packet safe, a packet may become invaild after a
    327  * pause (or stop/destroy) of a trace. This copies a packet
    328  * in such a way that it will be able to survive a pause.
    329  *
    330  * However this will not allow the packet to be used after
    331  * the format is destroyed. Or while the trace is still paused.
    332  */
    333323DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
    334324        // Duplicate the packet in standard malloc'd memory and free the
     
    399389                assert((*packet)->error == READ_TICK);
    400390                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
    401                 (*trace->per_pkt)(trace, t, MESSAGE_TICK, data, t);
     391                (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);
    402392        }
    403393        return 0;
     
    567557                                              packets, nb_packets, &empty, &offset);
    568558                                        if (ret == READ_EOF) {
    569                                                 fprintf(stderr, "PAUSE stop eof!!\n");
    570559                                                goto eof;
    571560                                        } else if (ret == READ_ERROR) {
    572                                                 fprintf(stderr, "PAUSE stop error!!\n");
    573561                                                goto error;
    574562                                        }
     
    579567                                        goto eof;
    580568                        }
    581                         (*trace->per_pkt)(trace, t, message.code, message.additional, message.sender);
     569                        (*trace->per_pkt)(trace, t, message.code, message.data, message.sender);
    582570                        /* Continue and the empty messages out before packets */
    583571                        continue;
     
    619607                        switch (nb_packets) {
    620608                        case READ_EOF:
    621                                 fprintf(stderr, "EOF stop %d!!\n", nb_packets);
    622609                                goto eof;
    623610                        case READ_ERROR:
    624                                 fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
    625611                                goto error;
    626612                        case READ_MESSAGE:
     
    636622
    637623error:
    638         fprintf(stderr, "An error occured in trace\n");
    639624        message.code = MESSAGE_DO_STOP;
    640625        message.sender = t;
    641         message.additional.uint64 = 0;
    642         trace_send_message_to_perpkts(trace, &message);
     626        message.data.uint64 = 0;
     627        trace_message_perpkts(trace, &message);
    643628eof:
    644         fprintf(stderr, "An eof occured in trace\n");
    645629        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
    646630
     
    659643        thread_change_state(trace, t, THREAD_FINISHED, true);
    660644
    661         // Notify only after we've defiantly set the state to finished
    662         message.code = MESSAGE_PERPKT_ENDED;
    663         message.additional.uint64 = 0;
    664         trace_send_message_to_reporter(trace, &message);
     645        /* Make sure the reporter sees we have finished */
     646        if (trace_has_reporter(trace))
     647                trace_post_reporter(trace);
    665648
    666649        // Release all ocache memory before unregistering with the format
     
    703686        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    704687
     688        /* We are reading but it is not the parallel API */
    705689        if (trace->format->pregister_thread) {
    706690                trace->format->pregister_thread(trace, t, true);
     
    789773                        // Unlock early otherwise we could deadlock
    790774                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
    791                         ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    792775                } else {
    793776                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
    794                         ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    795                 }
     777                }
     778                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    796779        }
    797780
     
    799782        thread_change_state(trace, t, THREAD_FINISHED, true);
    800783
    801         // Notify only after we've defiantly set the state to finished
    802         message.code = MESSAGE_PERPKT_ENDED;
    803         message.additional.uint64 = 0;
    804         trace_send_message_to_reporter(trace, &message);
    805784        libtrace_ocache_unregister_thread(&trace->packet_freelist);
    806785        if (trace->format->punregister_thread) {
     
    811790        // TODO remove from TTABLE t sometime
    812791        pthread_exit(NULL);
    813 };
    814 
    815 /**
    816  * Moves src into dest(Complete copy) and copies the memory buffer and
    817  * its flags from dest into src ready for reuse without needing extra mallocs.
    818  */
    819 static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
    820         // Save the passed in buffer status
    821         assert(dest->trace == NULL); // Must be a empty packet
    822         void * temp_buf = dest->buffer;
    823         buf_control_t temp_buf_control = dest->buf_control;
    824         // Completely copy StoredPacket into packet
    825         memcpy(dest, src, sizeof(libtrace_packet_t));
    826         // Set the buffer settings on the returned packet
    827         src->buffer = temp_buf;
    828         src->buf_control = temp_buf_control;
    829         src->trace = NULL;
    830792}
    831793
     
    943905{
    944906        if (!t->recorded_first) {
     907                libtrace_message_t mesg = {0};
    945908                struct timeval tv;
    946909                libtrace_packet_t * dup;
    947                 // For what it's worth we can call these outside of the lock
     910
     911                /* We mark system time against a copy of the packet */
    948912                gettimeofday(&tv, NULL);
    949913                dup = trace_copy_packet(packet);
     914
    950915                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    951916                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
    952                 //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
    953917                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
    954                 // Now update the first
    955918                libtrace->first_packets.count++;
     919
     920                /* Now update the first */
    956921                if (libtrace->first_packets.count == 1) {
    957                         // We the first entry hence also the first known packet
     922                        /* We the first entry hence also the first known packet */
    958923                        libtrace->first_packets.first = t->perpkt_num;
    959924                } else {
    960                         // Check if we are newer than the previous 'first' packet
     925                        /* Check if we are newer than the previous 'first' packet */
    961926                        size_t first = libtrace->first_packets.first;
    962927                        if (trace_get_seconds(dup) <
     
    965930                }
    966931                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
    967                 libtrace_message_t mesg = {0};
     932
    968933                mesg.code = MESSAGE_FIRST_PACKET;
    969                 trace_send_message_to_reporter(libtrace, &mesg);
     934                trace_message_reporter(libtrace, &mesg);
     935                trace_message_perpkts(libtrace, &mesg);
    970936                t->recorded_first = true;
    971937        }
    972938}
    973939
    974 /**
    975  * Returns 1 if it's certain that the first packet is truly the first packet
    976  * rather than a best guess based upon threads that have published so far.
    977  * Otherwise 0 is returned.
    978  * It's recommended that this result is stored rather than calling this
    979  * function again.
    980  */
    981 DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
     940DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
     941                                     libtrace_thread_t *t,
     942                                     libtrace_packet_t **packet,
     943                                     struct timeval **tv)
    982944{
     945        void * tmp;
    983946        int ret = 0;
     947
     948        if (t) {
     949                if (t->type != THREAD_PERPKT || t->trace != libtrace)
     950                        return -1;
     951        }
     952
     953        /* Throw away these which we don't use */
     954        if (!packet)
     955                packet = (libtrace_packet_t **) &tmp;
     956        if (!tv)
     957                tv = (struct timeval **) &tmp;
     958
    984959        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    985         if (libtrace->first_packets.count) {
     960        if (t) {
     961                /* Get the requested thread */
     962                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
     963                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
     964        } else if (libtrace->first_packets.count) {
     965                /* Get the first packet across all threads */
    986966                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
    987967                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
     
    10441024        (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
    10451025
    1046         while (!trace_finished(trace)) {
     1026        while (!trace_has_finished(trace)) {
    10471027                if (trace->config.reporter_polling) {
    10481028                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
     
    10641044                                break;
    10651045                default:
    1066                         (*trace->reporter)(trace, message.code, message.additional, message.sender);
     1046                        (*trace->reporter)(trace, message.code, message.data, message.sender);
    10671047                }
    10681048        }
     
    10861066        libtrace_t *trace = (libtrace_t *)data;
    10871067        uint64_t next_release;
     1068        libtrace_thread_t *t = &trace->keepalive_thread;
     1069
    10881070        fprintf(stderr, "keepalive thread is starting\n");
    10891071
     
    10911073        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    10921074        if (trace->state == STATE_ERROR) {
    1093                 thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false);
     1075                thread_change_state(trace, t, THREAD_FINISHED, false);
    10941076                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    10951077                pthread_exit(NULL);
     
    10981080
    10991081        gettimeofday(&prev, NULL);
    1100         message.code = MESSAGE_TICK;
     1082        message.code = MESSAGE_TICK_INTERVAL;
     1083
    11011084        while (trace->state != STATE_FINSHED) {
    11021085                fd_set rfds;
     
    11071090                        // Wait for timeout or a message
    11081091                        FD_ZERO(&rfds);
    1109                         FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
    1110                         if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
     1092                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
     1093                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
    11111094                                libtrace_message_t msg;
    1112                                 libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
     1095                                libtrace_message_queue_get(&t->messages, &msg);
    11131096                                assert(msg.code == MESSAGE_DO_STOP);
    11141097                                goto done;
     
    11171100                prev = usec_to_tv(next_release);
    11181101                if (trace->state == STATE_RUNNING) {
    1119                         message.additional.uint64 = tv_to_usec(&prev);
    1120                         trace_send_message_to_perpkts(trace, &message);
     1102                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
     1103                                               (((uint64_t)prev.tv_usec << 32)/1000000));
     1104                        trace_message_perpkts(trace, &message);
    11211105                }
    11221106        }
    11231107done:
    11241108
    1125         thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
     1109        thread_change_state(trace, t, THREAD_FINISHED, true);
    11261110        return NULL;
    11271111}
     
    11461130                struct timeval *sys_tv;
    11471131                int64_t initial_offset;
    1148                 int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
     1132                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
    11491133                assert(first_pkt);
    11501134                pkt_tv = trace_get_timeval(first_pkt);
     
    16451629        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
    16461630        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
    1647                                                  sizeof(struct  __packet_storage_magic_type));
     1631                                                 sizeof(*libtrace->first_packets.packets));
    16481632        if (libtrace->first_packets.packets == NULL) {
    16491633                trace_set_err(libtrace, errno, "trace_pstart "
     
    17231707}
    17241708
    1725 /**
     1709/*
    17261710 * Pauses a trace, this should only be called by the main thread
    17271711 * 1. Set started = false
     
    17581742                libtrace_message_t message = {0};
    17591743                message.code = MESSAGE_DO_PAUSE;
    1760                 trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
     1744                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
    17611745                // Wait for it to pause
    17621746                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     
    17761760                        libtrace_message_t message = {0};
    17771761                        message.code = MESSAGE_DO_PAUSE;
    1778                         trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
     1762                        trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    17791763                        if(trace_has_dedicated_hasher(libtrace)) {
    17801764                                // The hasher has stopped and other threads have messages waiting therefore
     
    18091793
    18101794        // Deal with the reporter
    1811         if (trace_has_dedicated_reporter(libtrace)) {
     1795        if (trace_has_reporter(libtrace)) {
    18121796                if (libtrace->config.debug_state)
    18131797                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
    18141798                libtrace_message_t message = {0};
    18151799                message.code = MESSAGE_DO_PAUSE;
    1816                 trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
     1800                trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
    18171801                // Wait for it to pause
    18181802                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     
    18731857
    18741858        message.code = MESSAGE_DO_STOP;
    1875         trace_send_message_to_perpkts(libtrace, &message);
     1859        trace_message_perpkts(libtrace, &message);
    18761860        if (trace_has_dedicated_hasher(libtrace))
    1877                 trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
     1861                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
    18781862
    18791863        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    1880                 trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
     1864                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    18811865        }
    18821866
     
    19861970                // Cannot destroy vector yet, this happens with trace_destroy
    19871971        }
    1988         // TODO consider perpkt threads marking trace as finished before join is called
    1989         libtrace_change_state(libtrace, STATE_FINSHED, true);
    1990 
    1991         if (trace_has_dedicated_reporter(libtrace)) {
     1972
     1973        if (trace_has_reporter(libtrace)) {
    19921974                pthread_join(libtrace->reporter_thread.tid, NULL);
    19931975                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
     
    19981980                libtrace_message_t msg = {0};
    19991981                msg.code = MESSAGE_DO_STOP;
    2000                 trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
     1982                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
    20011983                pthread_join(libtrace->keepalive_thread.tid, NULL);
    20021984        }
     
    20061988}
    20071989
    2008 DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
     1990DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
     1991                                                libtrace_thread_t *t)
    20091992{
    2010         libtrace_thread_t * t = get_thread_descriptor(libtrace);
    2011         assert(t);
    2012         return libtrace_message_queue_count(&t->messages);
    2013 }
    2014 
    2015 DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
     1993        int ret;
     1994        if (t == NULL)
     1995                t = get_thread_descriptor(libtrace);
     1996        if (t == NULL)
     1997                return -1;
     1998        ret = libtrace_message_queue_count(&t->messages);
     1999        return ret < 0 ? 0 : ret;
     2000}
     2001
     2002DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
     2003                                          libtrace_thread_t *t,
     2004                                          libtrace_message_t * message)
    20162005{
    2017         libtrace_thread_t * t = get_thread_descriptor(libtrace);
    2018         assert(t);
    2019         return libtrace_message_queue_get(&t->messages, message);
    2020 }
    2021 
    2022 DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
     2006        int ret;
     2007        if (t == NULL)
     2008                t = get_thread_descriptor(libtrace);
     2009        if (t == NULL)
     2010                return -1;
     2011        ret = libtrace_message_queue_get(&t->messages, message);
     2012        return ret < 0 ? 0 : ret;
     2013}
     2014
     2015DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
     2016                                              libtrace_thread_t *t,
     2017                                              libtrace_message_t * message)
    20232018{
    2024         libtrace_thread_t * t = get_thread_descriptor(libtrace);
    2025         assert(t);
    2026         return libtrace_message_queue_try_get(&t->messages, message);
    2027 }
    2028 
    2029 /**
    2030  * Return backlog indicator
    2031  */
     2019        if (t == NULL)
     2020                t = get_thread_descriptor(libtrace);
     2021        if (t == NULL)
     2022                return -1;
     2023        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
     2024                return 0;
     2025        else
     2026                return -1;
     2027}
     2028
     2029DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
     2030{
     2031        int ret;
     2032        if (!message->sender)
     2033                message->sender = get_thread_descriptor(libtrace);
     2034
     2035        ret = libtrace_message_queue_put(&t->messages, message);
     2036        return ret < 0 ? 0 : ret;
     2037}
     2038
     2039DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
     2040{
     2041        if (!trace_has_reporter(libtrace) ||
     2042            !(libtrace->reporter_thread.state == THREAD_RUNNING
     2043              || libtrace->reporter_thread.state == THREAD_PAUSED))
     2044                return -1;
     2045
     2046        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
     2047}
     2048
    20322049DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
    20332050{
    20342051        libtrace_message_t message = {0};
    20352052        message.code = MESSAGE_POST_REPORTER;
    2036         message.sender = get_thread_descriptor(libtrace);
    2037         return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
    2038 }
    2039 
    2040 /**
    2041  * Return backlog indicator
    2042  */
    2043 DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
    2044 {
    2045         //printf("Sending message code=%d to reporter\n", message->code);
    2046         message->sender = get_thread_descriptor(libtrace);
    2047         return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
    2048 }
    2049 
    2050 /**
    2051  *
    2052  */
    2053 DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
    2054 {
    2055         //printf("Sending message code=%d to reporter\n", message->code);
    2056         message->sender = get_thread_descriptor(libtrace);
    2057         return libtrace_message_queue_put(&t->messages, message);
    2058 }
    2059 
    2060 DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
     2053        return trace_message_reporter(libtrace, (void *) &message);
     2054}
     2055
     2056DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
    20612057{
    20622058        int i;
    2063         message->sender = get_thread_descriptor(libtrace);
     2059        int missed;
     2060        if (message->sender == NULL)
     2061                message->sender = get_thread_descriptor(libtrace);
    20642062        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    2065                 libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
    2066         }
    2067         //printf("Sending message code=%d to reporter\n", message->code);
    2068         return 0;
    2069 }
    2070 
    2071 DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
    2072         result->key = key;
    2073 }
    2074 DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
    2075         return result->key;
    2076 }
    2077 DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_t value) {
    2078         result->value = value;
    2079 }
    2080 DLLEXPORT libtrace_generic_t libtrace_result_get_value(libtrace_result_t * result) {
    2081         return result->value;
    2082 }
    2083 DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_t value) {
    2084         result->key = key;
    2085         result->value = value;
    2086 }
    2087 DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
    2088         free(*result);
    2089         result = NULL;
    2090         // TODO automatically back with a free list!!
     2063                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
     2064                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
     2065                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
     2066                } else {
     2067                        missed += 1;
     2068                }
     2069        }
     2070        return -missed;
    20912071}
    20922072
     
    21582138}
    21592139
    2160 DLLEXPORT int trace_finished(libtrace_t * libtrace) {
    2161         // TODO I don't like using this so much, we could use state!!!
    2162         return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
     2140DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
     2141        return libtrace->state == STATE_FINSHED || libtrace->state == STATE_JOINED;
    21632142}
    21642143
     
    22692248}
    22702249
    2271 DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
    2272         libtrace_packet_t* result;
    2273         libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
    2274         assert(result);
    2275         swap_packets(result, packet); // Move the current packet into our copy
    2276         return result;
    2277 }
    2278 
    2279 DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    2280         // Try write back the packet
     2250DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    22812251        assert(packet);
    2282         // Always release any resources this might be holding such as a slot in a ringbuffer
     2252        /* Always release any resources this might be holding */
    22832253        trace_fin_packet(packet);
    22842254        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
  • test/test-format-parallel-hasher.c

    r6a082f8 r6a6e6a8  
    107107        switch (mesg) {
    108108        case MESSAGE_RESULT:
    109                 assert(libtrace_result_get_key(data.res) == 0);
    110                 printf("%d,", libtrace_result_get_value(data.res).sint);
     109                assert(data.res->key == 0);
     110                printf("%d,", data.res->value.sint);
    111111                totalthreads++;
    112                 totalpkts += libtrace_result_get_value(data.res).sint;
    113                 assert(libtrace_result_get_value(data.res).sint == 25 ||
    114                        libtrace_result_get_value(data.res).sint == expected - 25);
     112                totalpkts += data.res->value.sint;
     113                assert(data.res->value.sint == 25 ||
     114                       data.res->value.sint == expected - 25);
    115115                break;
    116116        case MESSAGE_STARTING:
     
    166166                // All threads publish to verify the thread count
    167167                assert(tls->count == 25 || tls->count == 75);
    168                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
     168                trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    169169                trace_post_reporter(trace);
    170170                free(tls);
    171171                break;
    172         case MESSAGE_TICK:
     172        case MESSAGE_TICK_INTERVAL:
     173        case MESSAGE_TICK_COUNT:
    173174                assert(tls->seen_start_message );
    174175                fprintf(stderr, "Not expecting a tick packet\n");
  • test/test-format-parallel-reporter.c

    r6a082f8 r6a6e6a8  
    102102        switch (mesg) {
    103103        case MESSAGE_RESULT:
    104                 packet = libtrace_result_get_value(data.res).pkt;
    105                 assert(libtrace_result_get_key(data.res) == trace_packet_get_order(packet));
     104                packet = data.res->value.pkt;
     105                assert(data.res->key == trace_packet_get_order(packet));
    106106                if(last == (uint64_t)-1) {
    107                         last = libtrace_result_get_key(data.res);
     107                        last = data.res->key;
    108108                } else {
    109                         assert (last < libtrace_result_get_key(data.res));
    110                         last = libtrace_result_get_key(data.res);
     109                        assert (last < data.res->key);
     110                        last = data.res->key;
    111111                }
    112112                pktcount++;
    113                 trace_free_result_packet(libtrace, packet);
     113                trace_free_packet(libtrace, packet);
    114114                break;
    115115        case MESSAGE_STOPPING:
  • test/test-format-parallel-singlethreaded-hasher.c

    r6a082f8 r6a6e6a8  
    108108        switch (mesg) {
    109109        case MESSAGE_RESULT:
    110                 assert(libtrace_result_get_key(data.res) == 0);
    111                 printf("%d,", libtrace_result_get_value(data.res).sint);
     110                assert(data.res->key == 0);
     111                printf("%d,", data.res->value.sint);
    112112                totalthreads++;
    113                 totalpkts += libtrace_result_get_value(data.res).sint;
     113                totalpkts += data.res->value.sint;
    114114                break;
    115115        case MESSAGE_STARTING:
     
    164164
    165165                // All threads publish to verify the thread count
    166                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
     166                trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    167167                trace_post_reporter(trace);
    168168                free(tls);
    169169                break;
    170         case MESSAGE_TICK:
     170        case MESSAGE_TICK_INTERVAL:
     171        case MESSAGE_TICK_COUNT:
    171172                assert(tls->seen_start_message );
    172173                fprintf(stderr, "Not expecting a tick packet\n");
  • test/test-format-parallel-singlethreaded.c

    r6a082f8 r6a6e6a8  
    107107        switch (mesg) {
    108108        case MESSAGE_RESULT:
    109                 assert(libtrace_result_get_key(data.res) == 0);
    110                 printf("%d,", libtrace_result_get_value(data.res).sint);
     109                assert(data.res->key == 0);
     110                printf("%d,", data.res->value.sint);
    111111                totalthreads++;
    112                 totalpkts += libtrace_result_get_value(data.res).sint;
     112                totalpkts += data.res->value.sint;
    113113                break;
    114114        case MESSAGE_STARTING:
     
    163163
    164164                // All threads publish to verify the thread count
    165                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
     165                trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    166166                trace_post_reporter(trace);
    167167                free(tls);
    168168                break;
    169         case MESSAGE_TICK:
     169        case MESSAGE_TICK_INTERVAL:
     170        case MESSAGE_TICK_COUNT:
    170171                assert(tls->seen_start_message );
    171172                fprintf(stderr, "Not expecting a tick packet\n");
  • test/test-format-parallel-stressthreads.c

    r6a082f8 r6a6e6a8  
    107107        switch (mesg) {
    108108        case MESSAGE_RESULT:
    109                 assert(libtrace_result_get_key(data.res) == 0);
    110                 printf("%d,", libtrace_result_get_value(data.res).sint);
     109                assert(data.res->key == 0);
     110                printf("%d,", data.res->value.sint);
    111111                totalthreads++;
    112                 totalpkts += libtrace_result_get_value(data.res).sint;
     112                totalpkts += data.res->value.sint;
    113113                break;
    114114        case MESSAGE_STARTING:
     
    163163
    164164                // All threads publish to verify the thread count
    165                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_NORMAL);
     165                trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    166166                trace_post_reporter(trace);
    167167                free(tls);
    168168                break;
    169         case MESSAGE_TICK:
     169        case MESSAGE_TICK_INTERVAL:
     170        case MESSAGE_TICK_COUNT:
    170171                assert(tls->seen_start_message );
    171172                fprintf(stderr, "Not expecting a tick packet\n");
  • test/test-format-parallel.c

    r6a082f8 r6a6e6a8  
    106106        switch (mesg) {
    107107        case MESSAGE_RESULT:
    108                 assert(libtrace_result_get_key(data.res) == 0);
    109                 printf("%d,", libtrace_result_get_value(data.res).sint);
     108                assert(data.res->key == 0);
     109                printf("%d,", data.res->value.sint);
    110110                totalthreads++;
    111                 totalpkts += libtrace_result_get_value(data.res).sint;
     111                totalpkts += data.res->value.sint;
    112112                break;
    113113        case MESSAGE_STARTING:
     
    173173
    174174                // All threads publish to verify the thread count
    175                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = count}, RESULT_NORMAL);
     175                trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = count}, RESULT_USER);
    176176                trace_post_reporter(trace);
    177177                break;
    178         case MESSAGE_TICK:
     178        case MESSAGE_TICK_INTERVAL:
     179        case MESSAGE_TICK_COUNT:
    179180                assert(seen_start_message);
    180181                fprintf(stderr, "Not expecting a tick packet\n");
  • tools/traceanon/traceanon_parallel.c

    r6a082f8 r6a6e6a8  
    161161        libtrace_udp_t *udp = NULL;
    162162        libtrace_tcp_t *tcp = NULL;
    163 
     163        libtrace_stat_t *stats = NULL;
    164164        switch (mesg) {
    165165        case MESSAGE_PACKET:
     
    199199                enc_init(enc_type,key);
    200200                break;
    201         case MESSAGE_TICK:
    202                 trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK);
     201        case MESSAGE_TICK_INTERVAL:
     202                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK_INTERVAL);
     203                break;
     204        case MESSAGE_TICK_COUNT:
     205                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK_COUNT);
     206                break;
     207        case MESSAGE_STOPPING:
     208                stats = trace_create_statistics();
     209                trace_get_thread_statistics(trace, t, stats);
     210                trace_print_statistics(stats, stderr, NULL);
     211                free(stats);
     212                stats = trace_get_statistics(trace, NULL);
     213                trace_print_statistics(stats, stderr, NULL);
     214                //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
    203215                break;
    204216        }
     
    216228        case MESSAGE_RESULT:
    217229                if (data.res->type == RESULT_PACKET) {
    218                         libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(data.res).pkt;
    219                         assert(libtrace_result_get_key(data.res) == packet_count++);
     230                        libtrace_packet_t *packet = (libtrace_packet_t*) data.res->value.pkt;
     231                        assert(data.res->key >= packet_count);
     232                        packet_count = data.res->key;
    220233                        if (trace_write_packet(writer,packet)==-1) {
    221234                                trace_perror_output(writer,"writer");
    222235                                trace_interrupt();
    223236                        }
    224                         trace_free_result_packet(trace, packet);
     237                        trace_free_packet(trace, packet);
    225238
    226239                } else {
    227                         assert(data.res->type == RESULT_TICK);
     240                        assert(data.res->type == RESULT_TICK_COUNT || data.res->type == RESULT_TICK_INTERVAL);
    228241                        // Ignore it
    229242                }
  • tools/tracertstats/tracertstats_parallel.c

    r6a082f8 r6a6e6a8  
    147147        switch (mesg) {
    148148                case MESSAGE_RESULT:
    149                 ts = libtrace_result_get_key(data.res);
    150                 res = libtrace_result_get_value(data.res).ptr;
     149                ts = data.res->key;
     150                res = data.res->value.ptr;
    151151                if (last_ts == 0)
    152152                        last_ts = ts;
     
    195195                        // Publish and make a new one new
    196196                        //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    197                         trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
     197                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER);
    198198                        trace_post_reporter(trace);
    199199                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     
    225225                if (results->total.count) {
    226226                        libtrace_generic_t tmp = {.ptr = results};
    227                         trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
     227                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER);
    228228                        trace_post_reporter(trace);
    229229                        results = NULL;
     
    231231                break;
    232232
    233                 case MESSAGE_TICK:
     233        case MESSAGE_TICK_INTERVAL:
     234        case MESSAGE_TICK_COUNT:
    234235                {
    235236                        int64_t offset;
    236237                        struct timeval *tv, tv_real;
    237238                        libtrace_packet_t *first_packet = NULL;
    238                         retrive_first_packet(trace, &first_packet, &tv);
     239                        trace_get_first_packet(trace, NULL, &first_packet, &tv);
    239240                        if (first_packet != NULL) {
    240241                                // So figure out our running offset
     
    247248                                        libtrace_generic_t tmp = {.ptr = results};
    248249                                        //fprintf(stderr, "Got a tick and publishing early!!\n");
    249                                         trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL);
     250                                        trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER);
    250251                                        trace_post_reporter(trace);
    251252                                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
  • tools/tracestats/tracestats_parallel.c

    r6a082f8 r6a6e6a8  
    139139                trace_print_statistics(stats, stderr, NULL);
    140140                free(stats);
    141                 trace_publish_result(trace, t, 0, (libtrace_generic_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0
     141                trace_publish_result(trace, t, 0, (libtrace_generic_t){.ptr = results}, RESULT_USER); // Only ever using a single key 0
    142142                //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
    143143                break;
     
    168168        case MESSAGE_RESULT:
    169169                /* Get the results from each core and sum 'em up */
    170                 assert(libtrace_result_get_key(data.res) == 0);
    171                 statistics_t * res = libtrace_result_get_value(data.res).ptr;
     170                assert(data.res->key == 0);
     171                statistics_t * res = data.res->value.ptr;
    172172                count += res[0].count;
    173173                bytes += res[0].bytes;
Note: See TracChangeset for help on using the changeset viewer.