• ## lib/libtrace_int.h

 rf625817 void register_format(struct libtrace_format_t *format); /** Converts a timeval into a timestamp in microseconds since the epoch. * * @param tv    The timeval to be converted. * @return A 64 bit timestamp in microseconds since the epoch. */ uint64_t tv_to_usec(const struct timeval *tv); /** Converts a PCAP DLT into a libtrace link type. *
• ## lib/libtrace_parallel.h

 r5478d3d * This file is part of libtrace * * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, * Copyright (c) 2007-2015 The University of Waikato, Hamilton, * New Zealand. * * Authors: Richard Sanger *          Shane Alcock * * All rights reserved. * @author Richard Sanger * * @version $Id$ * * The parallel libtrace framework is a replacement to the libtrace framework. XXX TODO MAKE MORE DOCS HERE. * @version 4.0.0 * * The parallel libtrace framework is a replacement to the libtrace framework * that allows packet processing workload to be spread over multiple threads. * It can also take advantage of native parallelism in the packet capture * source. */ /** * A collection of types for convenience used in place of a * simple void* to allow a any type of data to be stored. * simple void* to allow any type of data to be stored and passed * around easily. * * This is expected to be 8 bytes in length. ct_assert(sizeof(libtrace_generic_t) == 8); /** * Structure describing a message that can be sent to a libtrace thread. */ typedef struct libtrace_message_t { int code; /**< The message code see enum libtrace_messages */ int code; /**< The message code, as defined in enum libtrace_messages */ libtrace_generic_t data; /**< Additional data related to the message */ libtrace_thread_t *sender; /**< The thread that sent the message */ /** Structure holding information about a result */ struct libtrace_result_t { uint64_t key; libtrace_generic_t value; int type; uint64_t key;   /**< The unique key for the result */ libtrace_generic_t value;  /**< The result value itself */ int type; /**< Describes the type of result, see enum result_types */ }; * All libtrace messages are defined and documented here. * * Some messages can be sent to control the library while others * are received by the per-packet and reporter functions to inform the libtrace * application. * * If a user wishes to send there own custom messages they should use * Some messages can be sent to control the internal behaviour of the library * while others are used to trigger the user-defined callback functions. * If a user wishes to send their own custom messages, they should use * numbers greater than MESSAGE_USER (1000). * */ enum libtrace_messages { /** A libtrace packet is ready, this will only be sent to per * packet threads. * @param data Holds the packet in data.pkt. The packet belongs to * libtrace and should either be returned from the per-packet function * if no longer needed or free'd at some later time using the XXX * function. * @param sender The sender will be set as the current thread /** A libtrace packet is ready, this will trigger the packet callback *  for the processing threads. */ MESSAGE_PACKET, /** A libtrace result is ready, this will only be sent to the reporter * thread. * @param data Holds the result in data.res. The memory holding the * result is allocated by libtrace and should not be free'd. However * note that any data stored within the result might need to be free'd. * @param sender The sender will be set as the current thread /** A libtrace result is ready, this will trigger the result callback *  for the reporter thread. */ MESSAGE_RESULT, /** A message sent to each thread when it starts. This is sent * to both the reporter and per-packet threads. This will be sent once * after trace_pstart() (assuming no errors occurs). * * This can be used to allocate resources required by each thread. * * These can be free'd when MESSAGE_STOPPING is received. * * @param data unused, do not use this * @param sender The sender will be set as the current thread * @return When using a function callback for starting, the returned * value is stored against the thread tls. Otherwise the return is ignored. /** This message is sent to each thread when it first starts and will *  trigger the starting callback for the processing and reporter *  threads. A starting message is sent when trace_pstart is called *  for the first time on a trace. */ MESSAGE_STARTING, /** A message sent to each thread when it stops. This is sent * to both the reporter and per-packet threads. This will be sent once * after MESSAGE_STARTING. * * This can be used to free any resources allocated with * MESSAGE_STARTING. * * @param data unused, do not use this * @param sender The sender will be set as the current thread /** This message is sent to each thread when the thread ends and will *  trigger the stopping callback for the processing and reporter *  threads. */ MESSAGE_STOPPING, /** A message sent to each thread when a thread transitions between a * paused (or unstarted) state to running state. This is sent * to both the reporter and per-packet threads. This will be sent after * MESSAGE_STARTING when a trace is first started and when a trace * is started (trace_pstart()) after a pause (trace_ppause()). * * This can be used to allocate resources. * * @param data unused, do not use this * @param sender The sender will be set as the current thread */ /** This message is sent to each thread when the thread transitions *  from a paused state to a running state. It will trigger the *  resuming callback for the processing and reporter threads. * *  A resuming message is sent whenever trace_pstart is called on a *  trace (including the first time the trace is started). */ MESSAGE_RESUMING, /** A message sent to each thread when a thread transitions between a * paused (or unstarted) state to running state. This is sent * to both the reporter and per-packet threads. This will be sent after * MESSAGE_STARTING when a trace is first started and when a trace * is started (trace_pstart()) after a pause (trace_ppause()). * * This can be used to allocate resources. * * @param data unused, do not use this * @param sender The sender will be set as the current thread */ /** This message is sent to each thread when the thread transitions *  into a paused state from a running state. It will trigger the *  pausing callback for the processing and reporter threads. * *  A pausing message is sent whenever trace_ppause is called on a *  trace. It will also be sent when a trace is stopped, as all traces *  are implicitly paused before they stop. */ MESSAGE_PAUSING, /** An internal message do not use this */ /** An internal message for forcing another thread to pause. Do not *  use this in user-defined callbacks! */ MESSAGE_DO_PAUSE, /** An internal message do not use this */ /** An internal message for forcing another thread to stop. Do not *  use this in user-defined callbacks! */ MESSAGE_DO_STOP, /** Sent to all per-packet threads (including the sender) and the * reducer when the first packet is seen for a thread. * * @param data The first packet is stored in data.pkt. This packet is * shared by all threads receiving the message and is valid until * MESSAGE_PAUSING is received. * @param sender The per-packet thread which received the packet * * @note Upon pausing and restarting a trace this will be reset and * sent once a new packet is encountered * * @see trace_get_first_packet() */ /** This message is sent to each processing thread as soon as the first *  packet has been seen by any of the processing threads. This will *  trigger the first_packet callback for the processing threads, *  allowing the threads to perform any initialisation required based *  on the properties of the first packet (e.g. the timestamp). * *  Threads should use trace_get_first_packet() to access the packet *  that triggered this message. * *  @note Upon pausing and restarting a trace, this message will be *  sent again when the first new packet is encountered. */ MESSAGE_FIRST_PACKET, /** Notify the reporter thread more data is available. * * Triggers the reporter to read as many results as possible. * * @param data unused * @param sender the sending * * @note This message should not be sent directly instead call * trace_post_reporter() * /** An internal message for notifying the reporter thread that more *  results are available. * *  Do not use this in user-defined callbacks -- call *  trace_post_reporter() instead. */ MESSAGE_POST_REPORTER, * @param sender should be ignored */ /** This message is sent to the processing threads periodically, after *  the configured time interval has passed. This message will *  trigger the tick_interval callback function for the processing *  threads. * *  This message is sent out-of-band relative to packet messages and *  therefore can appear after a packet with a later timestamp or *  before a packet with an earlier timestamp. */ MESSAGE_TICK_INTERVAL, * @param sender Set to the current per-packet thread */ /** This message is sent to the processing threads periodically, after *  the configured number of packets have been read from the input *  trace. This message will trigger the tick_count callback function *  for the processing threads. * *  This message is sent in-band relative to packet messages and *  will always appear in the right place relative to the other packets *  observed by the thread. */ MESSAGE_TICK_COUNT, /** For specific user defined messages use codes of MESSAGE_USER or above. */ /** All message codes at or above this value represent custom *  user-defined messages and will trigger the usermessage callback *  for the processing threads. */ MESSAGE_USER = 1000 }; /** The hasher types available to libtrace application. * These can be selected using trace_set_hasher(). /** The hasher types that are available to libtrace applications. *  These can be selected using trace_set_hasher(). */ enum hasher_types { /** Balance load across per-packet threads as best as possible, this is * basically to say I do not care about where packets are sent. This * might still might be implemented using a hash or round robin etc. * depending on the format and libtrace configuration. /** Balance load across per-packet threads as best as possible, i.e *  the program does not care which thread sees a given packet. This *  will be implemented using a hash or round robin, depending on the * format and libtrace configuration. */ HASHER_BALANCE, /** Use a hash which is bi-directional for TCP and UDP flows, that is * packets with the same 5-tuple are sent to the same per-packet thread. /** Use a hash which is bi-directional for TCP and UDP flows, such that * packets with the same 5-tuple are sent to the same processing thread. * All non TCP/UDP packets will be sent to the same thread. * * @note it is possible that UDP packets may not be spread across * per-packet threads, depending upon the format support. In this case * they would be directed to a single per-packet thread. * processing threads, depending upon the format support. In this case * they would be directed to a single thread. */ HASHER_BIDIRECTIONAL, /** Use a hash which is uni-directional across TCP and UDP flows, this * means the opposing directions of the same 5-tuple might end up on * different per-packet threads. * Otherwise this is identical to HASHER_BIDIRECTIONAL /** Use a hash which is uni-directional across TCP and UDP flows, such * that the opposing directions of the same 5-tuple may end up on * different processing threads. * Otherwise this is identical to HASHER_BIDIRECTIONAL. */ HASHER_UNIDIRECTIONAL, /** * Always use the user supplied hasher, this disables native * support in and is likely significantly slower. * This value indicates that the hasher is a custom user-defined * function. */ HASHER_CUSTOM /** * The maximum number of threads supported by a parallel trace. 1 * if parallel support is not native (in this case libtrace will simulate * an unlimited number of threads), -1 means unlimited and 0 unknown. * if parallel support is not native (in this case libtrace will * simulate an unlimited number of threads), -1 means unlimited and 0 * unknown. */ int max_threads; } libtrace_info_t; /** * The methods we use to combine multiple outputs into a single output * This is not considered a stable API however is public. * Where possible use built in combiners. * * @note this structure is duplicated per trace and as such can * have functions rewritten, and in fact should if possible. */ typedef struct libtrace_combine libtrace_combine_t; /** * The methods we use to combine the results from multiple processing * threads into a single output. Users can write their own combiners, but * we strongly recommend that you use one of the provided combiners. * */ struct libtrace_combine { /** * Called when the trace ends, clean up any memory here * from libtrace_t * init. * Called when the trace ends, clean up any memory allocated * by the initialise function. */ void (*destroy)(libtrace_t *, libtrace_combine_t *); /** * Publish a result against it's a threads queue. * If null publish directly, expected to be used * as a single threaded optimisation and can be * set to NULL by init if this case is detected. * Receive a result from a processing thread. Most implementations * of this function will push the result into an appropriate * queue. If this is NULL, the result will automatically be pushed * to the reporter thread. */ void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *); /** * Read as many results as possible from the trace. Each result * that is read should cause a MESSAGE_RESULT to be sent to the * reporter thread. * * TODO this is old info */ void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *); /** * Read as many results as possible from the trace. * Directly calls the users code to handle results from here. * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE! * If publish is NULL, this probably should be NULL as it will not be * called in that case. */ void (*read)(libtrace_t *, libtrace_combine_t *); /** * Called when the trace is finished to flush the final * results to the reporter thread. Any leftover results should * cause a MESSAGE_RESULT to be sent to the reporter thread. * * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE * If publish is NULL, this probably should be NULL also otherwise * it will not be called. */ void (*read)(libtrace_t *, libtrace_combine_t *); /** * Called when the trace is finished to flush the final * results to the reporter thread. * * There may be no results, in which case this should * There may be no results, in which case this function should * just return. * * Libtrace state: * Called from reporter thread * No perpkt threads will be running, i.e. publish will not be * called again. * This function will be called from the reporter thread. * No processing threads will be running, i.e. you can assume that * publish will not be called again. * * If publish is NULL, this probably should be NULL also otherwise * it will not be called. * If publish is NULL, this probably should be NULL as it will not be * called in that case. */ void (*read_final)(libtrace_t *, libtrace_combine_t *); /** * Pause must make sure any results of the type packet are safe. * That means trace_copy_packet() and destroy the original. * This also should be NULL if publish is NULL. * Pause must make sure any queued results that contain packets are * safe. See libtrace_make_result_safe() for more details on what it * means for a result to be safe. * This function should be NULL if publish is NULL. */ void (*pause)(libtrace_t *, libtrace_combine_t *); void *queues; /** The last counter tick that we saw, so we can avoid duplicating *  any ticks that are published. */ uint64_t last_count_tick; /** The last timestamp tick that we saw, so we can avoid duplicating *  any ticks that are published. */ uint64_t last_ts_tick; /** * The definition for the main function that the user supplies to process * messages. * * @param trace The trace the packet is related to. * @param thread The thread identifier. * @param mesg_code The type of data ready, the most important being MESSAGE_PACKET. * In this case data.pkt contains the packet. * @param data A generic union of types that fit into 8 bytes, containing * information dependent upon the mesg_code. * @param sender The thread from which the message originated. * @return If the message type is MESSAGE_PACKET a packet can be returned back * to the library similar to trace_free_packet() otherwise this should be NULL. * * The values of data and sender depend upon the mesg_code. Please see the * documentation for the message as to what value these will contain. */ typedef void* (*fn_cb_msg)(libtrace_t* trace, libtrace_thread_t *thread, int mesg_code, libtrace_generic_t data, libtrace_thread_t *sender); /** * The definition for the main function that the user supplies to process * results from trace_publish_result(). * * @param trace The trace the packet is related to. * @param mesg_code The type of data ready, the most important being MESSAGE_RESULT. * In this case data.res contains the result. * @param data A generic union of types that fit into 8 bytes, containing * information dependent upon the mesg_code. * @param sender The thread from which the message originated. * * The values of data and sender depend upon the mesg_code. Please see the * documentation for the message as to what value these will contain. */ typedef void (*fn_reporter)(libtrace_t* trace, int mesg_code, libtrace_generic_t data, libtrace_thread_t *sender); /** * The definition for a hasher function, allowing matching packets to be * directed to the same per packet thread for processing. * directed to the correct thread for processing. * * @param packet The packet to be hashed. * @param data A void pointer which can contain additional information, * such as configuration of the hasher function. * such as configuration for the hasher function. * * @return The id of the thread that should receive this packet. */ typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data); * * @param libtrace The input trace to start * @param global_blob Global data related to this trace accessible using trace_get_global() * @param per_packet_cbs A set of user supplied functions to be called in response to events being observed by the per_pkt threads. * @param reporter_cbs A set of user supplied functions to be called in response to events / results being seen by the reporter thread. * Optional if NULL the reporter thread will not be started. * @param global_blob Global data related to this trace. This may be NULL if *    no global data is required. * @param per_packet_cbs A set of user supplied functions to be called in *   response to messages that are observed by the processing threads. * @param reporter_cbs A set of user supplied functions to be called in *   response to messages being seen by the reporter thread. * Optional if NULL, the reporter thread will not be started. * @return 0 on success, otherwise -1 to indicate an error has occurred * /** * * @param libtrace The parallel trace * @param t The thread that is running * @param global The global storage * @return The returned value is stored against the threads tls. *         This is typically passed as tls argument to other messages. * The starting callback for a processing or reporting thread. Use this * callback to allocate and initialise any thread-local storage that you * would like to be available in other callbacks. * * @param libtrace The parallel trace. * @param t The thread that has just started. * @param global The global storage for the trace. * * @return The returned value is stored against the thread's local storage. *         This is typically passed as the 'tls' argument to other callbacks. */ typedef void* (*fn_cb_starting)(libtrace_t *libtrace, /** * @param libtrace The parallel trace * @param t The thread that is running * @param global The global storage * @param tls The thread local storage * A callback function for any message that does not require any specific * data, e.g. stopping, pausing, or resuming callbacks. * * @param libtrace The parallel trace. * @param t The thread that is running. * @param global The global storage. * @param tls The thread local storage. */ typedef void (*fn_cb_dataless)(libtrace_t *libtrace, /** * @param libtrace The parallel trace * @param t The thread that is running * @param global The global storage * @param tls The thread local storage * A callback function for a first packet message seen by a processing thread. * @param libtrace The parallel trace. * @param t The thread that is running. * @param global The global storage. * @param tls The thread local storage. * @param sender The thread that saw the first packet. */ typedef void (*fn_cb_first_packet)(libtrace_t *libtrace, /** * @param libtrace The parallel trace * @param t The thread that is running * @param global The global storage * @param tls The thread local storage * @param uint64_t Either the timestamp or packet count depending on message type * A callback function for handling a tick message within a processing thread. * * @param libtrace The parallel trace. * @param t The thread that is running. * @param global The global storage. * @param tls The thread local storage. * @param uint64_t The value of the tick; either a timestamp or packet count *    depending on the type of tick. */ typedef void (*fn_cb_tick)(libtrace_t *libtrace, /** * @param libtrace The parallel trace * @param t The thread * @param packet The packet associated with the message * @param global The global storage * @param tls The thread local storage * * @return optionally a packet which is handed back to the library, *         typically this is the packet supplied. Otherwise NULL. * A callback function triggered when a processing thread receives a packet. * * @param libtrace The parallel trace. * @param t The thread that is running * @param global The global storage. * @param tls The thread local storage. * @param packet The packet to be processed. * * @return either the packet itself if it is not being published as a result *   or NULL otherwise. If returning NULL, it is the user's responsibility *   to ensure the packet is freed when the reporter thread is finished with it. */ typedef libtrace_packet_t* (*fn_cb_packet)(libtrace_t *libtrace, * reporter thread. * * @param libtrace The parallel trace * @param sender The thread that generated this result * @param global The global storage * @param tls The thread local storage * @param result The result associated with the message * @param libtrace The parallel trace. * @param sender The thread that generated this result. * @param global The global storage. * @param tls The thread local storage. * @param result The result associated with the message. * */ * any messages with a type >= MESSAGE_USER. * * @param libtrace The parallel trace * @param t The thread * @param global The global storage * @param tls The thread local storage * @param mesg The code identifying the message type * @param data The data associated with the message * @param libtrace The parallel trace. * @param t The thread. * @param global The global storage. * @param tls The thread local storage. * @param mesg The code identifying the message type. * @param data The data associated with the message. * */ void *global, void *tls, int mesg, libtrace_generic_t data); /** Registers a built-in message with a handler. * Note we do not include the sending thread as an argument to the reporter. * If set to NULL, the message will be sent to default perpkt handler. * * @param libtrace The input trace to start * @param handler the handler to be called when the message is received * @return 0 if successful otherwise -1. */ /** * Registers a starting callback against a callback set. * * @param cbset The callback set. * @param handler The starting callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset, fn_cb_starting handler); /** * Registers a stopping callback against a callback set. * * @param cbset The callback set. * @param handler The stopping callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset, fn_cb_dataless handler); /** * Registers a resuming callback against a callback set. * * @param cbset The callback set. * @param handler The resuming callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset, fn_cb_dataless handler); /** * Registers a pausing callback against a callback set. * * @param cbset The callback set. * @param handler The pausing callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset, fn_cb_dataless handler); /** * Registers a packet callback against a callback set. * * @param cbset The callback set. * @param handler The packet callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset, fn_cb_packet handler); /** * Registers a first packet callback against a callback set. * * @param cbset The callback set. * @param handler The first packet callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset, fn_cb_first_packet handler); /** * Registers a result callback against a callback set. * * @param cbset The callback set. * @param handler The result callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset, fn_cb_result handler); /** * Registers a tick counter callback against a callback set. * * @param cbset The callback set. * @param handler The tick callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset, fn_cb_tick handler); /** * Registers a tick interval callback against a callback set. * * @param cbset The callback set. * @param handler The tick callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset, fn_cb_tick handler); /** * Registers a callback for custom user messages against a callback set. * * @param cbset The callback set. * @param handler The user message callback function. * @return 0 if successful, -1 otherwise. */ DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset, fn_cb_usermessage handler); DLLEXPORT libtrace_callback_set_t *trace_create_callback_set(void); /** Destroys a callback set, freeing up an resources it was using. /** Destroys a callback set, freeing up any resources it was using. * * @param cbset         The callback set to be destroyed. * @return 0 on success, otherwise -1 to indicate an error has occurred * * This should only be called by the main thread. * Ideally, this should only be called by the main thread (i.e. from a signal * handler) but it can be called from within a reporter thread reasonably * safely. * */ */ DLLEXPORT void trace_join(libtrace_t * trace); /** * @name User Data Storage * * These method provide a way for users to store data against a trace or * a thread. * * Alternatively one could use global variables and thread local * storage (__thread), respectively, which in many cases could be simpler. * * @note We do not lock on reads, instead we rely on the * processor making any writes appear atomically. * * @{ */ /** Returns the data stored against a trace. * * @param trace The parallel trace * @return The stored data. */ DLLEXPORT void * trace_get_local(libtrace_t *trace); /** Store data against a trace so that all threads can access it * using trace_get_global(). * * @param trace The parallel trace. * @param data The new value to save against the trace * @return The previously stored value * * The update to the previous value is atomic and thread-safe. * * @note Although this is thread-safe another thread may still be * using the previous data, as such further synchronisation is needed * if a thread wanted to free the existing value. */ DLLEXPORT void * trace_set_local(libtrace_t *trace, void * data); /** Returns the users data stored against a thread. * * @param thread The thread * @return The stored data */ DLLEXPORT void * trace_get_tls(libtrace_thread_t *thread); /** Store data against a thread. * * @param thread The thread * @param data The new value to save against the thread * @return The previously stored value * * This function is not thread-safe and is intended only to be * called on the currently running thread. */ DLLEXPORT void * trace_set_tls(libtrace_thread_t *thread, void * data); /// @} * * E.g. * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, *       LIBTRACE_CONF_INT_ETH0 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, *       LIBTRACE_CONF_DAG__DEV_DAG0_0 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, *       LIBTRACE_CONF_ERF_TEST_ERF * * @note All environment variables names MUST only contain * * @param[in] trace The parallel input trace * @param[in] nb The number of threads to use. If 0 use default. * @param[in] nb The number of threads to use. If set to 0, libtrace will *    try to auto-detect how many threads it can use. * @return 0 if successful otherwise -1 */ * * @param[in] trace The parallel input trace * @param[in] millisec The interval in milliseconds. If 0 this is disabled [default]. * @return 0 if successful otherwise -1 * * When a underlying parallel live trace is used MESSAGE_TICK_INTERVAL is sent * every tick interval to all per-packet threads to ensure data is received. * This allows results to be printed even in cases flows are not being directed * to a per-packet thread, while still maintaining order etc. * * @note Tick count is preferred over tick interval and will be used rather * than tick interval if possible. * @param[in] millisec The interval in milliseconds. If 0 this is disabled *     [default]. * @return 0 if successful, otherwise -1. * * When enabled, MESSAGE_TICK_INTERVAL will be sent every tick interval to all * processing threads. This allows results to be published even in cases where * new packets are not being directed to a processing thread, while still * maintaining order etc. * * @see MESSAGE_TICK_INTERVAL, trace_set_tick_count() */ DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec); /** Set the count between tick messages. /** Set the number of packets to be read between tick messages. * * @param[in] trace The parallel input trace * @return 0 if successful otherwise -1 * * When an underlying trace is accessed internally by libtrace in a * single-threaded manner MESSAGE_TICK_COUNT is sent to all per-packet threads * after every count packets have been seen in the trace. This allows results * to be printed even in cases flows are not being directed to a per-packet * thread, while still maintaining order etc. * When enabled, MESSAGE_TICK_COUNT will be sent to all processing threads * after every 'count' packets have been read from the trace. This allows * results to be published even in cases where new packets are not being * directed to a processing thread, while still maintaining order etc. * * @see MESSAGE_TICK_COUNT, trace_set_tick_interval() * * @param trace A parallel input trace * @param tracetime If true packets are released with time intervals matching * @param tracetime If true packets are released with time spacing that matches * the original trace. Otherwise packets are read as fast as possible. * @return 0 if successful otherwise -1 DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime); /** This sets the maximum size of the freelist used to store empty packets /** Sets the maximum size of the freelist used to store empty packets * and their memory buffers. * * @return 0 if successful otherwise -1 * * Internally libtrace maintains a buffer of packet structures, this buffer * includes a cache per thread and a shared main pool. This configures * Internally libtrace maintains a buffer of packet structures which * includes a cache per thread and a shared main pool. This option configures * the size of the main pool. If an application is not passing packets * through to the reducer step --- that is to say returns packets from * the perpkt function --- this buffer will not need to be used. * through to the reporter thread, i.e. the packet callback always returns * the packet, then the main pool is not used. * * @note Setting this too low could cause performance issues or a deadlock. An DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size); /** This sets the maximum size of the freelist thread cache's used to provide * faster access than the main shared pool. /** This sets the maximum size of the freelist cache owned by each thread * used to provide faster access to empty packets than the main shared pool. * * @param trace A parallel input trace DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size); /** If true the total number of packets that can be created by a trace is limited * to that set by trace_set_cache_size(), otherwise once exceeded malloc * and free will be used to create and free packets, this will be slower than * using the freelist and could run a machine out of memory. /** Determines whether a trace is allowed to create additional packets *  beyond the cache size. * *  If set to true, libtrace will cease reading packets once the cache is used *  up until the other threads release some packets back to the cache. * *  If set to false (the default), libtrace will use malloc and free to create *  additional packets when the cache is exhausted. This will be slower than *  getting a packet from the cache and will eventually run the machine out *  of memory if packets are allocated faster than they are released. * * @param trace A parallel input trace * * Internally libtrace will attempt to read up to this number of packets from * a format typically values of 10 will get good performance and increasing * beyond that will should little difference. * * @note We still pass a single packet at a time to the perpkt function * a format at a time. Typically, values of 10 will get good performance and * increasing beyond that will should little difference. * * @note We still pass a single packet at a time to the packet callback * function. */ DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size); /** * See diagrams, this sets the maximum size of buffers used between * the single hasher thread and the buffer. * NOTE setting this to less than recommend could cause deadlock a * Sets the maximum size of the buffer used between the single hasher thread * and the packet processing thread. * * Setting this to less than recommend could cause a deadlock for an input * trace that manages its own packets. * A unblockable warning message will be printed to stderr in this case. */ /** The number of packets that can queue per thread from hasher thread */ DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size); /** If true use a polling hasher queue, that means that we will spin/or yield * when data is not available rather than blocking on a condition. /** * Enables or disables polling of the hasher queue. * * If enabled, the processing threads will poll on the hasher queue, yielding * if no data is available. * * If disabled, the processing threads will block on a condition variable * if there is no data available from the hasher. * * @param trace A parallel input trace * @param polling If true the hasher will poll waiting for data, otherwise * it is not. Defaults to false. * * We note this is likely to waste many CPU cycles and could even decrease * it will use a condition variable. Defaults to false. * * We note polling is likely to waste many CPU cycles and could even decrease * performance. * DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling); /** If true the reporter thread will continuously poll waiting for results * if false they are only checked when a message is received, this message * is controlled by reporter_thold. /** * Enables or disables polling of the reporter result queue. * * If enabled, the reporter thread will continuously poll for results. * If disabled, the reporter will only check for results if it receives a * MESSAGE_POST_REPORTER. * * @param trace A parallel input trace * @param polling If true the reporter will poll waiting for data, otherwise * it is not. Defaults to false. * it will wait for a MESSAGE_POST_REPORTER. Defaults to false. * @return 0 if successful otherwise -1 * * We note this is likely to waste many CPU cycles and could even decrease * We note polling is likely to waste many CPU cycles and could even decrease * performance. * DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling); /** Set the perpkt thread result queue size before triggering the reporter * to read results. /** * Set the number of results that are required in the result queue before * a MESSAGE_POST_REPORTER is sent to the reporter so that it can read the * results. * * Set this to 1 to ensure if you require your results to reach the reporter * as soon as possible. * * @param trace A parallel input trace * * * @note This setting is generally ignored if trace_set_reporter_polling() is * set however some combiner functions might ignore trace_set_reporter_polling() * and still require this to be set. * @note This setting is generally ignored if the reporter is polling. However, * some combiner functions might ignore the polling behaviour and still * require this to be set. * @see trace_publish_result(), trace_post_reporter() */ DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold); /** Prints a line to standard error for every state change * for both the trace as a whole and for each thread. /** * Enable or disable debug output for parallel libtrace. * If enabled, libtrace will print a line to standard error for every * state change observed by both the trace as a whole and by each thread. * * You really shouldn't need to enable this.... * * @param trace A parallel input trace * * The hasher function in a parallel trace can be used to control which * per-packet thread a packets is processed by. * processing thread will receive each packet. * * See hasher_types for a list of hashers supported natively by libtrace. * * HASHER_BALANCE is the default and will dispatch packets as fast as possible * to all threads arbitrarily. As such when called the hasher and * data parameters must be set to NULL. * * HASHER_CUSTOM will force the libtrace to use the user defined function. As * such the hasher parameter must be supplied. * * With other defined hasher types we will try to push the hashing into the format * by default. In this case the hasher parameter is optional and will be * preferred over the default supplied by libtrace. * to all threads arbitrarily. * * HASHER_CUSTOM will force the libtrace to use the user defined function. In * this case, the hasher parameter must be supplied. * * With other defined hasher types libtrace will try to push the hashing into * the capture format wherever possible. In this case, the hasher parameter is * optional; if a hasher is provided then it will be preferred over the * libtrace implementation. * * @note When supplying a hasher function it should be thread-safe so it can /** Types of results. * Some result types require special handling by combiners * as such making use of built-in types is important. * * Custom result types users should be defined as RESULT_USER(1000) or greater. enum result_types { /** * The result is a packet in some circumstances special handling needs * to be performed. As such packets should always be published as so. * The result contains a pointer to a libtrace_packet_t. This * packet should be freed using trace_free_packet() once the result * is processed by the reporter thread. * * The key for a RESULT_PACKET is the packet order (see * trace_get_packet_order() for more about ordering). * * @param key (Typically) The packets order, see trace_packet_get_order() */ RESULT_PACKET, /** The result is a tick message * * @param key The erf time-stamp of the tick /** * The result is a tick timestamp. The key is an ERF timestamp. */ RESULT_TICK_INTERVAL, /** The result is a tick message * * @param key The sequence number of the tick message /** * The result is a tick counter. The key is the sequence number of * the tick, relative to the packets read so far. */ RESULT_TICK_COUNT, /** Any user specific codes should be above this. * /** * Any user-defined result codes should be at or above this value. */ RESULT_USER = 1000 }; /** Publish a result for to the combiner destined for the reporter thread /** Publish a result to the reporter thread (via the combiner) * * @param[in] libtrace The parallel input trace * @param[in] key The key of the result (used for sorting by the combiner) * @param[in] value The value of the result * @param[in] type The type of result see the documentation for the result_types enum * @param[in] type The type of result (see result_types) */ DLLEXPORT void trace_publish_result(libtrace_t *libtrace, * * @param[in] libtrace The parallel input trace * @return True if the trace has dedicated hasher thread otherwise false. * * This is valid once the trace is running after calling trace_pstart(). * @return true if the trace has dedicated hasher thread otherwise false. * * This should only be called after the trace has been started with * trace_pstart(). */ DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace); /** Checks if a trace is using a reporter /** Checks if a trace is using a reporter thread. * * @param[in] libtrace The parallel input trace DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace); /** Post a message to the reporter thread requesting it to check for more /** Post a message to the reporter thread requesting that it check for more * results. * DLLEXPORT int trace_post_reporter(libtrace_t *libtrace); /** Check the number of messages waiting in a queue /** Check the number of messages waiting in a thread's message queue * * @param[in] libtrace The input trace * @param[in] t The thread to check, if NULL the current thread will be used [Optional] * @param[in] t The thread to check; if NULL the current thread will be used. * * @return packets in the queue otherwise -1 upon error. libtrace_thread_t *t); /** Read a message from a thread in a blocking fashion /** Read a message from a thread in a blocking fashion. * * @param[in] libtrace The input trace * @param[in] t The thread to check, if NULL the current thread will be used [Optional] * @param[out] message A pointer to libtrace_message_t structure which will be * filled with the retrieved message. * @param[in] t The thread to check, if NULL the current thread will be used. * @param[out] message A pointer to a libtrace_message_t structure which will * be filled with the retrieved message. * * @return The number of messages remaining otherwise -1 upon error. * * * @note For best performance it is recommended to supply the thread argument libtrace_message_t * message); /** Read a message from a thread in a blocking fashion /** Read a message from a thread in a non-blocking fashion. * * @param[in] libtrace The input trace * @param[in] t The thread to check, if NULL the current thread will be used [Optional] * @param[out] message A pointer to libtrace_message_t structure which will be * filled with the retrieved message. * * @return 0 if successful otherwise -1 upon error or if no packets were available. * * @param[in] t The thread to check, if NULL the current thread will be used. * @param[out] message A pointer to a libtrace_message_t structure which will * be filled with the retrieved message. * * @return 0 if successful otherwise -1 upon error or if no message were * available. * * @note For best performance it is recommended to supply the thread argument libtrace_message_t * message); /** Send a message to the reporter thread /** Send a message to the reporter thread. * * @param[in] libtrace The parallel trace * @param[in] message The message to be sent, if sender is NULL libtrace will * attempt to fill this in. It is faster to assign this if it is known. * * @return -1 upon error indicating the message has not been sent otherwise a * backlog indicator (the number of messages the reporter has not yet read). * @param[in] message The message to be sent. If the sender field is NULL, * libtrace will attempt to fill this in. It is faster to assign this if it is * known. * * @return -1 upon error indicating the message has not been sent. Otherwise, * will return the number of messages the reporter has not yet read. */ DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message); /** Send a message to all per-packet threads /** Send a message to all processing threads. * * @param[in] libtrace The parallel trace * @param[in] message The message to be sent, if sender is NULL libtrace will * attempt to fill this in. It is faster to assign this if it is known. * * @return 0 if successful otherwise a negative number indicating the number * of per-packet threads the message was not sent to (i.e. -1 means one thread * could not be sent the message). * @param[in] message The message to be sent. If the sender field is NULL, * libtrace will attempt to fill this in. It is faster to assign this if it is * known. * * @return 0 if successful. Otherwise, a negative number is returned that * indicates the number of processing threads that the message was not sent * to (i.e. -1 means one thread could not be sent the message). */ DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message); /** Send a message to a thread /** Send a message to a specific thread. * * @param[in] libtrace The parallel trace * @param[in] t The thread to message * @param[in] message The message to be sent, if sender is NULL libtrace will * attempt to fill this in. It is faster to assign this if it is known. * * @return -1 upon error indicating the message has not been sent otherwise a * backlog indicator (the number of messages the thread has not yet read). * @param[in] message The message to be sent. If the sender field is NULL, * libtrace will attempt to fill this in. It is faster to assign this if it is * known. * * @return -1 upon error indicating the message has not been sent. Otherwise, * will return the number of messages the recipient has not yet read. */ DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_message_t * message); /** Check if a parallel trace has finished reading packets * * @return True if the trace has finished reading packets (even if all results /** Checks if a parallel trace has finished reading packets. * * @return true if the trace has finished reading packets (even if all results * have not yet been processed). Otherwise false. * * @note This returns true even if all results have not yet been processed. * @note This returns true even if all results have not yet been processed by * the reporter thread. */ DLLEXPORT bool trace_has_finished(libtrace_t * libtrace); /** Check if libtrace is directly reading from multiple queues * from the format (such as a NICs hardware queues). * * When a parallel trace is running, or if checked after its completion * this returns true if a trace was able to run natively parallel * from the format. Otherwise false is returned, meaning libtrace is * distibuting packets across multiple threads from a single source. * * Factors that may stop this happening despite the format supporting * native parallel reads include: the choice of hasher function, * the number of threads choosen (such as 1 or more than the trace supports) * or another error when trying to start the parallel format. * * If this is called before the trace is started. I.e. before pstart * this returns an indication that the trace has the possiblity to support * native parallel reads. After trace pstart is called this should be * checked again to confirm this has happened. * * from within the capture format (such as a NICs hardware queues). * * A trace is considered to be parallel if the input format for the trace * allows the packets to be read in a natively parallel fashion, i.e. packets * can be read using multiple pipelines. If this function returns false, the * packets are instead being read from a single input source and then * distributed amongst the processing threads. * * Factors that may cause this function to return false despite the format * normally supporting native parallel reads include: the choice of hasher * function, the number of threads choosen (such as 1 or more than the trace * supports) or another error when trying to start the parallel format. * * If called before the trace is started, i.e. before trace_pstart(), this * function returns an indication whether the trace has the possiblity to * support native parallel reads. After trace_pstart() is called this should be * checked again to confirm that this has happened. * * @return true if the trace is parallel or false if the library is splitting * @return A 64bit sequence number or erf timestamp. * * The returned value can be used to compare if packets come before or after * others. * The returned value can be used to compare the relative ordering of packets. * Formats that are not natively parallel will typically return a sequence * number. Natively parallel formats will return a timestamp. */ DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet); * @return A 64-bit hash * * @note In many cases this might not be filled in, only in cases where * @note This function will only work in situations where * a custom hash is being used. You can use trace_has_dedicated_hasher() * to check if this will be valid. * to check if this is the case. */ DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet); * @param[in] order the new order of a packet * * @note many combiners rely on this value, ensure changing this conforms to * the combiners requirements. * @note Many combiners rely on this value, so please ensure that changing this * conforms to the expectations of the combiner. * * Generally speaking, you probably shouldn't be changing the order of packets! */ DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order); * @param[in] hash the new hash * * Once handed to the user the libtrace library has little use for this field * and as such this can essentially be used for any storage the user requires. * Once a packet reaches the processing thread, the libtrace library has * little use for this field and as such this can essentially be used for any * storage that the user requires. */ DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash); /** TODO WHAT TO DO WITH THIS ? */ DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv); /** Returns the first packet of a parallel trace since it was started or * restarted. * * @param[in] libtrace the parallel input trace * @param[in] t Either a per packet thread or NULL to retrieve the first packet * of across all per packet threads. * @param[out] packet A pointer to the first packet in the trace. [Optional] * @param[out] tv The system time-stamp when this packet was received. [Optional] /** Returns the first packet read by a processing thread since the source * trace was last started or restarted. * * @param[in] libtrace the parallel input trace. * @param[in] t Either a per packet thread or NULL to retrieve the earliest * packet across all per packet threads. * @param[out] packet A pointer to the requested packet. [Optional] * @param[out] tv The system time-stamp when the packet was received. [Optional] * @return 1 if we are confident this is the first packet. Otherwise 0 if this * is a best guess (this is only possible int the case t=NULL) * in which case we recommend calling this at a later time. * -1 is returned if an error occurs, such as supplied a invalid thread. * is a best guess (this is only possible int the case t=NULL) in which case * we recommend trying again at a later time. * -1 is returned if an error occurs, such as when this function is supplied * an invalid thread. * * The packet and timeval returned by this function is shared by all threads const struct timeval **tv); /** Makes a packet safe, a packet will become invalid after a /** Makes a packet safe, preventing the packet from becoming invalid after a * pausing a trace. * * * This copies a packet in such a way that it will be able to survive a pause. * However this will not allow the packet to be used after * the format is destroyed. * However this will not allow the packet to be used after the format is * destroyed. */ DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt); /** Makes a result safe if a result contains a packet. /** Makes a result safe, preventing the result from becoming invalid after * pausing a trace. * * @param[in,out] res The result to make safe. * * This ensures the internal content of a result is safe to survive a pause. * Note that this is only an issue if the result contains a packet. * See libtrace_make_packet_safe(). */ * The packet should not be used after calling this function. * * @note Don't use this inside a packet callback function -- just return * the packet instead, as this will be faster. * * @note All packets should be free'd before a trace is destroyed. */ DLLEXPORT void trace_free_packet(libtrace_t * libtrace, libtrace_packet_t * packet); /** Provides some basic information about a trace based on its input format. * * @param libtrace  The trace that is being inquired about. * @return a libtrace_info_t structure that contains information about the * trace format, i.e. is it live or not, how many threads it supports. * * See trace_is_parallel(), trace_get_perpkt_threads(). */ DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace); * key value pairs. * * @param trace A parallel trace which is not running or destroyed * @param str A comma separated list of key=value pairs. * E.g. \em "burst_size=20,perpkt_threads=2,fixed_count=true" * @param trace A parallel trace which is not running or destroyed. * @param str A comma separated list of key=value pairs: *   e.g. \em "burst_size=20,perpkt_threads=2,fixed_count=true" * @return 0 if successful otherwise -1. If bad options are passed we will * print the error to stderr but still return successful. * LIBTRACE_CONF, see Parallel Configuration for more information. * * @note this interface is provided to allow a user to configure an application * if a libtrace applicate wishes to configure a setting it should use a * trace_set_*() function with the same name. * @note This interface is provided to allow a user to quickly configure an * application using a single API call. A nicer programatic method for * configuration would be to use the appropriate trace_set_*() function for * each option. */ DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char * str); DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file); /** Returns the number of processing threads that have been created for * a given trace. * * @param t A parallel trace. * @return The number of processing threads owned by that trace. */ DLLEXPORT int trace_get_perpkt_threads(libtrace_t* t); /** * Sets a combiner function against the trace. * Sets a combiner function for an input trace. * * @param trace The input trace * @combiner The combiner to use * @config config Configuration information. Dependent upon the combiner in use * @param combiner The combiner to use * @param config Configuration information. Dependent upon the combiner. * * Sets a combiner against a trace, this should only be called on a * non-started or paused trace.  By default combiner_unordered * will be used. * non-started or paused trace.  By default, combiner_unordered * will be used if this function is not called before starting the trace. */ DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config); /** * Takes unordered (or ordered) input and produces unordered output. * Basically you get the result quickly but in no particular order. * This is the fastest combiner but makes no attempt to ensure you get * results in a particular order. */ extern const libtrace_combine_t combiner_unordered; /** * Takes ordered input and produces ordered output. Perpkt threads * the output results must be ordered for this to work correctly!! * Takes ordered input and produces ordered output. Each processing thread * must produce results that are strictly ordered for this combiner to * work correctly. * * For example, a thread may publish a series of results with the keys * (in order) of 1,4,10,11,15,20 as the keys are all in order. It must not * publish the results in the order 1,4,11,10,15,20 -- 10 comes after 11, * which is out-of-order. */ extern const libtrace_combine_t combiner_ordered; /** * Like classic Google Map/Reduce, the results are sorted * in ascending order, this is only done when the trace finishes. * * This only works with a limited number of results, otherwise * we will just run out of memory and crash!! You should always * in ascending order based on their key. The sorting is only done when the * trace finishes and all results are stored internally until then. * * This only works with a very limited number of results, otherwise * libtrace will just run out of memory and crash. You should always * use combiner_ordered if you can. */
• ## lib/trace_parallel.c

 r5478d3d } DLLEXPORT void * trace_get_local(libtrace_t *trace) { return trace->global_blob; } DLLEXPORT void * trace_set_local(libtrace_t *trace, void * data) { void *ret; pthread_mutex_lock(&trace->libtrace_lock); ret = trace->global_blob; trace->global_blob = data; pthread_mutex_unlock(&trace->libtrace_lock); return ret; } DLLEXPORT void * trace_get_tls(libtrace_thread_t *t) { return t->user_data; } DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data) { void *ret = t->user_data; t->user_data = data; return ret; } /** * Publishes a result to the reduce queue
• ## tools/tracertstats/tracertstats.c

 rf625817 trace_set_perpkt_threads(trace, threadcount); if (trace_get_information(trace)->live) { if (trace_is_parallel(trace)) { trace_set_tick_interval(trace, (int) (packet_interval * 1000)); }
