source: lib/libtrace_parallel.h @ 6a082f8

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 6a082f8 was 6a082f8, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Move parallel API functions to libtrace_parallel.h

Also started the process of updating the documentation

  • Property mode set to 100644
File size: 19.3 KB
RevLine 
[6a082f8]1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Richard Sanger
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32/** @file
33 *
34 * @brief Header file containing definitions for structures and functions
35 * related to the parallel framework
36 *
37 * @author Richard Sanger
38 *
39 * @version $Id$
40 *
41 * The parallel libtrace framework is a replacement for the libtrace framework. XXX TODO MAKE MORE DOCS HERE.
42 */
43
44#ifndef LIBTRACE_PARALLEL_H
45#define LIBTRACE_PARALLEL_H
46
47#include "libtrace.h"
48#include <stdio.h>
49
50typedef struct libtrace_result_t libtrace_result_t;
51/**
52 * A union of convenience types, used in place of a plain void*
53 * so that a value of any of these types can be stored.
54 *
55 * This is expected to be 8 bytes in length (see the ct_assert() after the definition).
56 */
57typedef union {
58        /* Pointers */
59        void *ptr;
60        libtrace_packet_t *pkt;
61        libtrace_result_t *res;
62
63        /* C99 Integer types */
64        /* NOTE: The C standard does not require the 64-bit types
65         * to exist, but gcc on x32 and x64 provides them */
66        int64_t sint64;
67        uint64_t uint64;
68
69        uint32_t uint32s[2];
70        int32_t sint32s[2];
71        uint32_t uint32;
72        int32_t sint32;
73
74        uint16_t uint16s[4];
75        int16_t sint16s[4];
76        uint16_t uint16;
77        int16_t sint16;
78
79        uint8_t uint8s[8];
80        int8_t sint8s[8];
81        uint8_t uint8;
82        int8_t sint8;
83
84        size_t size;
85
86        /* C basic types - we cannot be certain of the size */
87        int sint;
88        unsigned int uint;
89
90        signed char schars[8];
91        unsigned char uchars[8];
92        signed char schar;
93        unsigned char uchar;
94
95        /* Real numbers */
96        float rfloat;
97        double rdouble;
98} libtrace_generic_t;
99ct_assert(sizeof(libtrace_generic_t) == 8);
100
101typedef struct libtrace_message_t { /* A message, as passed to the trace_send_message_to_*() functions */
102        int code; /* Message type; presumably an enum libtrace_messages value or a user code from MESSAGE_USER upwards - TODO confirm */
103        libtrace_generic_t additional; /* Payload; presumably its meaning depends on code - TODO confirm */
104        libtrace_thread_t *sender; /* Presumably the thread the message originated from - TODO confirm */
105} libtrace_message_t;
106
107/** Structure holding information about a result (a key, a value and a type) */
108struct libtrace_result_t {
109        uint64_t key; /* Key of the result; see libtrace_result_set_key() and libtrace_result_get_key() */
110        libtrace_generic_t value; /* Value of the result; see libtrace_result_set_value() and libtrace_result_get_value() */
111        int type; /* Presumably one of the RESULT_* constants defined in this header - TODO confirm */
112};
113
114typedef enum {
115        /**
116         * Sets the hasher function. If NULL (the default) no hashing is used and
117         * cores will get packets on a first-come, first-served basis.
118         */
119        TRACE_OPTION_SET_HASHER,
120
121        /**
122         * Sets the perpkt (per-packet) thread count used by libtrace.
123         */
124        TRACE_OPTION_SET_PERPKT_THREAD_COUNT,
125
126        /**
127         * Delays packets so they are played back in trace-time rather than as fast
128         * as possible.
129         */
130        TRACE_OPTION_TRACETIME,
131
132        /**
133         * Specifies the interval between tick packets in milliseconds. If 0
134         * or less this is ignored.
135         */
136        TRACE_OPTION_TICK_INTERVAL,
137        TRACE_OPTION_GET_CONFIG, /* Undocumented; presumably reads the trace's struct user_configuration - TODO confirm */
138        TRACE_OPTION_SET_CONFIG /* Undocumented; presumably replaces the trace's struct user_configuration - TODO confirm */
139} trace_parallel_option_t;
140
141enum libtrace_messages { /* Message codes; MESSAGE_PACKET and MESSAGE_RESULT are named as mesg_code values in the fn_per_pkt and fn_reporter documentation */
142        MESSAGE_PACKET, /* data.pkt contains the packet (see the fn_per_pkt documentation) */
143        MESSAGE_RESULT, /* data.res contains the result (see the fn_reporter documentation) */
144        MESSAGE_STARTING,
145        MESSAGE_RESUMING,
146        MESSAGE_STOPPING,
147        MESSAGE_PAUSING,
148        MESSAGE_DO_PAUSE,
149        MESSAGE_DO_STOP,
150        MESSAGE_FIRST_PACKET,
151        MESSAGE_PERPKT_ENDED,
152        MESSAGE_PERPKT_RESUMED,
153        MESSAGE_PERPKT_PAUSED,
154        MESSAGE_PERPKT_EOF,
155        MESSAGE_POST_REPORTER,
156        MESSAGE_POST_RANGE,
157        MESSAGE_TICK,
158        MESSAGE_USER = 1000 /* Presumably the first code available for user-defined messages - TODO confirm */
159};
160
161enum hasher_types {
162        /**
163         * Balance load across CPUs as best as possible; this is basically to say
164         * that the hash does not matter. This might still be implemented
165         * using a hash or round robin etc. under the hood depending on the format.
166         */
167        HASHER_BALANCE,
168
169        /** Use a hash which is bi-directional for TCP flows, that is packets with
170         * the same hash are sent to the same thread. All non TCP packets will be
171         * sent to the same thread. UDP may or may not be sent to separate
172         * threads like TCP; this depends on the format support.
173         */
174        HASHER_BIDIRECTIONAL,
175
176        /**
177         * Use a hash which is uni-directional across TCP flows, that means the
178         * opposite directions of the same 5 tuple might end up on separate cores.
179         * Otherwise this is identical to HASHER_BIDIRECTIONAL.
180         */
181        HASHER_UNIDIRECTIONAL,
182
183        /**
184         * Always use the user supplied hasher; this currently disables native
185         * support and is likely significantly slower.
186         */
187        HASHER_CUSTOM,
188
189        /**
190         * This is not a valid option, it is used internally only!!! TODO remove
191         * Set by the format if the hashing is going to be done in hardware.
192         */
193        HASHER_HARDWARE
194};
195
196typedef struct libtrace_info_t { /* Information about a trace; this is the type returned by trace_get_information() */
197        /**
198         * True if the format is live (i.e. packets have to be tracetime).
199         * Otherwise false, indicating packets can be read as fast
200         * as possible from the format.
201         */
202        bool live;
203
204        /**
205         * The maximum number of threads supported by a parallel trace. 1
206         * if parallel support is not native (in this case libtrace will simulate
207         * an unlimited number of threads), -1 means unlimited and 0 means unknown.
208         */
209        int max_threads;
210
211        /* TODO hash fn supported list */
212
213        /* TODO consider time/clock details?? */
214} libtrace_info_t;
215
216
217/**
218 * Tuning parameters (sizes, thread counts and polling behaviour) for the parallel framework
219 */
220struct user_configuration {
221        // Packet memory cache settings (ocache_init) total
222        /**
223         * See diagrams, this sets the maximum size of the freelist used to
224         * maintain packets and their memory buffers.
225         * NOTE setting this to less than recommended could cause deadlock in a
226         * trace that manages its own packets.
227         * An unblockable error message will be printed.
228         */
229        size_t packet_cache_size;
230        /**
231         * Per thread local cache size for the packet freelist
232         */
233        size_t packet_thread_cache_size;
234        /**
235         * If true the total number of packets that can be created by a trace is limited
236         * to the packet_cache_size. Otherwise, once packet_cache_size is exceeded, alloc
237         * and free will be used to create and free packets; this will be slower than
238         * using the freelist and could run a machine out of memory.
239         *
240         * However the latter does make it easier to ensure that deadlocks will not occur
241         * due to running out of packets.
242         */
243        bool fixed_packet_count;
244        /**
245         * When reading from a single threaded input source, to reduce
246         * lock contention a 'burst' of packets is read per pkt thread;
247         * this determines the burst size.
248         */
249        size_t burst_size;
250        // Each perpkt thread has a queue leading into the reporter
251        //size_t reporter_queue_size;
252
253        /**
254         * The tick interval - in milliseconds.
255         * When a live trace is used, messages are sent at the tick
256         * interval to ensure that all perpkt threads receive data.
257         * This allows results to be printed in cases where flows are
258         * not being directed to a certain thread, while still
259         * maintaining order.
260         */
261        size_t tick_interval;
262
263        /**
264         * Like the tick interval, but used in the case of a file format.
265         * This specifies the number of packets before inserting a tick to
266         * every thread.
267         */
268        size_t tick_count;
269
270        /**
271         * The number of per packet threads requested, 0 means use the default.
272         * The default is typically the number of processor threads detected less one or two.
273         */
274        size_t perpkt_threads;
275
276        /**
277         * See diagrams, this sets the maximum size of the buffers used between
278         * the single hasher thread and the buffer.
279         * NOTE setting this to less than recommended could cause deadlock in a
280         * trace that manages its own packets.
281         * An unblockable warning message will be printed to stderr in this case.
282         */
283        /** The number of packets that can queue per thread from the hasher thread */
284        size_t hasher_queue_size;
285
286        /**
287         * If true use a polling hasher queue, which means that we will spin or yield
288         * rather than blocking on a lock. This applies to both the hasher thread
289         * and the perpkt threads reading the queues.
290         */
291        bool hasher_polling;
292
293        /**
294         * If true the reporter thread will continuously poll waiting for results.
295         * If false they are only checked when a message is received; this message
296         * is controlled by reporter_thold.
297         */
298        bool reporter_polling;
299
300        /**
301         * Perpkt thread result queue size before triggering the reporter step to read results
302         */
303        size_t reporter_thold;
304
305        /**
306         * Prints a line to standard error for every state change,
307         * for both the trace as a whole and for each thread.
308         */
309        bool debug_state;
310};
311
312/**
313 * The methods we use to combine multiple outputs into a single output.
314 * This is not considered a stable API, although it is public.
315 * Where possible use the built-in combiners.
316 *
317 * NOTE this structure is duplicated per trace and as such can
318 * have its functions rewritten, and in fact should if possible.
319 */
320typedef struct libtrace_combine libtrace_combine_t;
321struct libtrace_combine {
322
323        /**
324         * Called at the start of the trace to allow data structures
325         * to be initialised and to allow functions to be swapped if appropriate.
326         *
327         * Also factors such as whether the trace is live or not can
328         * be used to determine the functions used.
329         * @return 0 if successful, -1 if an error occurs
330         */
331        int (*initialise)(libtrace_t *,libtrace_combine_t *);
332
333        /**
334         * Called when the trace ends; clean up here any memory
335         * allocated during initialisation.
336         */
337        void (*destroy)(libtrace_t *, libtrace_combine_t *);
338
339        /**
340         * Publish a result against its thread's queue.
341         * If NULL publish directly; this is expected to be used
342         * as a single threaded optimisation and can be
343         * set to NULL by init if this case is detected.
344         *
345         * TODO this is old info
346         */
347        void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *);
348
349        /**
350         * Read as many results as possible from the trace.
351         * Directly calls the user's code to handle results from here.
352         *
353         * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE
354         * If publish is NULL, this probably should be NULL also, otherwise
355         * it will not be called.
356         */
357        void (*read)(libtrace_t *, libtrace_combine_t *);
358
359        /**
360         * Called when the trace is finished to flush the final
361         * results to the reporter thread.
362         *
363         * There may be no results, in which case this should
364         * just return.
365         *
366         * Libtrace state:
367         * Called from the reporter thread.
368         * No perpkt threads will be running, i.e. publish will not be
369         * called again.
370         *
371         * If publish is NULL, this probably should be NULL also, otherwise
372         * it will not be called.
373         */
374        void (*read_final)(libtrace_t *, libtrace_combine_t *);
375
376        /**
377         * Pause must make sure any results of the type packet are safe.
378         * That means calling trace_copy_packet() and destroying the original.
379         * This also should be NULL if publish is NULL.
380         */
381        void (*pause)(libtrace_t *, libtrace_combine_t *);
382
383        /**
384         * Data storage for all the combiner threads
385         */
386        void *queues;
387
388        /**
389         * Configuration options; what this does is up to the combiner
390         * chosen.
391         */
392        libtrace_generic_t configuration;
393};
394
395/**
396 * The definition for the main function that the user supplies to process
397 * packets.
398 *
399 * @param trace The trace the packet is related to.
400 * @param thread The thread identifier.
401 * @param mesg_code The type of data ready (see enum libtrace_messages), the most important being MESSAGE_PACKET.
402 * In this case data.pkt contains the packet.
403 * @param data A generic union of types that fit into 8 bytes, containing
404 * information dependent upon the mesg_code.
405 * @param sender The thread from which the message originated.
406 *
407 * The values of data and sender depend upon the mesg_code. Please see the
408 * documentation for the message as to what values these will contain. NOTE(review): the meaning of the void* return value is not documented here.
409 */
410typedef void* (*fn_per_pkt)(libtrace_t* trace,
411                            libtrace_thread_t *thread,
412                            int mesg_code,
413                            libtrace_generic_t data,
414                            libtrace_thread_t *sender);
415
416/**
417 * The definition for the main function that the user supplies to process
418 * results from trace_publish_result().
419 *
420 * @param trace The trace the message is related to.
421 * @param mesg_code The type of data ready (see enum libtrace_messages), the most important being MESSAGE_RESULT.
422 * In this case data.res contains the result.
423 * @param data A generic union of types that fit into 8 bytes, containing
424 * information dependent upon the mesg_code.
425 * @param sender The thread from which the message originated.
426 *
427 * The values of data and sender depend upon the mesg_code. Please see the
428 * documentation for the message as to what values these will contain.
429 */
430typedef void (*fn_reporter)(libtrace_t* trace,
431                            int mesg_code,
432                            libtrace_generic_t data,
433                            libtrace_thread_t *sender);
434
435/**
436 * The definition for a hasher function, allowing matching packets to be
437 * directed to the same per packet thread for processing.
438 *
439 * @param packet The packet to be hashed.
440 * @param data A void pointer which can contain additional information, such as configuration of the hasher function.
441 * @return The hash value for the packet. NOTE(review): how a hash value is mapped to a thread is not described here.
442 */
443typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
444
445
446/** Start or restart an input trace in the parallel libtrace framework.
447 *
448 * @param libtrace The input trace to start
449 * @param global_blob Global data related to this trace, accessible using trace_get_global() (NOTE(review): this header declares the getter as trace_get_local() - confirm the intended name)
450 * @param per_pkt A user supplied function called when a packet is ready
451 * @param reporter A user supplied function called when a result is ready.
452 * Optional; if NULL the reporter thread will not be started.
453 * @returns 0 on success, otherwise -1 to indicate an error has occurred
454 *
455 * This can also be used to restart an existing parallel trace
456 * that has previously been paused using trace_ppause().
457 * In this case global_blob, per_pkt and reporter will only be updated
458 * if they are non-null. Otherwise their previous values will be maintained.
459 *
460 */
461DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
462                           fn_per_pkt per_pkt, fn_reporter reporter);
463
464/** Pauses a trace previously started with trace_pstart(). A paused trace can be restarted with trace_pstart().
465 *
466 * @param libtrace The parallel trace to be paused
467 * @returns 0 on success, otherwise -1 to indicate an error has occurred
468 *
469 */
470DLLEXPORT int trace_ppause(libtrace_t *libtrace);
471
472/** Stops a parallel trace, causing all threads to exit as if an EOF
473 * has occurred. This replaces trace_interrupt(), allowing
474 * a specified trace to be stopped.
475 *
476 * @param libtrace The parallel trace to be stopped
477 * @returns 0 on success, otherwise -1 to indicate an error has occurred
478 *
479 * This should only be called by the main thread.
480 *
481 */
482DLLEXPORT int trace_pstop(libtrace_t *libtrace);
483
484/** Waits for a trace to finish and all threads to join.
485 *
486 * @param trace The parallel trace
487 *
488 * Waits for a trace to finish, whether this be due to
489 * an error occurring, an EOF or trace_pstop().
490 *
491 */
492DLLEXPORT void trace_join(libtrace_t * trace);
493
494
495/**
496 * @name User Data Storage
497 *
498 * These methods provide a way for users to store data against a trace or
499 * a thread.
500 *
501 * Alternatively one could use global variables and thread local
502 * storage (__thread), respectively, which in many cases could be simpler.
503 *
504 * @note We do not lock on reads, instead we rely on the
505 * processor making any writes appear atomically.
506 *
507 * @{
508 */
509
510/** Returns the data stored against a trace (see trace_set_local()).
511 *
512 * @param trace The parallel trace
513 * @return The stored data.
514 */
515DLLEXPORT void * trace_get_local(libtrace_t *trace);
516
517/** Store data against a trace so that all threads can access it
518 * using trace_get_global() (NOTE(review): this header declares the getter as trace_get_local() - confirm the intended name).
519 *
520 * @param trace The parallel trace.
521 * @param data The new value to save against the trace
522 * @return The previously stored value
523 *
524 * The update to the previous value is atomic and thread-safe.
525 *
526 * @note Although this is thread-safe, another thread may still be
527 * using the previous data; as such further synchronisation is needed
528 * if a thread wants to free the existing value.
529 */
530DLLEXPORT void * trace_set_local(libtrace_t *trace, void * data);
531
532/** Returns the user's data stored against a thread (see trace_set_tls()).
533 *
534 * @param thread The thread
535 * @return The stored data
536 */
537DLLEXPORT void * trace_get_tls(libtrace_thread_t *thread);
538
539/** Store data against a thread.
540 *
541 * @param thread The thread to store the data against.
542 * @param data The new value to save against the thread
543 * @return The previously stored value
544 *
545 * This function is not thread-safe and is intended only to be
546 * called on the currently running thread.
547 */
548DLLEXPORT void * trace_set_tls(libtrace_thread_t *thread, void * data);
549
550/// @}
551
552
553/** TODO DOXS. Sets the hasher for a trace: type is one of enum hasher_types, hasher is a user supplied fn_hasher (see HASHER_CUSTOM) and data is presumably the pointer handed to that hasher - TODO confirm, and document the return value.
554 */
555DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);
556
557#define RESULT_NORMAL 0 /* RESULT_*: presumably the values for the type field of libtrace_result_t and the type argument of trace_publish_result() - TODO confirm */
558#define RESULT_PACKET 1 /* Presumably marks a result whose value holds a packet - TODO confirm */
559#define RESULT_TICK   2 /* Presumably marks a result generated for a tick - TODO confirm */
560
561DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key); /* Undocumented accessors for libtrace_result_t follow; presumably they read and write its key and value fields - TODO confirm */
562DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
563DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_t value);
564DLLEXPORT libtrace_generic_t libtrace_result_get_value(libtrace_result_t * result);
565DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_t value);
566DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
567
568DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type); /* Publishes a result; the fn_reporter documentation names this function as the source of the results the reporter processes */
569
570DLLEXPORT int trace_post_reporter(libtrace_t *libtrace); /* Undocumented; presumably related to MESSAGE_POST_REPORTER - TODO confirm */
571DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
572DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
573DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
574DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message);
575DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
576DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
577DLLEXPORT int trace_finished(libtrace_t * libtrace);
578DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
579DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet);
580DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
581DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
582DLLEXPORT uint64_t tv_to_usec(struct timeval *tv); /* Presumably converts a struct timeval to microseconds - TODO confirm */
583
584DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv); /* NOTE(review): "retrive" is a misspelling of "retrieve", but it is part of the exported name; do not correct it without keeping a compatibility alias */
585
586DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt);
587DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res);
588
589
590DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value); /* option is a trace_parallel_option_t; what value must point to presumably depends on option - TODO confirm */
591DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
592DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
593DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace); /* Returns a pointer to a libtrace_info_t; ownership of the returned object is not documented here - TODO confirm */
594
595
596DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str); /* Presumably fills uc from a configuration string - TODO confirm the accepted syntax */
597DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file); /* Presumably fills uc from an open configuration file - TODO confirm */
598DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t);
599
600
601
602/**
603 * Sets a combiner function against the trace.
604 *
605 * @param trace The input trace
606 * @param combiner The combiner to use
607 * @param config Configuration information. Dependent upon the combiner in use
608 *
609 * Sets a combiner against a trace; this should only be called on a
610 * non-started or paused trace.
611 */
612DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config);
613/* Zeroes every byte of a struct user_configuration. config must be the object itself (an lvalue), not a pointer to it. The argument is parenthesised so that expressions such as (cond ? a : b) expand correctly. The expansion keeps its historical trailing ';' so existing call sites still compile: put braces around the call when it is the body of an if that has an else. Uses memset(), so <string.h> must be in scope. */
614#define ZERO_USER_CONFIG(config) memset(&(config), 0, sizeof(struct user_configuration));
615
616#endif // LIBTRACE_PARALLEL_H
Note: See TracBrowser for help on using the repository browser.