source: lib/trace_parallel.c @ 37ee856

developringdecrementfixringperformance
Last change on this file since 37ee856 was 37ee856, checked in by Shane Alcock <salcock@…>, 2 years ago

Tag each packet with the start iteration of the parent trace.

Traces can be started, paused and then restarted. For some
formats (especially live ones), this will mean that buffers
containing received packets can be destroyed and recreated as
a result of that process. However, in our parallel world we might
also have lingering references to packets that lived in a
now-destroyed buffer and bad things will happen if we try to
operate on it.

To try and avoid this, we keep track of how many times a trace has
been "started" and each packet read is tagged with the start count
at the time it was read. Later processing functions can now check
if the packet was read before the most recent "start" -- if it was,
then it is potentially bogus and should be ignored.

This shouldn't change anything for the vast majority of libtrace
use-cases. Normally, pausing is only used prior to ending an
input without subsequent restarting, i.e. there is only one start
iteration.

  • Property mode set to 100644
File size: 85.0 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
290        return thread->perpkt_num;
291}
292
293/**
294 * Changes the overall traces state and signals the condition.
295 *
296 * @param trace A pointer to the trace
297 * @param new_state The new state of the trace
298 * @param need_lock Set to true if libtrace_lock is not held, otherwise
299 *        false in the case the lock is currently held by this thread.
300 */
301static inline void libtrace_change_state(libtrace_t *trace,
302        const enum trace_state new_state, const bool need_lock)
303{
304        UNUSED enum trace_state prev_state;
305        if (need_lock)
306                pthread_mutex_lock(&trace->libtrace_lock);
307        prev_state = trace->state;
308        trace->state = new_state;
309
310        if (trace->config.debug_state)
311                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
312                        trace->uridata, get_trace_state_name(prev_state),
313                        get_trace_state_name(trace->state));
314
315        pthread_cond_broadcast(&trace->perpkt_cond);
316        if (need_lock)
317                pthread_mutex_unlock(&trace->libtrace_lock);
318}
319
320/**
321 * Changes a thread's state and broadcasts the condition variable. This
322 * should always be done when the lock is held.
323 *
324 * Additionally for perpkt threads the state counts are updated.
325 *
326 * @param trace A pointer to the trace
327 * @param t A pointer to the thread to modify
328 * @param new_state The new state of the thread
329 * @param need_lock Set to true if libtrace_lock is not held, otherwise
330 *        false in the case the lock is currently held by this thread.
331 */
332static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
333        const enum thread_states new_state, const bool need_lock)
334{
335        enum thread_states prev_state;
336        if (need_lock)
337                pthread_mutex_lock(&trace->libtrace_lock);
338        prev_state = t->state;
339        t->state = new_state;
340        if (t->type == THREAD_PERPKT) {
341                --trace->perpkt_thread_states[prev_state];
342                ++trace->perpkt_thread_states[new_state];
343        }
344
345        if (trace->config.debug_state)
346                fprintf(stderr, "Thread %d state changed from %d to %d\n",
347                        (int) t->tid, prev_state, t->state);
348
349        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
350                /* Make sure we save our final stats in case someone wants
351                 * them at the end of their program.
352                 */
353
354                trace_get_statistics(trace, NULL);
355                libtrace_change_state(trace, STATE_FINISHED, false);
356        }
357
358        pthread_cond_broadcast(&trace->perpkt_cond);
359        if (need_lock)
360                pthread_mutex_unlock(&trace->libtrace_lock);
361}
362
363/**
364 * This is valid once a trace is initialised
365 *
366 * @return True if the format supports parallel threads.
367 */
368static inline bool trace_supports_parallel(libtrace_t *trace)
369{
370        assert(trace);
371        assert(trace->format);
372        if (trace->format->pstart_input)
373                return true;
374        else
375                return false;
376}
377
378void libtrace_zero_thread(libtrace_thread_t * t) {
379        t->accepted_packets = 0;
380        t->filtered_packets = 0;
381        t->recorded_first = false;
382        t->tracetime_offset_usec = 0;
383        t->user_data = 0;
384        t->format_data = 0;
385        libtrace_zero_ringbuffer(&t->rbuffer);
386        t->trace = NULL;
387        t->ret = NULL;
388        t->type = THREAD_EMPTY;
389        t->perpkt_num = -1;
390}
391
392// Ints are aligned int is atomic so safe to read and write at same time
393// However write must be locked, read doesn't (We never try read before written to table)
394libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
395        int i = 0;
396        pthread_t tid = pthread_self();
397
398        for (;i<libtrace->perpkt_thread_count ;++i) {
399                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
400                        return &libtrace->perpkt_threads[i];
401        }
402        return NULL;
403}
404
405static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
406        libtrace_thread_t *ret;
407        if (!(ret = get_thread_table(libtrace))) {
408                pthread_t tid = pthread_self();
409                // Check if we are reporter or something else
410                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
411                                pthread_equal(tid, libtrace->reporter_thread.tid))
412                        ret = &libtrace->reporter_thread;
413                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
414                         pthread_equal(tid, libtrace->hasher_thread.tid))
415                        ret = &libtrace->hasher_thread;
416                else
417                        ret = NULL;
418        }
419        return ret;
420}
421
422DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
423        // Duplicate the packet in standard malloc'd memory and free the
424        // original, This is a 1:1 exchange so the ocache count remains unchanged.
425        if (pkt->buf_control != TRACE_CTRL_PACKET) {
426                libtrace_packet_t *dup;
427                dup = trace_copy_packet(pkt);
428                /* Release the external buffer */
429                trace_fin_packet(pkt);
430                /* Copy the duplicated packet over the existing */
431                memcpy(pkt, dup, sizeof(libtrace_packet_t));
432                /* Free the packet structure */
433                free(dup);
434        }
435}
436
437/**
438 * Makes a libtrace_result_t safe, used when pausing a trace.
439 * This will call libtrace_make_packet_safe if the result is
440 * a packet.
441 */
442DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
443        if (res->type == RESULT_PACKET) {
444                libtrace_make_packet_safe(res->value.pkt);
445        }
446}
447
448/**
449 * Holds threads in a paused state, until released by broadcasting
450 * the condition mutex.
451 */
452static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
453        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
454        thread_change_state(trace, t, THREAD_PAUSED, false);
455        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
456                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
457        }
458        thread_change_state(trace, t, THREAD_RUNNING, false);
459        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
460}
461
462/**
463 * Sends a packet to the user, expects either a valid packet or a TICK packet.
464 *
465 * @param trace The trace
466 * @param t The current thread
467 * @param packet A pointer to the packet storage, which may be set to null upon
468 *               return, or a packet to be finished.
469 * @param tracetime If true packets are delayed to match with tracetime
470 * @return 0 is successful, otherwise if playing back in tracetime
471 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
472 *
473 * @note READ_MESSAGE will only be returned if tracetime is true.
474 */
475static inline int dispatch_packet(libtrace_t *trace,
476                                  libtrace_thread_t *t,
477                                  libtrace_packet_t **packet,
478                                  bool tracetime) {
479
480        if ((*packet)->error > 0) {
481                if (tracetime) {
482                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
483                                return READ_MESSAGE;
484                }
485                if (!IS_LIBTRACE_META_PACKET((*packet))) {
486                        t->accepted_packets++;
487                }
488                if (trace->perpkt_cbs->message_packet)
489                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
490                trace_fin_packet(*packet);
491        } else {
492                assert((*packet)->error == READ_TICK);
493                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
494                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
495        }
496        return 0;
497}
498
499/**
500 * Sends a batch of packets to the user, expects either a valid packet or a
501 * TICK packet.
502 *
503 * @param trace The trace
504 * @param t The current thread
505 * @param packets [in,out] An array of packets, these may be null upon return
506 * @param nb_packets The total number of packets in the list
507 * @param empty [in,out] A pointer to an integer storing the first empty slot,
508 * upon return this is updated
509 * @param offset [in,out] The offset into the array, upon return this is updated
510 * @param tracetime If true packets are delayed to match with tracetime
511 * @return 0 is successful, otherwise if playing back in tracetime
512 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
513 *
514 * @note READ_MESSAGE will only be returned if tracetime is true.
515 */
516static inline int dispatch_packets(libtrace_t *trace,
517                                  libtrace_thread_t *t,
518                                  libtrace_packet_t *packets[],
519                                  int nb_packets, int *empty, int *offset,
520                                  bool tracetime) {
521        for (;*offset < nb_packets; ++*offset) {
522                int ret;
523                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
524                if (ret == 0) {
525                        /* Move full slots to front as we go */
526                        if (packets[*offset]) {
527                                if (*empty != *offset) {
528                                        packets[*empty] = packets[*offset];
529                                        packets[*offset] = NULL;
530                                }
531                                ++*empty;
532                        }
533                } else {
534                        /* Break early */
535                        assert(ret == READ_MESSAGE);
536                        return READ_MESSAGE;
537                }
538        }
539
540        return 0;
541}
542
543/**
544 * Pauses a per packet thread, messages will not be processed when the thread
545 * is paused.
546 *
547 * This process involves reading packets if a hasher thread is used. As such
548 * this function can fail to pause due to errors when reading in which case
549 * the thread should be stopped instead.
550 *
551 *
552 * @brief trace_perpkt_thread_pause
553 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
554 */
555static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
556                                     libtrace_packet_t *packets[],
557                                     int nb_packets, int *empty, int *offset) {
558        libtrace_packet_t * packet = NULL;
559
560        /* Let the user thread know we are going to pause */
561        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
562
563        /* Send through any remaining packets (or messages) without delay */
564
565        /* First send those packets already read, as fast as possible
566         * This should never fail or check for messages etc. */
567        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
568                                    offset, false), == 0);
569
570        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
571        /* If a hasher thread is running, empty input queues so we don't lose data */
572        if (trace_has_dedicated_hasher(trace)) {
573                // The hasher has stopped by this point, so the queue shouldn't be filling
574                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
575                        int ret = trace->pread(trace, t, &packet, 1);
576                        if (ret == 1) {
577                                if (packet->error > 0) {
578                                        store_first_packet(trace, packet, t);
579                                }
580                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
581                                if (packet == NULL)
582                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
583                        } else if (ret != READ_MESSAGE) {
584                                /* Ignore messages we pick these up next loop */
585                                assert (ret == READ_EOF || ret == READ_ERROR);
586                                /* Verify no packets are remaining */
587                                /* TODO refactor this sanity check out!! */
588                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
589                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
590                                        // No packets after this should have any data in them
591                                        assert(packet->error <= 0);
592                                }
593                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
594                                return -1;
595                        }
596                }
597        }
598        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
599
600        /* Now we do the actual pause, this returns when we resumed */
601        trace_thread_pause(trace, t);
602        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
603        return 1;
604}
605
606/**
607 * The is the entry point for our packet processing threads.
608 */
609static void* perpkt_threads_entry(void *data) {
610        libtrace_t *trace = (libtrace_t *)data;
611        libtrace_thread_t *t;
612        libtrace_message_t message = {0, {.uint64=0}, NULL};
613        libtrace_packet_t *packets[trace->config.burst_size];
614        size_t i;
615        //int ret;
616        /* The current reading position into the packets */
617        int offset = 0;
618        /* The number of packets last read */
619        int nb_packets = 0;
620        /* The offset to the first NULL packet upto offset */
621        int empty = 0;
622        int j;
623
624        /* Wait until trace_pstart has been completed */
625        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
626        t = get_thread_table(trace);
627        assert(t);
628        if (trace->state == STATE_ERROR) {
629                thread_change_state(trace, t, THREAD_FINISHED, false);
630                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
631                pthread_exit(NULL);
632        }
633        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
634
635        if (trace->format->pregister_thread) {
636                if (trace->format->pregister_thread(trace, t, 
637                                trace_is_parallel(trace)) < 0) {
638                        thread_change_state(trace, t, THREAD_FINISHED, false);
639                        pthread_exit(NULL);
640                }
641        }
642
643        /* Fill our buffer with empty packets */
644        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
645        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
646                              trace->config.burst_size,
647                              trace->config.burst_size);
648
649        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
650
651        /* Let the per_packet function know we have started */
652        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
653        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
654
655        for (;;) {
656
657                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
658                        int ret;
659                        switch (message.code) {
660                                case MESSAGE_DO_PAUSE: // This is internal
661                                        ret = trace_perpkt_thread_pause(trace, t,
662                                              packets, nb_packets, &empty, &offset);
663                                        if (ret == READ_EOF) {
664                                                goto eof;
665                                        } else if (ret == READ_ERROR) {
666                                                goto error;
667                                        }
668                                        assert(ret == 1);
669                                        continue;
670                                case MESSAGE_DO_STOP: // This is internal
671                                        goto eof;
672                        }
673                        send_message(trace, t, message.code, message.data, 
674                                        message.sender);
675                        /* Continue and the empty messages out before packets */
676                        continue;
677                }
678
679
680                /* Do we need to read a new set of packets MOST LIKELY we do */
681                if (offset == nb_packets) {
682                        /* Refill the packet buffer */
683                        if (empty != nb_packets) {
684                                // Refill the empty packets
685                                libtrace_ocache_alloc(&trace->packet_freelist,
686                                                      (void **) &packets[empty],
687                                                      nb_packets - empty,
688                                                      nb_packets - empty);
689                        }
690                        if (!trace->pread) {
691                                assert(packets[0]);
692                                nb_packets = trace_read_packet(trace, packets[0]);
693                                packets[0]->error = nb_packets;
694                                if (nb_packets > 0)
695                                        nb_packets = 1;
696                        } else {
697                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
698                        }
699                        offset = 0;
700                        empty = 0;
701                }
702
703                /* Handle error/message cases */
704                if (nb_packets > 0) {
705                        /* Store the first non-meta packet */
706                        for (j = 0; j < nb_packets; j++) {
707                                if (t->recorded_first)
708                                        break;
709                                if (packets[j]->error > 0) {
710                                        store_first_packet(trace, packets[j], t);
711                                }
712                        }
713                        dispatch_packets(trace, t, packets, nb_packets, &empty,
714                                         &offset, trace->tracetime);
715                } else {
716                        switch (nb_packets) {
717                        case READ_EOF:
718                                goto eof;
719                        case READ_ERROR:
720                                goto error;
721                        case READ_MESSAGE:
722                                nb_packets = 0;
723                                continue;
724                        default:
725                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
726                                goto error;
727                        }
728                }
729
730        }
731
732error:
733        message.code = MESSAGE_DO_STOP;
734        message.sender = t;
735        message.data.uint64 = 0;
736        trace_message_perpkts(trace, &message);
737eof:
738        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
739
740        // Let the per_packet function know we have stopped
741        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
742        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
743
744        // Free any remaining packets
745        for (i = 0; i < trace->config.burst_size; i++) {
746                if (packets[i]) {
747                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
748                        packets[i] = NULL;
749                }
750        }
751
752        thread_change_state(trace, t, THREAD_FINISHED, true);
753
754        /* Make sure the reporter sees we have finished */
755        if (trace_has_reporter(trace))
756                trace_post_reporter(trace);
757
758        // Release all ocache memory before unregistering with the format
759        // because this might(it does in DPDK) unlink the formats mempool
760        // causing destroy/finish packet to fail.
761        libtrace_ocache_unregister_thread(&trace->packet_freelist);
762        if (trace->format->punregister_thread) {
763                trace->format->punregister_thread(trace, t);
764        }
765        print_memory_stats();
766
767        pthread_exit(NULL);
768}
769
770/**
771 * The start point for our single threaded hasher thread, this will read
772 * and hash a packet from a data source and queue it against the correct
773 * core to process it.
774 */
775static void* hasher_entry(void *data) {
776        libtrace_t *trace = (libtrace_t *)data;
777        libtrace_thread_t * t;
778        int i;
779        libtrace_packet_t * packet;
780        libtrace_message_t message = {0, {.uint64=0}, NULL};
781        int pkt_skipped = 0;
782
783        assert(trace_has_dedicated_hasher(trace));
784        /* Wait until all threads are started and objects are initialised (ring buffers) */
785        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
786        t = &trace->hasher_thread;
787        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
788        if (trace->state == STATE_ERROR) {
789                thread_change_state(trace, t, THREAD_FINISHED, false);
790                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
791                pthread_exit(NULL);
792        }
793        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
794
795        /* We are reading but it is not the parallel API */
796        if (trace->format->pregister_thread) {
797                trace->format->pregister_thread(trace, t, true);
798        }
799
800        /* Read all packets in then hash and queue against the correct thread */
801        while (1) {
802                int thread;
803                if (!pkt_skipped)
804                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
805                assert(packet);
806
807                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
808                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
809                        switch(message.code) {
810                                case MESSAGE_DO_PAUSE:
811                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
812                                        thread_change_state(trace, t, THREAD_PAUSED, false);
813                                        pthread_cond_broadcast(&trace->perpkt_cond);
814                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
815                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
816                                        }
817                                        thread_change_state(trace, t, THREAD_RUNNING, false);
818                                        pthread_cond_broadcast(&trace->perpkt_cond);
819                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
820                                        break;
821                                case MESSAGE_DO_STOP:
822                                        /* Either FINISHED or FINISHING */
823                                        assert(trace->started == false);
824                                        /* Mark the current packet as EOF */
825                                        packet->error = 0;
826                                        goto hasher_eof;
827                                default:
828                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
829                        }
830                        pkt_skipped = 1;
831                        continue;
832                }
833
834                if ((packet->error = trace_read_packet(trace, packet)) <1) {
835                        if (packet->error == READ_MESSAGE) {
836                                pkt_skipped = 1;
837                                continue;
838                        } else {
839                                break; /* We are EOF or error'd either way we stop  */
840                        }
841                }
842
843                /* We are guaranteed to have a hash function i.e. != NULL */
844                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
845                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
846                /* Blocking write to the correct queue - I'm the only writer */
847                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
848                        uint64_t order = trace_packet_get_order(packet);
849                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
850                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
851                                // Write ticks to everyone else
852                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
853                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
854                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
855                                for (i = 0; i < trace->perpkt_thread_count; i++) {
856                                        pkts[i]->error = READ_TICK;
857                                        trace_packet_set_order(pkts[i], order);
858                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
859                                }
860                        }
861                        pkt_skipped = 0;
862                } else {
863                        assert(!"Dropping a packet!!");
864                        pkt_skipped = 1; // Reuse that packet no one read it
865                }
866        }
867hasher_eof:
868        /* Broadcast our last failed read to all threads */
869        for (i = 0; i < trace->perpkt_thread_count; i++) {
870                libtrace_packet_t * bcast;
871                if (i == trace->perpkt_thread_count - 1) {
872                        bcast = packet;
873                } else {
874                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
875                        bcast->error = packet->error;
876                }
877                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
878                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
879                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
880                } else {
881                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
882                }
883                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
884        }
885
886        // We don't need to free the packet
887        thread_change_state(trace, t, THREAD_FINISHED, true);
888
889        libtrace_ocache_unregister_thread(&trace->packet_freelist);
890        if (trace->format->punregister_thread) {
891                trace->format->punregister_thread(trace, t);
892        }
893        print_memory_stats();
894
895        // TODO remove from TTABLE t sometime
896        pthread_exit(NULL);
897}
898
899/* Our simplest case when a thread becomes ready it can obtain an exclusive
900 * lock to read packets from the underlying trace.
901 */
902static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
903                                                    libtrace_thread_t *t,
904                                                    libtrace_packet_t *packets[],
905                                                    size_t nb_packets) {
906        size_t i = 0;
907        //bool tick_hit = false;
908
909        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
910        /* Read nb_packets */
911        for (i = 0; i < nb_packets; ++i) {
912                if (libtrace_message_queue_count(&t->messages) > 0) {
913                        if ( i==0 ) {
914                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
915                                return READ_MESSAGE;
916                        } else {
917                                break;
918                        }
919                }
920                packets[i]->error = trace_read_packet(libtrace, packets[i]);
921
922                if (packets[i]->error <= 0) {
923                        /* We'll catch this next time if we have already got packets */
924                        if ( i==0 ) {
925                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
926                                return packets[i]->error;
927                        } else {
928                                break;
929                        }
930                }
931                /*
932                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
933                        tick_hit = true;
934                }*/
935
936                // Doing this inside the lock ensures the first packet is
937                // always recorded first
938                if (!t->recorded_first && packets[0]->error > 0) {
939                        store_first_packet(libtrace, packets[0], t);
940                }
941        }
942        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
943        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
944        if (tick_hit) {
945                libtrace_message_t tick;
946                tick.additional.uint64 = trace_packet_get_order(packets[i]);
947                tick.code = MESSAGE_TICK;
948                trace_send_message_to_perpkts(libtrace, &tick);
949        } */
950        return i;
951}
952
953/**
954 * For the case that we have a dedicated hasher thread
955 * 1. We read a packet from our buffer
956 * 2. Move that into the packet provided (packet)
957 */
958inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
959                                                   libtrace_thread_t *t,
960                                                   libtrace_packet_t *packets[],
961                                                   size_t nb_packets) {
962        size_t i;
963
964        /* We store the last error message here */
965        if (t->format_data) {
966                return ((libtrace_packet_t *)t->format_data)->error;
967        }
968
969        // Always grab at least one
970        if (packets[0]) // Recycle the old get the new
971                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
972        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
973
974        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
975                return packets[0]->error;
976        }
977
978        for (i = 1; i < nb_packets; i++) {
979                if (packets[i]) // Recycle the old get the new
980                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
981                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
982                        packets[i] = NULL;
983                        break;
984                }
985
986                /* We will return an error or EOF the next time around */
987                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
988                        /* The message case will be checked automatically -
989                           However other cases like EOF and error will only be
990                           sent once*/
991                        if (packets[i]->error != READ_MESSAGE) {
992                                assert(t->format_data == NULL);
993                                t->format_data = packets[i];
994                        }
995                        break;
996                }
997        }
998
999        return i;
1000}
1001
1002/**
1003 * For the first packet of each queue we keep a copy and note the system
1004 * time it was received at.
1005 *
1006 * This is used for finding the first packet when playing back a trace
1007 * in trace time. And can be used by real time applications to print
1008 * results out every XXX seconds.
1009 */
1010void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1011{
1012
1013        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1014        struct timeval tv;
1015        libtrace_packet_t * dup;
1016
1017        if (t->recorded_first) {
1018                return;
1019        }
1020
1021        if (IS_LIBTRACE_META_PACKET(packet)) {
1022                return;
1023        }
1024
1025        /* We mark system time against a copy of the packet */
1026        gettimeofday(&tv, NULL);
1027        dup = trace_copy_packet(packet);
1028
1029        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1030        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1031        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1032        libtrace->first_packets.count++;
1033
1034        /* Now update the first */
1035        if (libtrace->first_packets.count == 1) {
1036                /* We the first entry hence also the first known packet */
1037                libtrace->first_packets.first = t->perpkt_num;
1038        } else {
1039                /* Check if we are newer than the previous 'first' packet */
1040                size_t first = libtrace->first_packets.first;
1041                struct timeval cur_ts = trace_get_timeval(dup);
1042                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1043                if (timercmp(&cur_ts, &first_ts, <))
1044                        libtrace->first_packets.first = t->perpkt_num;
1045        }
1046        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1047
1048        memset(&mesg, 0, sizeof(libtrace_message_t));
1049        mesg.code = MESSAGE_FIRST_PACKET;
1050        trace_message_reporter(libtrace, &mesg);
1051        trace_message_perpkts(libtrace, &mesg);
1052        t->recorded_first = true;
1053}
1054
1055DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1056                                     libtrace_thread_t *t,
1057                                     const libtrace_packet_t **packet,
1058                                     const struct timeval **tv)
1059{
1060        void * tmp;
1061        int ret = 0;
1062
1063        if (t) {
1064                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1065                        return -1;
1066        }
1067
1068        /* Throw away these which we don't use */
1069        if (!packet)
1070                packet = (const libtrace_packet_t **) &tmp;
1071        if (!tv)
1072                tv = (const struct timeval **) &tmp;
1073
1074        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1075        if (t) {
1076                /* Get the requested thread */
1077                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1078                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1079        } else if (libtrace->first_packets.count) {
1080                /* Get the first packet across all threads */
1081                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1082                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1083                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1084                        ret = 1;
1085                } else {
1086                        struct timeval curr_tv;
1087                        // If a second has passed since the first entry we will assume this is the very first packet
1088                        gettimeofday(&curr_tv, NULL);
1089                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1090                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1091                                        ret = 1;
1092                                }
1093                        }
1094                }
1095        } else {
1096                *packet = NULL;
1097                *tv = NULL;
1098        }
1099        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1100        return ret;
1101}
1102
1103
1104DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1105{
1106        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1107}
1108
1109inline static struct timeval usec_to_tv(uint64_t usec)
1110{
1111        struct timeval tv;
1112        tv.tv_sec = usec / 1000000;
1113        tv.tv_usec = usec % 1000000;
1114        return tv;
1115}
1116
1117/** Similar to delay_tracetime but send messages to all threads periodically */
1118static void* reporter_entry(void *data) {
1119        libtrace_message_t message = {0, {.uint64=0}, NULL};
1120        libtrace_t *trace = (libtrace_t *)data;
1121        libtrace_thread_t *t = &trace->reporter_thread;
1122
1123        /* Wait until all threads are started */
1124        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1125        if (trace->state == STATE_ERROR) {
1126                thread_change_state(trace, t, THREAD_FINISHED, false);
1127                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1128                pthread_exit(NULL);
1129        }
1130        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1131
1132        if (trace->format->pregister_thread) {
1133                trace->format->pregister_thread(trace, t, false);
1134        }
1135
1136        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1137        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1138
1139        while (!trace_has_finished(trace)) {
1140                if (trace->config.reporter_polling) {
1141                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1142                                message.code = MESSAGE_POST_REPORTER;
1143                } else {
1144                        libtrace_message_queue_get(&t->messages, &message);
1145                }
1146                switch (message.code) {
1147                        // Check for results
1148                        case MESSAGE_POST_REPORTER:
1149                                trace->combiner.read(trace, &trace->combiner);
1150                                break;
1151                        case MESSAGE_DO_PAUSE:
1152                                assert(trace->combiner.pause);
1153                                trace->combiner.pause(trace, &trace->combiner);
1154                                send_message(trace, t, MESSAGE_PAUSING,
1155                                                (libtrace_generic_t) {0}, t);
1156                                trace_thread_pause(trace, t);
1157                                send_message(trace, t, MESSAGE_RESUMING,
1158                                                (libtrace_generic_t) {0}, t);
1159                                break;
1160                default:
1161                        send_message(trace, t, message.code, message.data,
1162                                        message.sender);
1163                }
1164        }
1165
1166        // Flush out whats left now all our threads have finished
1167        trace->combiner.read_final(trace, &trace->combiner);
1168
1169        // GOODBYE
1170        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1171        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1172
1173        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1174        print_memory_stats();
1175        return NULL;
1176}
1177
1178/** Similar to delay_tracetime but send messages to all threads periodically */
1179static void* keepalive_entry(void *data) {
1180        struct timeval prev, next;
1181        libtrace_message_t message = {0, {.uint64=0}, NULL};
1182        libtrace_t *trace = (libtrace_t *)data;
1183        uint64_t next_release;
1184        libtrace_thread_t *t = &trace->keepalive_thread;
1185
1186        /* Wait until all threads are started */
1187        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1188        if (trace->state == STATE_ERROR) {
1189                thread_change_state(trace, t, THREAD_FINISHED, false);
1190                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1191                pthread_exit(NULL);
1192        }
1193        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1194
1195        gettimeofday(&prev, NULL);
1196        memset(&message, 0, sizeof(libtrace_message_t));
1197        message.code = MESSAGE_TICK_INTERVAL;
1198
1199        while (trace->state != STATE_FINISHED) {
1200                fd_set rfds;
1201                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1202                gettimeofday(&next, NULL);
1203                if (next_release > tv_to_usec(&next)) {
1204                        next = usec_to_tv(next_release - tv_to_usec(&next));
1205                        // Wait for timeout or a message
1206                        FD_ZERO(&rfds);
1207                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1208                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1209                                libtrace_message_t msg;
1210                                libtrace_message_queue_get(&t->messages, &msg);
1211                                assert(msg.code == MESSAGE_DO_STOP);
1212                                goto done;
1213                        }
1214                }
1215                prev = usec_to_tv(next_release);
1216                if (trace->state == STATE_RUNNING) {
1217                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1218                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1219                        trace_message_perpkts(trace, &message);
1220                }
1221        }
1222done:
1223
1224        thread_change_state(trace, t, THREAD_FINISHED, true);
1225        return NULL;
1226}
1227
1228/**
1229 * Delays a packets playback so the playback will be in trace time.
1230 * This may break early if a message becomes available.
1231 *
1232 * Requires the first packet for this thread to be received.
1233 * @param libtrace  The trace
1234 * @param packet    The packet to delay
1235 * @param t         The current thread
1236 * @return Either READ_MESSAGE(-2) or 0 is successful
1237 */
1238static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1239        struct timeval curr_tv, pkt_tv;
1240        uint64_t next_release = t->tracetime_offset_usec;
1241        uint64_t curr_usec;
1242
1243        if (!t->tracetime_offset_usec) {
1244                const libtrace_packet_t *first_pkt;
1245                const struct timeval *sys_tv;
1246                int64_t initial_offset;
1247                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1248                if (!first_pkt)
1249                        return 0;
1250                pkt_tv = trace_get_timeval(first_pkt);
1251                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1252                /* In the unlikely case offset is 0, change it to 1 */
1253                if (stable)
1254                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1255                next_release = initial_offset;
1256        }
1257        /* next_release == offset */
1258        pkt_tv = trace_get_timeval(packet);
1259        next_release += tv_to_usec(&pkt_tv);
1260        gettimeofday(&curr_tv, NULL);
1261        curr_usec = tv_to_usec(&curr_tv);
1262        if (next_release > curr_usec) {
1263                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1264                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1265                fd_set rfds;
1266                FD_ZERO(&rfds);
1267                FD_SET(mesg_fd, &rfds);
1268                // We need to wait
1269                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1270                if (ret == 0) {
1271                        return 0;
1272                } else if (ret > 0) {
1273                        return READ_MESSAGE;
1274                } else {
1275                        assert(!"trace_delay_packet: Unexpected return from select");
1276                }
1277        }
1278        return 0;
1279}
1280
1281/* Discards packets that don't match the filter.
1282 * Discarded packets are emptied and then moved to the end of the packet list.
1283 *
1284 * @param trace       The trace format, containing the filter
1285 * @param packets     An array of packets
1286 * @param nb_packets  The number of valid items in packets
1287 *
1288 * @return The number of packets that passed the filter, which are moved to
1289 *          the start of the packets array
1290 */
1291static inline size_t filter_packets(libtrace_t *trace,
1292                                    libtrace_packet_t **packets,
1293                                    size_t nb_packets) {
1294        size_t offset = 0;
1295        size_t i;
1296
1297        for (i = 0; i < nb_packets; ++i) {
1298                // The filter needs the trace attached to receive the link type
1299                packets[i]->trace = trace;
1300                if (trace_apply_filter(trace->filter, packets[i])) {
1301                        libtrace_packet_t *tmp;
1302                        tmp = packets[offset];
1303                        packets[offset++] = packets[i];
1304                        packets[i] = tmp;
1305                } else {
1306                        trace_fin_packet(packets[i]);
1307                }
1308        }
1309
1310        return offset;
1311}
1312
1313/* Read a batch of packets from the trace into a buffer.
1314 * Note that this function will block until a packet is read (or EOF is reached)
1315 *
1316 * @param libtrace    The trace
1317 * @param t           The thread
1318 * @param packets     An array of packets
1319 * @param nb_packets  The number of empty packets in packets
1320 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1321 */
1322static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1323                                      libtrace_thread_t *t,
1324                                      libtrace_packet_t *packets[],
1325                                      size_t nb_packets) {
1326        int i;
1327        assert(nb_packets);
1328        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1329        if (trace_is_err(libtrace))
1330                return -1;
1331        if (!libtrace->started) {
1332                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1333                              "You must call libtrace_start() before trace_read_packet()\n");
1334                return -1;
1335        }
1336
1337        if (libtrace->format->pread_packets) {
1338                int ret;
1339                for (i = 0; i < (int) nb_packets; ++i) {
1340                        assert(i[packets]);
1341                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1342                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1343                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1344                                              "Packet passed to trace_read_packet() is invalid\n");
1345                                return -1;
1346                        }
1347                        packets[i]->which_trace_start = libtrace->startcount;
1348                }
1349                do {
1350                        ret=libtrace->format->pread_packets(libtrace, t,
1351                                                            packets,
1352                                                            nb_packets);
1353                        /* Error, EOF or message? */
1354                        if (ret <= 0) {
1355                                return ret;
1356                        }
1357
1358                        if (libtrace->filter) {
1359                                int remaining;
1360                                remaining = filter_packets(libtrace,
1361                                                           packets, ret);
1362                                t->filtered_packets += ret - remaining;
1363                                ret = remaining;
1364                        }
1365                        for (i = 0; i < ret; ++i) {
1366                                /* We do not mark the packet against the trace,
1367                                 * before hand or after. After breaks DAG meta
1368                                 * packets and before is inefficient */
1369                                //packets[i]->trace = libtrace;
1370                                /* TODO IN FORMAT?? Like traditional libtrace */
1371                                if (libtrace->snaplen>0)
1372                                        trace_set_capture_length(packets[i],
1373                                                        libtrace->snaplen);
1374                        }
1375                } while(ret == 0);
1376                return ret;
1377        }
1378        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1379                      "This format does not support reading packets\n");
1380        return ~0U;
1381}
1382
1383/* Restarts a parallel trace, this is called from trace_pstart.
1384 * The libtrace lock is held upon calling this function.
1385 * Typically with a parallel trace the threads are not
1386 * killed rather.
1387 */
1388static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1389                          libtrace_callback_set_t *per_packet_cbs, 
1390                          libtrace_callback_set_t *reporter_cbs) {
1391        int i, err = 0;
1392        if (libtrace->state != STATE_PAUSED) {
1393                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1394                        "trace(%s) is not currently paused",
1395                              libtrace->uridata);
1396                return -1;
1397        }
1398
1399        assert(libtrace_parallel);
1400        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1401
1402        /* Reset first packets */
1403        pthread_spin_lock(&libtrace->first_packets.lock);
1404        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1405                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1406                if (libtrace->first_packets.packets[i].packet) {
1407                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1408                        libtrace->first_packets.packets[i].packet = NULL;
1409                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1410                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1411                        libtrace->first_packets.count--;
1412                        libtrace->perpkt_threads[i].recorded_first = false;
1413                }
1414        }
1415        assert(libtrace->first_packets.count == 0);
1416        libtrace->first_packets.first = 0;
1417        pthread_spin_unlock(&libtrace->first_packets.lock);
1418
1419        /* Reset delay */
1420        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1421                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1422        }
1423
1424        /* Reset statistics */
1425        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1426                libtrace->perpkt_threads[i].accepted_packets = 0;
1427                libtrace->perpkt_threads[i].filtered_packets = 0;
1428        }
1429        libtrace->accepted_packets = 0;
1430        libtrace->filtered_packets = 0;
1431
1432        /* Update functions if requested */
1433        if(global_blob)
1434                libtrace->global_blob = global_blob;
1435
1436        if (per_packet_cbs) {
1437                if (libtrace->perpkt_cbs)
1438                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1439                libtrace->perpkt_cbs = trace_create_callback_set();
1440                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1441                                sizeof(libtrace_callback_set_t));
1442        }
1443
1444        if (reporter_cbs) {
1445                if (libtrace->reporter_cbs)
1446                        trace_destroy_callback_set(libtrace->reporter_cbs);
1447
1448                libtrace->reporter_cbs = trace_create_callback_set();
1449                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1450                                sizeof(libtrace_callback_set_t));
1451        }
1452
1453        if (trace_is_parallel(libtrace)) {
1454                err = libtrace->format->pstart_input(libtrace);
1455        } else {
1456                if (libtrace->format->start_input) {
1457                        err = libtrace->format->start_input(libtrace);
1458                }
1459        }
1460
1461        if (err == 0) {
1462                libtrace->started = true;
1463                libtrace->startcount ++;
1464                libtrace_change_state(libtrace, STATE_RUNNING, false);
1465        }
1466        return err;
1467}
1468
1469/**
1470 * @return the number of CPU cores on the machine. -1 if unknown.
1471 */
1472SIMPLE_FUNCTION static int get_nb_cores() {
1473        int numCPU;
1474#ifdef _SC_NPROCESSORS_ONLN
1475        /* Most systems do this now */
1476        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1477
1478#else
1479        int mib[] = {CTL_HW, HW_AVAILCPU};
1480        size_t len = sizeof(numCPU);
1481
1482        /* get the number of CPUs from the system */
1483        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1484#endif
1485        return numCPU <= 0 ? 1 : numCPU;
1486}
1487
1488/**
1489 * Verifies the configuration and sets default values for any values not
1490 * specified by the user.
1491 */
1492static void verify_configuration(libtrace_t *libtrace) {
1493
1494        if (libtrace->config.hasher_queue_size <= 0)
1495                libtrace->config.hasher_queue_size = 1000;
1496
1497        if (libtrace->config.perpkt_threads <= 0) {
1498                libtrace->perpkt_thread_count = get_nb_cores();
1499                if (libtrace->perpkt_thread_count <= 0)
1500                        // Lets just use one
1501                        libtrace->perpkt_thread_count = 1;
1502        } else {
1503                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1504        }
1505
1506        if (libtrace->config.reporter_thold <= 0)
1507                libtrace->config.reporter_thold = 100;
1508        if (libtrace->config.burst_size <= 0)
1509                libtrace->config.burst_size = 32;
1510        if (libtrace->config.thread_cache_size <= 0)
1511                libtrace->config.thread_cache_size = 64;
1512        if (libtrace->config.cache_size <= 0)
1513                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1514
1515        if (libtrace->config.cache_size <
1516                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1517                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1518
1519        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1520                libtrace->combiner = combiner_unordered;
1521
1522        /* Figure out if we are using a dedicated hasher thread? */
1523        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1524                libtrace->hasher_thread.type = THREAD_HASHER;
1525        }
1526}
1527
1528/**
1529 * Starts a libtrace_thread, including allocating memory for messaging.
1530 * Threads are expected to wait until the libtrace look is released.
1531 * Hence why we don't init structures until later.
1532 *
1533 * @param trace The trace the thread is associated with
1534 * @param t The thread that is filled when the thread is started
1535 * @param type The type of thread
1536 * @param start_routine The entry location of the thread
1537 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1538 * @param name For debugging purposes set the threads name (Optional)
1539 *
1540 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1541 *         In this situation the thread structure is zeroed.
1542 */
1543static int trace_start_thread(libtrace_t *trace,
1544                       libtrace_thread_t *t,
1545                       enum thread_types type,
1546                       void *(*start_routine) (void *),
1547                       int perpkt_num,
1548                       const char *name) {
1549#ifdef __linux__
1550        cpu_set_t cpus;
1551        int i;
1552#endif
1553        int ret;
1554        assert(t->type == THREAD_EMPTY);
1555        t->trace = trace;
1556        t->ret = NULL;
1557        t->user_data = NULL;
1558        t->type = type;
1559        t->state = THREAD_RUNNING;
1560
1561        assert(name);
1562
1563#ifdef __linux__
1564        CPU_ZERO(&cpus);
1565        for (i = 0; i < get_nb_cores(); i++)
1566                CPU_SET(i, &cpus);
1567
1568        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1569        if( ret == 0 ) {
1570                ret = pthread_setaffinity_np(t->tid, sizeof(cpus), &cpus);
1571        }
1572
1573#else
1574        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1575#endif
1576        if (ret != 0) {
1577                libtrace_zero_thread(t);
1578                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1579                return -1;
1580        }
1581        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1582        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1583                libtrace_ringbuffer_init(&t->rbuffer,
1584                                         trace->config.hasher_queue_size,
1585                                         trace->config.hasher_polling?
1586                                                 LIBTRACE_RINGBUFFER_POLLING:
1587                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1588        }
1589#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1590        if(name)
1591                pthread_setname_np(t->tid, name);
1592#endif
1593        t->perpkt_num = perpkt_num;
1594        return 0;
1595}
1596
1597/** Parses the environment variable LIBTRACE_CONF into the supplied
1598 * configuration structure.
1599 *
1600 * @param[in,out] libtrace The trace from which we determine the URI and set
1601 * the configuration.
1602 *
1603 * We search for 3 environment variables and apply them to the config in the
1604 * following order. Such that the first has the lowest priority.
1605 *
1606 * 1. LIBTRACE_CONF, The global environment configuration
1607 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1608 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1609 *
1610 * E.g.
1611 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1612 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1613 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1614 *
1615 * @note All environment variables names MUST only contian
1616 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1617 * outside of this range should be captilised if possible or replaced with an
1618 * underscore.
1619 */
1620static void parse_env_config (libtrace_t *libtrace) {
1621        char env_name[1024] = "LIBTRACE_CONF_";
1622        size_t len = strlen(env_name);
1623        size_t mark = 0;
1624        size_t i;
1625        char * env;
1626
1627        /* Make our compound string */
1628        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1629        len += strlen(libtrace->format->name);
1630        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1631        len += 1;
1632        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1633
1634        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1635        for (i = 0; env_name[i] != 0; ++i) {
1636                env_name[i] = toupper(env_name[i]);
1637                if(env_name[i] == ':') {
1638                        mark = i;
1639                }
1640                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1641                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1642                        env_name[i] = '_';
1643                }
1644        }
1645
1646        /* First apply global env settings LIBTRACE_CONF */
1647        env = getenv("LIBTRACE_CONF");
1648        if (env)
1649        {
1650                printf("Got env %s", env);
1651                trace_set_configuration(libtrace, env);
1652        }
1653
1654        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1655        if (mark != 0) {
1656                env_name[mark] = 0;
1657                env = getenv(env_name);
1658                if (env) {
1659                        trace_set_configuration(libtrace, env);
1660                }
1661                env_name[mark] = '_';
1662        }
1663
1664        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1665        env = getenv(env_name);
1666        if (env) {
1667                trace_set_configuration(libtrace, env);
1668        }
1669}
1670
1671DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1672        if (libtrace->state == STATE_NEW)
1673                return trace_supports_parallel(libtrace);
1674        return libtrace->pread == trace_pread_packet_wrapper;
1675}
1676
1677DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1678                           libtrace_callback_set_t *per_packet_cbs,
1679                           libtrace_callback_set_t *reporter_cbs) {
1680        int i;
1681        int ret = -1;
1682        char name[24];
1683        sigset_t sig_before, sig_block_all;
1684        assert(libtrace);
1685
1686        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1687        if (trace_is_err(libtrace)) {
1688                goto cleanup_none;
1689        }
1690
1691        if (libtrace->state == STATE_PAUSED) {
1692                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1693                                reporter_cbs);
1694                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1695                return ret;
1696        }
1697
1698        if (libtrace->state != STATE_NEW) {
1699                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1700                              "should be called on a NEW or PAUSED trace but "
1701                              "instead was called from %s",
1702                              get_trace_state_name(libtrace->state));
1703                goto cleanup_none;
1704        }
1705
1706        /* Store the user defined things against the trace */
1707        libtrace->global_blob = global_blob;
1708
1709        /* Save a copy of the callbacks in case the user tries to change them
1710         * on us later */
1711        if (!per_packet_cbs) {
1712                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1713                                "requires a non-NULL set of per packet "
1714                                "callbacks.");
1715                goto cleanup_none;
1716        }
1717
1718        if (per_packet_cbs->message_packet == NULL) {
1719                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1720                                "packet callbacks must include a handler "
1721                                "for a packet. Please set this using "
1722                                "trace_set_packet_cb().");
1723                goto cleanup_none;
1724        }
1725
1726        libtrace->perpkt_cbs = trace_create_callback_set();
1727        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1728       
1729        if (reporter_cbs) {
1730                libtrace->reporter_cbs = trace_create_callback_set();
1731                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1732        }
1733
1734       
1735
1736
1737        /* And zero other fields */
1738        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1739                libtrace->perpkt_thread_states[i] = 0;
1740        }
1741        libtrace->first_packets.first = 0;
1742        libtrace->first_packets.count = 0;
1743        libtrace->first_packets.packets = NULL;
1744        libtrace->perpkt_threads = NULL;
1745        /* Set a global which says we are using a parallel trace. This is
1746         * for backwards compatibility due to changes when destroying packets */
1747        libtrace_parallel = 1;
1748
1749        /* Parses configuration passed through environment variables */
1750        parse_env_config(libtrace);
1751        verify_configuration(libtrace);
1752
1753        ret = -1;
1754        /* Try start the format - we prefer parallel over single threaded, as
1755         * these formats should support messages better */
1756
1757        if (trace_supports_parallel(libtrace) &&
1758            !trace_has_dedicated_hasher(libtrace)) {
1759                ret = libtrace->format->pstart_input(libtrace);
1760                libtrace->pread = trace_pread_packet_wrapper;
1761        }
1762        if (ret != 0) {
1763                if (libtrace->format->start_input) {
1764                        ret = libtrace->format->start_input(libtrace);
1765                }
1766                if (libtrace->perpkt_thread_count > 1) {
1767                        libtrace->pread = trace_pread_packet_first_in_first_served;
1768                        /* Don't wait for a burst of packets if the format is
1769                         * live as this could block ring based formats and
1770                         * introduces delay. */
1771                        if (libtrace->format->info.live) {
1772                                libtrace->config.burst_size = 1;
1773                        }
1774                }
1775                else {
1776                        /* Use standard read_packet */
1777                        libtrace->pread = NULL;
1778                }
1779        }
1780
1781        if (ret < 0) {
1782                goto cleanup_none;
1783        }
1784
1785        /* --- Start all the threads we need --- */
1786        /* Disable signals because it is inherited by the threads we start */
1787        sigemptyset(&sig_block_all);
1788        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1789
1790        /* If we need a hasher thread start it
1791         * Special Case: If single threaded we don't need a hasher
1792         */
1793        if (trace_has_dedicated_hasher(libtrace)) {
1794                libtrace->hasher_thread.type = THREAD_EMPTY;
1795                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1796                                   THREAD_HASHER, hasher_entry, -1,
1797                                   "hasher-thread");
1798                if (ret != 0)
1799                        goto cleanup_started;
1800                libtrace->pread = trace_pread_packet_hasher_thread;
1801        } else {
1802                libtrace->hasher_thread.type = THREAD_EMPTY;
1803        }
1804
1805        /* Start up our perpkt threads */
1806        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1807                                          libtrace->perpkt_thread_count);
1808        if (!libtrace->perpkt_threads) {
1809                trace_set_err(libtrace, errno, "trace_pstart "
1810                              "failed to allocate memory.");
1811                goto cleanup_threads;
1812        }
1813        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1814                snprintf(name, sizeof(name), "perpkt-%d", i);
1815                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1816                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1817                                   THREAD_PERPKT, perpkt_threads_entry, i,
1818                                   name);
1819                if (ret != 0)
1820                        goto cleanup_threads;
1821        }
1822
1823        /* Start the reporter thread */
1824        if (reporter_cbs) {
1825                if (libtrace->combiner.initialise)
1826                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1827                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1828                                   THREAD_REPORTER, reporter_entry, -1,
1829                                   "reporter_thread");
1830                if (ret != 0)
1831                        goto cleanup_threads;
1832        }
1833
1834        /* Start the keepalive thread */
1835        if (libtrace->config.tick_interval > 0) {
1836                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1837                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1838                                   "keepalive_thread");
1839                if (ret != 0)
1840                        goto cleanup_threads;
1841        }
1842
1843        /* Init other data structures */
1844        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1845        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1846        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1847                                                 sizeof(*libtrace->first_packets.packets));
1848        if (libtrace->first_packets.packets == NULL) {
1849                trace_set_err(libtrace, errno, "trace_pstart "
1850                              "failed to allocate memory.");
1851                goto cleanup_threads;
1852        }
1853
1854        if (libtrace_ocache_init(&libtrace->packet_freelist,
1855                             (void* (*)()) trace_create_packet,
1856                             (void (*)(void *))trace_destroy_packet,
1857                             libtrace->config.thread_cache_size,
1858                             libtrace->config.cache_size * 4,
1859                             libtrace->config.fixed_count) != 0) {
1860                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1861                              "failed to allocate ocache.");
1862                goto cleanup_threads;
1863        }
1864
1865        /* Threads don't start */
1866        libtrace->started = true;
1867        libtrace->startcount ++;
1868        libtrace_change_state(libtrace, STATE_RUNNING, false);
1869
1870        ret = 0;
1871        goto success;
1872cleanup_threads:
1873        if (libtrace->first_packets.packets) {
1874                free(libtrace->first_packets.packets);
1875                libtrace->first_packets.packets = NULL;
1876        }
1877        libtrace_change_state(libtrace, STATE_ERROR, false);
1878        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1879        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1880                pthread_join(libtrace->hasher_thread.tid, NULL);
1881                libtrace_zero_thread(&libtrace->hasher_thread);
1882        }
1883
1884        if (libtrace->perpkt_threads) {
1885                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1886                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1887                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1888                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1889                        } else break;
1890                }
1891                free(libtrace->perpkt_threads);
1892                libtrace->perpkt_threads = NULL;
1893        }
1894
1895        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1896                pthread_join(libtrace->reporter_thread.tid, NULL);
1897                libtrace_zero_thread(&libtrace->reporter_thread);
1898        }
1899
1900        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1901                pthread_join(libtrace->keepalive_thread.tid, NULL);
1902                libtrace_zero_thread(&libtrace->keepalive_thread);
1903        }
1904        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1905        libtrace_change_state(libtrace, STATE_NEW, false);
1906        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1907        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1908cleanup_started:
1909        if (libtrace->pread == trace_pread_packet_wrapper) {
1910                if (libtrace->format->ppause_input)
1911                        libtrace->format->ppause_input(libtrace);
1912        } else {
1913                if (libtrace->format->pause_input)
1914                        libtrace->format->pause_input(libtrace);
1915        }
1916        ret = -1;
1917success:
1918        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1919cleanup_none:
1920        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1921        return ret;
1922}
1923
1924DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1925                fn_cb_starting handler) {
1926        cbset->message_starting = handler;
1927        return 0;
1928}
1929
1930DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1931                fn_cb_dataless handler) {
1932        cbset->message_pausing = handler;
1933        return 0;
1934}
1935
1936DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1937                fn_cb_dataless handler) {
1938        cbset->message_resuming = handler;
1939        return 0;
1940}
1941
1942DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1943                fn_cb_dataless handler) {
1944        cbset->message_stopping = handler;
1945        return 0;
1946}
1947
1948DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1949                fn_cb_packet handler) {
1950        cbset->message_packet = handler;
1951        return 0;
1952}
1953
1954DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1955                fn_cb_first_packet handler) {
1956        cbset->message_first_packet = handler;
1957        return 0;
1958}
1959
1960DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1961                fn_cb_tick handler) {
1962        cbset->message_tick_count = handler;
1963        return 0;
1964}
1965
1966DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1967                fn_cb_tick handler) {
1968        cbset->message_tick_interval = handler;
1969        return 0;
1970}
1971
1972DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1973                fn_cb_result handler) {
1974        cbset->message_result = handler;
1975        return 0;
1976}
1977
1978DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1979                fn_cb_usermessage handler) {
1980        cbset->message_user = handler;
1981        return 0;
1982}
1983
1984/*
1985 * Pauses a trace, this should only be called by the main thread
1986 * 1. Set started = false
1987 * 2. All perpkt threads are paused waiting on a condition var
1988 * 3. Then call ppause on the underlying format if found
1989 * 4. The traces state is paused
1990 *
1991 * Once done you should be able to modify the trace setup and call pstart again
1992 * TODO add support to change the number of threads.
1993 */
1994DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1995{
1996        libtrace_thread_t *t;
1997        int i;
1998        assert(libtrace);
1999
2000        t = get_thread_table(libtrace);
2001        // Check state from within the lock if we are going to change it
2002        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2003
2004        /* If we are already paused, just treat this as a NOOP */
2005        if (libtrace->state == STATE_PAUSED) {
2006                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2007                return 0;
2008        }
2009        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2010                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2011                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2012                return -1;
2013        }
2014
2015        libtrace_change_state(libtrace, STATE_PAUSING, false);
2016        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2017
2018        // Special case handle the hasher thread case
2019        if (trace_has_dedicated_hasher(libtrace)) {
2020                if (libtrace->config.debug_state)
2021                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2022                libtrace_message_t message = {0, {.uint64=0}, NULL};
2023                message.code = MESSAGE_DO_PAUSE;
2024                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2025                // Wait for it to pause
2026                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2027                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2028                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2029                }
2030                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2031                if (libtrace->config.debug_state)
2032                        fprintf(stderr, " DONE\n");
2033        }
2034
2035        if (libtrace->config.debug_state)
2036                fprintf(stderr, "Asking perpkt threads to pause ...");
2037        // Stop threads, skip this one if it's a perpkt
2038        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2039                if (&libtrace->perpkt_threads[i] != t) {
2040                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2041                        message.code = MESSAGE_DO_PAUSE;
2042                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2043                        if(trace_has_dedicated_hasher(libtrace)) {
2044                                // The hasher has stopped and other threads have messages waiting therefore
2045                                // If the queues are empty the other threads would have no data
2046                                // So send some message packets to simply ask the threads to check
2047                                // We are the only writer since hasher has paused
2048                                libtrace_packet_t *pkt;
2049                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2050                                pkt->error = READ_MESSAGE;
2051                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2052                        }
2053                } else {
2054                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2055                }
2056        }
2057
2058        if (t) {
2059                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2060                // We rely on the user to *not* return before starting the trace again
2061                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2062        }
2063
2064        // Wait for all threads to pause
2065        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2066        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2067                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2068        }
2069        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2070
2071        if (libtrace->config.debug_state)
2072                fprintf(stderr, " DONE\n");
2073
2074        // Deal with the reporter
2075        if (trace_has_reporter(libtrace)) {
2076                if (libtrace->config.debug_state)
2077                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2078                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2079                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2080                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2081               
2082                } else {
2083                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2084                        message.code = MESSAGE_DO_PAUSE;
2085                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2086                        // Wait for it to pause
2087                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2088                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2089                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2090                        }
2091                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2092                }
2093                if (libtrace->config.debug_state)
2094                        fprintf(stderr, " DONE\n");
2095        }
2096
2097        /* Cache values before we pause */
2098        if (libtrace->stats == NULL)
2099                libtrace->stats = trace_create_statistics();
2100        // Save the statistics against the trace
2101        trace_get_statistics(libtrace, NULL);
2102        if (trace_is_parallel(libtrace)) {
2103                libtrace->started = false;
2104                if (libtrace->format->ppause_input)
2105                        libtrace->format->ppause_input(libtrace);
2106                // TODO What happens if we don't have pause input??
2107        } else {
2108                int err;
2109                err = trace_pause(libtrace);
2110                // We should handle this a bit better
2111                if (err)
2112                        return err;
2113        }
2114
2115        // Only set as paused after the pause has been called on the trace
2116        libtrace_change_state(libtrace, STATE_PAUSED, true);
2117        return 0;
2118}
2119
2120/**
2121 * Stop trace finish prematurely as though it meet an EOF
2122 * This should only be called by the main thread
2123 * 1. Calls ppause
2124 * 2. Sends a message asking for threads to finish
2125 * 3. Releases threads which will pause
2126 */
2127DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2128{
2129        int i, err;
2130        libtrace_message_t message = {0, {.uint64=0}, NULL};
2131        assert(libtrace);
2132
2133        // Ensure all threads have paused and the underlying trace format has
2134        // been closed and all packets associated are cleaned up
2135        // Pause will do any state checks for us
2136        err = trace_ppause(libtrace);
2137        if (err)
2138                return err;
2139
2140        // Now send a message asking the threads to stop
2141        // This will be retrieved before trying to read another packet
2142
2143        message.code = MESSAGE_DO_STOP;
2144        trace_message_perpkts(libtrace, &message);
2145        if (trace_has_dedicated_hasher(libtrace))
2146                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2147
2148        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2149                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2150        }
2151
2152        /* Now release the threads and let them stop - when the threads finish
2153         * the state will be set to finished */
2154        libtrace_change_state(libtrace, STATE_FINISHING, true);
2155        return 0;
2156}
2157
2158DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2159        int ret = -1;
2160        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2161                return -1;
2162        }
2163
2164        // Save the requirements
2165        trace->hasher_type = type;
2166        if (hasher) {
2167                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2168                        if (trace->hasher_data) {
2169                                free(trace->hasher_data);
2170                        }
2171                }
2172                trace->hasher = hasher;
2173                trace->hasher_data = data;
2174                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2175        } else {
2176                trace->hasher = NULL;
2177                trace->hasher_data = NULL;
2178                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2179        }
2180
2181        // Try push this to hardware - NOTE hardware could do custom if
2182        // there is a more efficient way to apply it, in this case
2183        // it will simply grab the function out of libtrace_t
2184        if (trace_supports_parallel(trace) && trace->format->config_input)
2185                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2186
2187        if (ret == -1) {
2188                /* We have to deal with this ourself */
2189                if (!hasher) {
2190                        switch (type)
2191                        {
2192                                case HASHER_CUSTOM:
2193                                case HASHER_BALANCE:
2194                                        return 0;
2195                                case HASHER_BIDIRECTIONAL:
2196                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2197                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2198                                        toeplitz_init_config(trace->hasher_data, 1);
2199                                        return 0;
2200                                case HASHER_UNIDIRECTIONAL:
2201                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2202                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2203                                        toeplitz_init_config(trace->hasher_data, 0);
2204                                        return 0;
2205                        }
2206                        return -1;
2207                }
2208        } else {
2209                /* If the hasher is hardware we zero out the hasher and hasher
2210                 * data fields - only if we need a hasher do we do this */
2211                trace->hasher = NULL;
2212                trace->hasher_data = NULL;
2213        }
2214
2215        return 0;
2216}
2217
2218// Waits for all threads to finish
2219DLLEXPORT void trace_join(libtrace_t *libtrace) {
2220        int i;
2221
2222        /* Firstly wait for the perpkt threads to finish, since these are
2223         * user controlled */
2224        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2225                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2226                // So we must do our best effort to empty the queue - so
2227                // the producer (or any other threads) don't block.
2228                libtrace_packet_t * packet;
2229                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2230                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2231                        if (packet) // This could be NULL iff the perpkt finishes early
2232                                trace_destroy_packet(packet);
2233        }
2234
2235        /* Now the hasher */
2236        if (trace_has_dedicated_hasher(libtrace)) {
2237                pthread_join(libtrace->hasher_thread.tid, NULL);
2238                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2239        }
2240
2241        // Now that everything is finished nothing can be touching our
2242        // buffers so clean them up
2243        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2244                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2245                // if they lost timeslice before-during a write
2246                libtrace_packet_t * packet;
2247                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2248                        trace_destroy_packet(packet);
2249                if (trace_has_dedicated_hasher(libtrace)) {
2250                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2251                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2252                }
2253                // Cannot destroy vector yet, this happens with trace_destroy
2254        }
2255
2256        if (trace_has_reporter(libtrace)) {
2257                pthread_join(libtrace->reporter_thread.tid, NULL);
2258                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2259        }
2260
2261        // Wait for the tick (keepalive) thread if it has been started
2262        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2263                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2264                msg.code = MESSAGE_DO_STOP;
2265                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2266                pthread_join(libtrace->keepalive_thread.tid, NULL);
2267        }
2268
2269        libtrace_change_state(libtrace, STATE_JOINED, true);
2270        print_memory_stats();
2271}
2272
2273DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2274                                                libtrace_thread_t *t)
2275{
2276        int ret;
2277        if (t == NULL)
2278                t = get_thread_descriptor(libtrace);
2279        if (t == NULL)
2280                return -1;
2281        ret = libtrace_message_queue_count(&t->messages);
2282        return ret < 0 ? 0 : ret;
2283}
2284
2285DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2286                                          libtrace_thread_t *t,
2287                                          libtrace_message_t * message)
2288{
2289        int ret;
2290        if (t == NULL)
2291                t = get_thread_descriptor(libtrace);
2292        if (t == NULL)
2293                return -1;
2294        ret = libtrace_message_queue_get(&t->messages, message);
2295        return ret < 0 ? 0 : ret;
2296}
2297
2298DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2299                                              libtrace_thread_t *t,
2300                                              libtrace_message_t * message)
2301{
2302        if (t == NULL)
2303                t = get_thread_descriptor(libtrace);
2304        if (t == NULL)
2305                return -1;
2306        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2307                return 0;
2308        else
2309                return -1;
2310}
2311
2312DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2313{
2314        int ret;
2315        if (!message->sender)
2316                message->sender = get_thread_descriptor(libtrace);
2317
2318        ret = libtrace_message_queue_put(&t->messages, message);
2319        return ret < 0 ? 0 : ret;
2320}
2321
2322DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2323{
2324        if (!trace_has_reporter(libtrace) ||
2325            !(libtrace->reporter_thread.state == THREAD_RUNNING
2326              || libtrace->reporter_thread.state == THREAD_PAUSED))
2327                return -1;
2328
2329        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2330}
2331
2332DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2333{
2334        libtrace_message_t message = {0, {.uint64=0}, NULL};
2335        message.code = MESSAGE_POST_REPORTER;
2336        return trace_message_reporter(libtrace, (void *) &message);
2337}
2338
2339DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2340{
2341        int i;
2342        int missed = 0;
2343        if (message->sender == NULL)
2344                message->sender = get_thread_descriptor(libtrace);
2345        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2346                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2347                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2348                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2349                } else {
2350                        missed += 1;
2351                }
2352        }
2353        return -missed;
2354}
2355
2356/**
2357 * Publishes a result to the reduce queue
2358 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2359 */
2360DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2361        libtrace_result_t res;
2362        res.type = type;
2363        res.key = key;
2364        res.value = value;
2365        assert(libtrace->combiner.publish);
2366        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2367        return;
2368}
2369
2370DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2371        if (combiner) {
2372                trace->combiner = *combiner;
2373                trace->combiner.configuration = config;
2374        } else {
2375                // No combiner, so don't try use it
2376                memset(&trace->combiner, 0, sizeof(trace->combiner));
2377        }
2378}
2379
2380DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2381        return packet->order;
2382}
2383
2384DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2385        return packet->hash;
2386}
2387
2388DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2389        packet->order = order;
2390}
2391
2392DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2393        packet->hash = hash;
2394}
2395
2396DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2397        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2398}
2399
2400/**
2401 * @return True if the trace is not running such that it can be configured
2402 */
2403static inline bool trace_is_configurable(libtrace_t *trace) {
2404        return trace->state == STATE_NEW ||
2405                        trace->state == STATE_PAUSED;
2406}
2407
2408DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2409        // Only supported on new traces not paused traces
2410        if (trace->state != STATE_NEW) return -1;
2411
2412        /* TODO consider allowing an offset from the total number of cores i.e.
2413         * -1 reserve 1 core */
2414        if (nb >= 0) {
2415                trace->config.perpkt_threads = nb;
2416                return 0;
2417        } else {
2418                return -1;
2419        }
2420}
2421
2422DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2423        if (!trace_is_configurable(trace)) return -1;
2424
2425        trace->config.tick_interval = millisec;
2426        return 0;
2427}
2428
2429DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2430        if (!trace_is_configurable(trace)) return -1;
2431
2432        trace->config.tick_count = count;
2433        return 0;
2434}
2435
2436DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2437        if (!trace_is_configurable(trace)) return -1;
2438
2439        trace->tracetime = tracetime;
2440        return 0;
2441}
2442
2443DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2444        if (!trace_is_configurable(trace)) return -1;
2445
2446        trace->config.cache_size = size;
2447        return 0;
2448}
2449
2450DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2451        if (!trace_is_configurable(trace)) return -1;
2452
2453        trace->config.thread_cache_size = size;
2454        return 0;
2455}
2456
2457DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2458        if (!trace_is_configurable(trace)) return -1;
2459
2460        trace->config.fixed_count = fixed;
2461        return 0;
2462}
2463
2464DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2465        if (!trace_is_configurable(trace)) return -1;
2466
2467        trace->config.burst_size = size;
2468        return 0;
2469}
2470
2471DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2472        if (!trace_is_configurable(trace)) return -1;
2473
2474        trace->config.hasher_queue_size = size;
2475        return 0;
2476}
2477
2478DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2479        if (!trace_is_configurable(trace)) return -1;
2480
2481        trace->config.hasher_polling = polling;
2482        return 0;
2483}
2484
2485DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2486        if (!trace_is_configurable(trace)) return -1;
2487
2488        trace->config.reporter_polling = polling;
2489        return 0;
2490}
2491
2492DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2493        if (!trace_is_configurable(trace)) return -1;
2494
2495        trace->config.reporter_thold = thold;
2496        return 0;
2497}
2498
2499DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2500        if (!trace_is_configurable(trace)) return -1;
2501
2502        trace->config.debug_state = debug_state;
2503        return 0;
2504}
2505
2506static bool config_bool_parse(char *value, size_t nvalue) {
2507        if (strncmp(value, "true", nvalue) == 0)
2508                return true;
2509        else if (strncmp(value, "false", nvalue) == 0)
2510                return false;
2511        else
2512                return strtoll(value, NULL, 10) != 0;
2513}
2514
2515/* Note update documentation on trace_set_configuration */
2516static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2517        assert(key);
2518        assert(value);
2519        assert(uc);
2520        if (strncmp(key, "cache_size", nkey) == 0
2521            || strncmp(key, "cs", nkey) == 0) {
2522                uc->cache_size = strtoll(value, NULL, 10);
2523        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2524                   || strncmp(key, "tcs", nkey) == 0) {
2525                uc->thread_cache_size = strtoll(value, NULL, 10);
2526        } else if (strncmp(key, "fixed_count", nkey) == 0
2527                   || strncmp(key, "fc", nkey) == 0) {
2528                uc->fixed_count = config_bool_parse(value, nvalue);
2529        } else if (strncmp(key, "burst_size", nkey) == 0
2530                   || strncmp(key, "bs", nkey) == 0) {
2531                uc->burst_size = strtoll(value, NULL, 10);
2532        } else if (strncmp(key, "tick_interval", nkey) == 0
2533                   || strncmp(key, "ti", nkey) == 0) {
2534                uc->tick_interval = strtoll(value, NULL, 10);
2535        } else if (strncmp(key, "tick_count", nkey) == 0
2536                   || strncmp(key, "tc", nkey) == 0) {
2537                uc->tick_count = strtoll(value, NULL, 10);
2538        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2539                   || strncmp(key, "pt", nkey) == 0) {
2540                uc->perpkt_threads = strtoll(value, NULL, 10);
2541        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2542                   || strncmp(key, "hqs", nkey) == 0) {
2543                uc->hasher_queue_size = strtoll(value, NULL, 10);
2544        } else if (strncmp(key, "hasher_polling", nkey) == 0
2545                   || strncmp(key, "hp", nkey) == 0) {
2546                uc->hasher_polling = config_bool_parse(value, nvalue);
2547        } else if (strncmp(key, "reporter_polling", nkey) == 0
2548                   || strncmp(key, "rp", nkey) == 0) {
2549                uc->reporter_polling = config_bool_parse(value, nvalue);
2550        } else if (strncmp(key, "reporter_thold", nkey) == 0
2551                   || strncmp(key, "rt", nkey) == 0) {
2552                uc->reporter_thold = strtoll(value, NULL, 10);
2553        } else if (strncmp(key, "debug_state", nkey) == 0
2554                   || strncmp(key, "ds", nkey) == 0) {
2555                uc->debug_state = config_bool_parse(value, nvalue);
2556        } else {
2557                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2558        }
2559}
2560
2561DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2562        char *pch;
2563        char key[100];
2564        char value[100];
2565        char *dup;
2566        assert(str);
2567        assert(trace);
2568
2569        if (!trace_is_configurable(trace)) return -1;
2570
2571        dup = strdup(str);
2572        pch = strtok (dup," ,.-");
2573        while (pch != NULL)
2574        {
2575                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2576                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2577                } else {
2578                        fprintf(stderr, "Error parsing option %s\n", pch);
2579                }
2580                pch = strtok (NULL," ,.-");
2581        }
2582        free(dup);
2583
2584        return 0;
2585}
2586
2587DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2588        char line[1024];
2589        if (!trace_is_configurable(trace)) return -1;
2590
2591        while (fgets(line, sizeof(line), file) != NULL)
2592        {
2593                trace_set_configuration(trace, line);
2594        }
2595
2596        if(ferror(file))
2597                return -1;
2598        else
2599                return 0;
2600}
2601
2602DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2603        assert(packet);
2604        /* Always release any resources this might be holding */
2605        trace_fin_packet(packet);
2606        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2607}
2608
2609DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2610        pthread_mutex_lock(&(packet->ref_lock));
2611        if (packet->refcount < 0) {
2612                packet->refcount = 1;
2613        } else {
2614                packet->refcount ++;
2615        }
2616        pthread_mutex_unlock(&(packet->ref_lock));
2617}
2618
2619DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2620        pthread_mutex_lock(&(packet->ref_lock));
2621        packet->refcount --;
2622
2623        if (packet->refcount <= 0) {
2624                trace_free_packet(packet->trace, packet);
2625        }
2626        pthread_mutex_unlock(&(packet->ref_lock));
2627}
2628
2629
2630DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2631        if (libtrace->format)
2632                return &libtrace->format->info;
2633        else
2634                return NULL;
2635}
Note: See TracBrowser for help on using the repository browser.