source: lib/trace_parallel.c @ 84dbd79

ringperformance
Last change on this file since 84dbd79 was 84dbd79, checked in by Shane Alcock <salcock@…>, 2 years ago

Improve performance by removing wasteful sanity checking in pread

While these sanity checks would normally be a very good idea,
they actually end up consuming a lot of CPU cycles at high
packet rates and could reduce capture and processing throughput.

  • the sanity checks mostly protect against things that should *never* be touched (i.e setting packet buffer types to invalid values etc) by anyone other than libtrace itself. I know that we can't trust users to not do stupid stuff with the pointers we give them, but we also can't be wasting the CPU time of sensible users by constantly checking that they are not doing something weird either.
  • often, a pread will not use all of the packets in the array so we're frequently checking the integrity of packets that have not been used since the last time we checked them. Again, this is a waste of time that really starts to add up at high packet rates.

I accept that removing these checks might not be to everyone's
liking. I think there is scope to put them back in at some
later point, probably wrapped inside some preprocessor
conditional which will allow people to choose whether to build
"safe" libtrace vs "high-performance" libtrace.

  • Property mode set to 100644
File size: 84.7 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
290        return thread->perpkt_num;
291}
292
293/**
294 * Changes the overall traces state and signals the condition.
295 *
296 * @param trace A pointer to the trace
297 * @param new_state The new state of the trace
298 * @param need_lock Set to true if libtrace_lock is not held, otherwise
299 *        false in the case the lock is currently held by this thread.
300 */
301static inline void libtrace_change_state(libtrace_t *trace,
302        const enum trace_state new_state, const bool need_lock)
303{
304        UNUSED enum trace_state prev_state;
305        if (need_lock)
306                pthread_mutex_lock(&trace->libtrace_lock);
307        prev_state = trace->state;
308        trace->state = new_state;
309
310        if (trace->config.debug_state)
311                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
312                        trace->uridata, get_trace_state_name(prev_state),
313                        get_trace_state_name(trace->state));
314
315        pthread_cond_broadcast(&trace->perpkt_cond);
316        if (need_lock)
317                pthread_mutex_unlock(&trace->libtrace_lock);
318}
319
320/**
321 * Changes a thread's state and broadcasts the condition variable. This
322 * should always be done when the lock is held.
323 *
324 * Additionally for perpkt threads the state counts are updated.
325 *
326 * @param trace A pointer to the trace
327 * @param t A pointer to the thread to modify
328 * @param new_state The new state of the thread
329 * @param need_lock Set to true if libtrace_lock is not held, otherwise
330 *        false in the case the lock is currently held by this thread.
331 */
332static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
333        const enum thread_states new_state, const bool need_lock)
334{
335        enum thread_states prev_state;
336        if (need_lock)
337                pthread_mutex_lock(&trace->libtrace_lock);
338        prev_state = t->state;
339        t->state = new_state;
340        if (t->type == THREAD_PERPKT) {
341                --trace->perpkt_thread_states[prev_state];
342                ++trace->perpkt_thread_states[new_state];
343        }
344
345        if (trace->config.debug_state)
346                fprintf(stderr, "Thread %d state changed from %d to %d\n",
347                        (int) t->tid, prev_state, t->state);
348
349        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
350                /* Make sure we save our final stats in case someone wants
351                 * them at the end of their program.
352                 */
353
354                trace_get_statistics(trace, NULL);
355                libtrace_change_state(trace, STATE_FINISHED, false);
356        }
357
358        pthread_cond_broadcast(&trace->perpkt_cond);
359        if (need_lock)
360                pthread_mutex_unlock(&trace->libtrace_lock);
361}
362
363/**
364 * This is valid once a trace is initialised
365 *
366 * @return True if the format supports parallel threads.
367 */
368static inline bool trace_supports_parallel(libtrace_t *trace)
369{
370        assert(trace);
371        assert(trace->format);
372        if (trace->format->pstart_input)
373                return true;
374        else
375                return false;
376}
377
378void libtrace_zero_thread(libtrace_thread_t * t) {
379        t->accepted_packets = 0;
380        t->filtered_packets = 0;
381        t->recorded_first = false;
382        t->tracetime_offset_usec = 0;
383        t->user_data = 0;
384        t->format_data = 0;
385        libtrace_zero_ringbuffer(&t->rbuffer);
386        t->trace = NULL;
387        t->ret = NULL;
388        t->type = THREAD_EMPTY;
389        t->perpkt_num = -1;
390}
391
392// Ints are aligned int is atomic so safe to read and write at same time
393// However write must be locked, read doesn't (We never try read before written to table)
394libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
395        int i = 0;
396        pthread_t tid = pthread_self();
397
398        for (;i<libtrace->perpkt_thread_count ;++i) {
399                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
400                        return &libtrace->perpkt_threads[i];
401        }
402        return NULL;
403}
404
405static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
406        libtrace_thread_t *ret;
407        if (!(ret = get_thread_table(libtrace))) {
408                pthread_t tid = pthread_self();
409                // Check if we are reporter or something else
410                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
411                                pthread_equal(tid, libtrace->reporter_thread.tid))
412                        ret = &libtrace->reporter_thread;
413                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
414                         pthread_equal(tid, libtrace->hasher_thread.tid))
415                        ret = &libtrace->hasher_thread;
416                else
417                        ret = NULL;
418        }
419        return ret;
420}
421
422DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
423        // Duplicate the packet in standard malloc'd memory and free the
424        // original, This is a 1:1 exchange so the ocache count remains unchanged.
425        if (pkt->buf_control != TRACE_CTRL_PACKET) {
426                libtrace_packet_t *dup;
427                dup = trace_copy_packet(pkt);
428                /* Release the external buffer */
429                trace_fin_packet(pkt);
430                /* Copy the duplicated packet over the existing */
431                memcpy(pkt, dup, sizeof(libtrace_packet_t));
432                /* Free the packet structure */
433                free(dup);
434        }
435}
436
437/**
438 * Makes a libtrace_result_t safe, used when pausing a trace.
439 * This will call libtrace_make_packet_safe if the result is
440 * a packet.
441 */
442DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
443        if (res->type == RESULT_PACKET) {
444                libtrace_make_packet_safe(res->value.pkt);
445        }
446}
447
448/**
449 * Holds threads in a paused state, until released by broadcasting
450 * the condition mutex.
451 */
452static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
453        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
454        thread_change_state(trace, t, THREAD_PAUSED, false);
455        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
456                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
457        }
458        thread_change_state(trace, t, THREAD_RUNNING, false);
459        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
460}
461
462/**
463 * Sends a packet to the user, expects either a valid packet or a TICK packet.
464 *
465 * @param trace The trace
466 * @param t The current thread
467 * @param packet A pointer to the packet storage, which may be set to null upon
468 *               return, or a packet to be finished.
469 * @param tracetime If true packets are delayed to match with tracetime
470 * @return 0 is successful, otherwise if playing back in tracetime
471 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
472 *
473 * @note READ_MESSAGE will only be returned if tracetime is true.
474 */
475static inline int dispatch_packet(libtrace_t *trace,
476                                  libtrace_thread_t *t,
477                                  libtrace_packet_t **packet,
478                                  bool tracetime) {
479
480        if ((*packet)->error > 0) {
481                if (tracetime) {
482                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
483                                return READ_MESSAGE;
484                }
485                if (!IS_LIBTRACE_META_PACKET((*packet))) {
486                        t->accepted_packets++;
487                }
488                if (trace->perpkt_cbs->message_packet)
489                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
490                trace_fin_packet(*packet);
491        } else {
492                assert((*packet)->error == READ_TICK);
493                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
494                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
495        }
496        return 0;
497}
498
499/**
500 * Sends a batch of packets to the user, expects either a valid packet or a
501 * TICK packet.
502 *
503 * @param trace The trace
504 * @param t The current thread
505 * @param packets [in,out] An array of packets, these may be null upon return
506 * @param nb_packets The total number of packets in the list
507 * @param empty [in,out] A pointer to an integer storing the first empty slot,
508 * upon return this is updated
509 * @param offset [in,out] The offset into the array, upon return this is updated
510 * @param tracetime If true packets are delayed to match with tracetime
511 * @return 0 is successful, otherwise if playing back in tracetime
512 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
513 *
514 * @note READ_MESSAGE will only be returned if tracetime is true.
515 */
516static inline int dispatch_packets(libtrace_t *trace,
517                                  libtrace_thread_t *t,
518                                  libtrace_packet_t *packets[],
519                                  int nb_packets, int *empty, int *offset,
520                                  bool tracetime) {
521        for (;*offset < nb_packets; ++*offset) {
522                int ret;
523                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
524                if (ret == 0) {
525                        /* Move full slots to front as we go */
526                        if (packets[*offset]) {
527                                if (*empty != *offset) {
528                                        packets[*empty] = packets[*offset];
529                                        packets[*offset] = NULL;
530                                }
531                                ++*empty;
532                        }
533                } else {
534                        /* Break early */
535                        assert(ret == READ_MESSAGE);
536                        return READ_MESSAGE;
537                }
538        }
539
540        return 0;
541}
542
543/**
544 * Pauses a per packet thread, messages will not be processed when the thread
545 * is paused.
546 *
547 * This process involves reading packets if a hasher thread is used. As such
548 * this function can fail to pause due to errors when reading in which case
549 * the thread should be stopped instead.
550 *
551 *
552 * @brief trace_perpkt_thread_pause
553 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
554 */
555static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
556                                     libtrace_packet_t *packets[],
557                                     int nb_packets, int *empty, int *offset) {
558        libtrace_packet_t * packet = NULL;
559
560        /* Let the user thread know we are going to pause */
561        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
562
563        /* Send through any remaining packets (or messages) without delay */
564
565        /* First send those packets already read, as fast as possible
566         * This should never fail or check for messages etc. */
567        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
568                                    offset, false), == 0);
569
570        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
571        /* If a hasher thread is running, empty input queues so we don't lose data */
572        if (trace_has_dedicated_hasher(trace)) {
573                // The hasher has stopped by this point, so the queue shouldn't be filling
574                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
575                        int ret = trace->pread(trace, t, &packet, 1);
576                        if (ret == 1) {
577                                if (packet->error > 0) {
578                                        store_first_packet(trace, packet, t);
579                                }
580                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
581                                if (packet == NULL)
582                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
583                        } else if (ret != READ_MESSAGE) {
584                                /* Ignore messages we pick these up next loop */
585                                assert (ret == READ_EOF || ret == READ_ERROR);
586                                /* Verify no packets are remaining */
587                                /* TODO refactor this sanity check out!! */
588                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
589                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
590                                        // No packets after this should have any data in them
591                                        assert(packet->error <= 0);
592                                }
593                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
594                                return -1;
595                        }
596                }
597        }
598        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
599
600        /* Now we do the actual pause, this returns when we resumed */
601        trace_thread_pause(trace, t);
602        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
603        return 1;
604}
605
606/**
607 * The is the entry point for our packet processing threads.
608 */
609static void* perpkt_threads_entry(void *data) {
610        libtrace_t *trace = (libtrace_t *)data;
611        libtrace_thread_t *t;
612        libtrace_message_t message = {0, {.uint64=0}, NULL};
613        libtrace_packet_t *packets[trace->config.burst_size];
614        size_t i;
615        //int ret;
616        /* The current reading position into the packets */
617        int offset = 0;
618        /* The number of packets last read */
619        int nb_packets = 0;
620        /* The offset to the first NULL packet upto offset */
621        int empty = 0;
622        int j;
623
624        /* Wait until trace_pstart has been completed */
625        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
626        t = get_thread_table(trace);
627        assert(t);
628        if (trace->state == STATE_ERROR) {
629                thread_change_state(trace, t, THREAD_FINISHED, false);
630                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
631                pthread_exit(NULL);
632        }
633        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
634
635        if (trace->format->pregister_thread) {
636                if (trace->format->pregister_thread(trace, t, 
637                                trace_is_parallel(trace)) < 0) {
638                        thread_change_state(trace, t, THREAD_FINISHED, false);
639                        pthread_exit(NULL);
640                }
641        }
642
643        /* Fill our buffer with empty packets */
644        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
645        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
646                              trace->config.burst_size,
647                              trace->config.burst_size);
648
649        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
650
651        /* Let the per_packet function know we have started */
652        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
653        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
654
655        for (;;) {
656
657                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
658                        int ret;
659                        switch (message.code) {
660                                case MESSAGE_DO_PAUSE: // This is internal
661                                        ret = trace_perpkt_thread_pause(trace, t,
662                                              packets, nb_packets, &empty, &offset);
663                                        if (ret == READ_EOF) {
664                                                goto eof;
665                                        } else if (ret == READ_ERROR) {
666                                                goto error;
667                                        }
668                                        assert(ret == 1);
669                                        continue;
670                                case MESSAGE_DO_STOP: // This is internal
671                                        goto eof;
672                        }
673                        send_message(trace, t, message.code, message.data, 
674                                        message.sender);
675                        /* Continue and the empty messages out before packets */
676                        continue;
677                }
678
679
680                /* Do we need to read a new set of packets MOST LIKELY we do */
681                if (offset == nb_packets) {
682                        /* Refill the packet buffer */
683                        if (empty != nb_packets) {
684                                // Refill the empty packets
685                                libtrace_ocache_alloc(&trace->packet_freelist,
686                                                      (void **) &packets[empty],
687                                                      nb_packets - empty,
688                                                      nb_packets - empty);
689                        }
690                        if (!trace->pread) {
691                                assert(packets[0]);
692                                nb_packets = trace_read_packet(trace, packets[0]);
693                                packets[0]->error = nb_packets;
694                                if (nb_packets > 0)
695                                        nb_packets = 1;
696                        } else {
697                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
698                        }
699                        offset = 0;
700                        empty = 0;
701                }
702
703                /* Handle error/message cases */
704                if (nb_packets > 0) {
705                        /* Store the first non-meta packet */
706                        for (j = 0; j < nb_packets; j++) {
707                                if (t->recorded_first)
708                                        break;
709                                if (packets[j]->error > 0) {
710                                        store_first_packet(trace, packets[j], t);
711                                }
712                        }
713                        dispatch_packets(trace, t, packets, nb_packets, &empty,
714                                         &offset, trace->tracetime);
715                } else {
716                        switch (nb_packets) {
717                        case READ_EOF:
718                                goto eof;
719                        case READ_ERROR:
720                                goto error;
721                        case READ_MESSAGE:
722                                nb_packets = 0;
723                                continue;
724                        default:
725                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
726                                goto error;
727                        }
728                }
729
730        }
731
732error:
733        message.code = MESSAGE_DO_STOP;
734        message.sender = t;
735        message.data.uint64 = 0;
736        trace_message_perpkts(trace, &message);
737eof:
738        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
739
740        // Let the per_packet function know we have stopped
741        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
742        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
743
744        // Free any remaining packets
745        for (i = 0; i < trace->config.burst_size; i++) {
746                if (packets[i]) {
747                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
748                        packets[i] = NULL;
749                }
750        }
751
752        thread_change_state(trace, t, THREAD_FINISHED, true);
753
754        /* Make sure the reporter sees we have finished */
755        if (trace_has_reporter(trace))
756                trace_post_reporter(trace);
757
758        // Release all ocache memory before unregistering with the format
759        // because this might(it does in DPDK) unlink the formats mempool
760        // causing destroy/finish packet to fail.
761        libtrace_ocache_unregister_thread(&trace->packet_freelist);
762        if (trace->format->punregister_thread) {
763                trace->format->punregister_thread(trace, t);
764        }
765        print_memory_stats();
766
767        pthread_exit(NULL);
768}
769
770/**
771 * The start point for our single threaded hasher thread, this will read
772 * and hash a packet from a data source and queue it against the correct
773 * core to process it.
774 */
775static void* hasher_entry(void *data) {
776        libtrace_t *trace = (libtrace_t *)data;
777        libtrace_thread_t * t;
778        int i;
779        libtrace_packet_t * packet;
780        libtrace_message_t message = {0, {.uint64=0}, NULL};
781        int pkt_skipped = 0;
782
783        assert(trace_has_dedicated_hasher(trace));
784        /* Wait until all threads are started and objects are initialised (ring buffers) */
785        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
786        t = &trace->hasher_thread;
787        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
788        if (trace->state == STATE_ERROR) {
789                thread_change_state(trace, t, THREAD_FINISHED, false);
790                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
791                pthread_exit(NULL);
792        }
793        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
794
795        /* We are reading but it is not the parallel API */
796        if (trace->format->pregister_thread) {
797                trace->format->pregister_thread(trace, t, true);
798        }
799
800        /* Read all packets in then hash and queue against the correct thread */
801        while (1) {
802                int thread;
803                if (!pkt_skipped)
804                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
805                assert(packet);
806
807                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
808                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
809                        switch(message.code) {
810                                case MESSAGE_DO_PAUSE:
811                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
812                                        thread_change_state(trace, t, THREAD_PAUSED, false);
813                                        pthread_cond_broadcast(&trace->perpkt_cond);
814                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
815                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
816                                        }
817                                        thread_change_state(trace, t, THREAD_RUNNING, false);
818                                        pthread_cond_broadcast(&trace->perpkt_cond);
819                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
820                                        break;
821                                case MESSAGE_DO_STOP:
822                                        /* Either FINISHED or FINISHING */
823                                        assert(trace->started == false);
824                                        /* Mark the current packet as EOF */
825                                        packet->error = 0;
826                                        goto hasher_eof;
827                                default:
828                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
829                        }
830                        pkt_skipped = 1;
831                        continue;
832                }
833
834                if ((packet->error = trace_read_packet(trace, packet)) <1) {
835                        if (packet->error == READ_MESSAGE) {
836                                pkt_skipped = 1;
837                                continue;
838                        } else {
839                                break; /* We are EOF or error'd either way we stop  */
840                        }
841                }
842
843                /* We are guaranteed to have a hash function i.e. != NULL */
844                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
845                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
846                /* Blocking write to the correct queue - I'm the only writer */
847                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
848                        uint64_t order = trace_packet_get_order(packet);
849                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
850                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
851                                // Write ticks to everyone else
852                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
853                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
854                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
855                                for (i = 0; i < trace->perpkt_thread_count; i++) {
856                                        pkts[i]->error = READ_TICK;
857                                        trace_packet_set_order(pkts[i], order);
858                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
859                                }
860                        }
861                        pkt_skipped = 0;
862                } else {
863                        assert(!"Dropping a packet!!");
864                        pkt_skipped = 1; // Reuse that packet no one read it
865                }
866        }
867hasher_eof:
868        /* Broadcast our last failed read to all threads */
869        for (i = 0; i < trace->perpkt_thread_count; i++) {
870                libtrace_packet_t * bcast;
871                if (i == trace->perpkt_thread_count - 1) {
872                        bcast = packet;
873                } else {
874                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
875                        bcast->error = packet->error;
876                }
877                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
878                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
879                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
880                } else {
881                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
882                }
883                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
884        }
885
886        // We don't need to free the packet
887        thread_change_state(trace, t, THREAD_FINISHED, true);
888
889        libtrace_ocache_unregister_thread(&trace->packet_freelist);
890        if (trace->format->punregister_thread) {
891                trace->format->punregister_thread(trace, t);
892        }
893        print_memory_stats();
894
895        // TODO remove from TTABLE t sometime
896        pthread_exit(NULL);
897}
898
899/* Our simplest case when a thread becomes ready it can obtain an exclusive
900 * lock to read packets from the underlying trace.
901 */
902static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
903                                                    libtrace_thread_t *t,
904                                                    libtrace_packet_t *packets[],
905                                                    size_t nb_packets) {
906        size_t i = 0;
907        //bool tick_hit = false;
908
909        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
910        /* Read nb_packets */
911        for (i = 0; i < nb_packets; ++i) {
912                if (libtrace_message_queue_count(&t->messages) > 0) {
913                        if ( i==0 ) {
914                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
915                                return READ_MESSAGE;
916                        } else {
917                                break;
918                        }
919                }
920                packets[i]->error = trace_read_packet(libtrace, packets[i]);
921
922                if (packets[i]->error <= 0) {
923                        /* We'll catch this next time if we have already got packets */
924                        if ( i==0 ) {
925                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
926                                return packets[i]->error;
927                        } else {
928                                break;
929                        }
930                }
931                /*
932                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
933                        tick_hit = true;
934                }*/
935
936                // Doing this inside the lock ensures the first packet is
937                // always recorded first
938                if (!t->recorded_first && packets[0]->error > 0) {
939                        store_first_packet(libtrace, packets[0], t);
940                }
941        }
942        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
943        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
944        if (tick_hit) {
945                libtrace_message_t tick;
946                tick.additional.uint64 = trace_packet_get_order(packets[i]);
947                tick.code = MESSAGE_TICK;
948                trace_send_message_to_perpkts(libtrace, &tick);
949        } */
950        return i;
951}
952
953/**
954 * For the case that we have a dedicated hasher thread
955 * 1. We read a packet from our buffer
956 * 2. Move that into the packet provided (packet)
957 */
958inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
959                                                   libtrace_thread_t *t,
960                                                   libtrace_packet_t *packets[],
961                                                   size_t nb_packets) {
962        size_t i;
963
964        /* We store the last error message here */
965        if (t->format_data) {
966                return ((libtrace_packet_t *)t->format_data)->error;
967        }
968
969        // Always grab at least one
970        if (packets[0]) // Recycle the old get the new
971                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
972        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
973
974        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
975                return packets[0]->error;
976        }
977
978        for (i = 1; i < nb_packets; i++) {
979                if (packets[i]) // Recycle the old get the new
980                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
981                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
982                        packets[i] = NULL;
983                        break;
984                }
985
986                /* We will return an error or EOF the next time around */
987                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
988                        /* The message case will be checked automatically -
989                           However other cases like EOF and error will only be
990                           sent once*/
991                        if (packets[i]->error != READ_MESSAGE) {
992                                assert(t->format_data == NULL);
993                                t->format_data = packets[i];
994                        }
995                        break;
996                }
997        }
998
999        return i;
1000}
1001
1002/**
1003 * For the first packet of each queue we keep a copy and note the system
1004 * time it was received at.
1005 *
1006 * This is used for finding the first packet when playing back a trace
1007 * in trace time. And can be used by real time applications to print
1008 * results out every XXX seconds.
1009 */
1010void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1011{
1012
1013        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1014        struct timeval tv;
1015        libtrace_packet_t * dup;
1016
1017        if (t->recorded_first) {
1018                return;
1019        }
1020
1021        if (IS_LIBTRACE_META_PACKET(packet)) {
1022                return;
1023        }
1024
1025        /* We mark system time against a copy of the packet */
1026        gettimeofday(&tv, NULL);
1027        dup = trace_copy_packet(packet);
1028
1029        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1030        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1031        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1032        libtrace->first_packets.count++;
1033
1034        /* Now update the first */
1035        if (libtrace->first_packets.count == 1) {
1036                /* We the first entry hence also the first known packet */
1037                libtrace->first_packets.first = t->perpkt_num;
1038        } else {
1039                /* Check if we are newer than the previous 'first' packet */
1040                size_t first = libtrace->first_packets.first;
1041                struct timeval cur_ts = trace_get_timeval(dup);
1042                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1043                if (timercmp(&cur_ts, &first_ts, <))
1044                        libtrace->first_packets.first = t->perpkt_num;
1045        }
1046        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1047
1048        memset(&mesg, 0, sizeof(libtrace_message_t));
1049        mesg.code = MESSAGE_FIRST_PACKET;
1050        trace_message_reporter(libtrace, &mesg);
1051        trace_message_perpkts(libtrace, &mesg);
1052        t->recorded_first = true;
1053}
1054
1055DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1056                                     libtrace_thread_t *t,
1057                                     const libtrace_packet_t **packet,
1058                                     const struct timeval **tv)
1059{
1060        void * tmp;
1061        int ret = 0;
1062
1063        if (t) {
1064                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1065                        return -1;
1066        }
1067
1068        /* Throw away these which we don't use */
1069        if (!packet)
1070                packet = (const libtrace_packet_t **) &tmp;
1071        if (!tv)
1072                tv = (const struct timeval **) &tmp;
1073
1074        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1075        if (t) {
1076                /* Get the requested thread */
1077                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1078                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1079        } else if (libtrace->first_packets.count) {
1080                /* Get the first packet across all threads */
1081                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1082                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1083                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1084                        ret = 1;
1085                } else {
1086                        struct timeval curr_tv;
1087                        // If a second has passed since the first entry we will assume this is the very first packet
1088                        gettimeofday(&curr_tv, NULL);
1089                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1090                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1091                                        ret = 1;
1092                                }
1093                        }
1094                }
1095        } else {
1096                *packet = NULL;
1097                *tv = NULL;
1098        }
1099        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1100        return ret;
1101}
1102
1103
1104DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1105{
1106        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1107}
1108
1109inline static struct timeval usec_to_tv(uint64_t usec)
1110{
1111        struct timeval tv;
1112        tv.tv_sec = usec / 1000000;
1113        tv.tv_usec = usec % 1000000;
1114        return tv;
1115}
1116
1117/** Similar to delay_tracetime but send messages to all threads periodically */
1118static void* reporter_entry(void *data) {
1119        libtrace_message_t message = {0, {.uint64=0}, NULL};
1120        libtrace_t *trace = (libtrace_t *)data;
1121        libtrace_thread_t *t = &trace->reporter_thread;
1122
1123        /* Wait until all threads are started */
1124        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1125        if (trace->state == STATE_ERROR) {
1126                thread_change_state(trace, t, THREAD_FINISHED, false);
1127                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1128                pthread_exit(NULL);
1129        }
1130        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1131
1132        if (trace->format->pregister_thread) {
1133                trace->format->pregister_thread(trace, t, false);
1134        }
1135
1136        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1137        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1138
1139        while (!trace_has_finished(trace)) {
1140                if (trace->config.reporter_polling) {
1141                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1142                                message.code = MESSAGE_POST_REPORTER;
1143                } else {
1144                        libtrace_message_queue_get(&t->messages, &message);
1145                }
1146                switch (message.code) {
1147                        // Check for results
1148                        case MESSAGE_POST_REPORTER:
1149                                trace->combiner.read(trace, &trace->combiner);
1150                                break;
1151                        case MESSAGE_DO_PAUSE:
1152                                assert(trace->combiner.pause);
1153                                trace->combiner.pause(trace, &trace->combiner);
1154                                send_message(trace, t, MESSAGE_PAUSING,
1155                                                (libtrace_generic_t) {0}, t);
1156                                trace_thread_pause(trace, t);
1157                                send_message(trace, t, MESSAGE_RESUMING,
1158                                                (libtrace_generic_t) {0}, t);
1159                                break;
1160                default:
1161                        send_message(trace, t, message.code, message.data,
1162                                        message.sender);
1163                }
1164        }
1165
1166        // Flush out whats left now all our threads have finished
1167        trace->combiner.read_final(trace, &trace->combiner);
1168
1169        // GOODBYE
1170        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1171        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1172
1173        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1174        print_memory_stats();
1175        return NULL;
1176}
1177
1178/** Similar to delay_tracetime but send messages to all threads periodically */
1179static void* keepalive_entry(void *data) {
1180        struct timeval prev, next;
1181        libtrace_message_t message = {0, {.uint64=0}, NULL};
1182        libtrace_t *trace = (libtrace_t *)data;
1183        uint64_t next_release;
1184        libtrace_thread_t *t = &trace->keepalive_thread;
1185
1186        /* Wait until all threads are started */
1187        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1188        if (trace->state == STATE_ERROR) {
1189                thread_change_state(trace, t, THREAD_FINISHED, false);
1190                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1191                pthread_exit(NULL);
1192        }
1193        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1194
1195        gettimeofday(&prev, NULL);
1196        memset(&message, 0, sizeof(libtrace_message_t));
1197        message.code = MESSAGE_TICK_INTERVAL;
1198
1199        while (trace->state != STATE_FINISHED) {
1200                fd_set rfds;
1201                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1202                gettimeofday(&next, NULL);
1203                if (next_release > tv_to_usec(&next)) {
1204                        next = usec_to_tv(next_release - tv_to_usec(&next));
1205                        // Wait for timeout or a message
1206                        FD_ZERO(&rfds);
1207                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1208                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1209                                libtrace_message_t msg;
1210                                libtrace_message_queue_get(&t->messages, &msg);
1211                                assert(msg.code == MESSAGE_DO_STOP);
1212                                goto done;
1213                        }
1214                }
1215                prev = usec_to_tv(next_release);
1216                if (trace->state == STATE_RUNNING) {
1217                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1218                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1219                        trace_message_perpkts(trace, &message);
1220                }
1221        }
1222done:
1223
1224        thread_change_state(trace, t, THREAD_FINISHED, true);
1225        return NULL;
1226}
1227
1228/**
1229 * Delays a packets playback so the playback will be in trace time.
1230 * This may break early if a message becomes available.
1231 *
1232 * Requires the first packet for this thread to be received.
1233 * @param libtrace  The trace
1234 * @param packet    The packet to delay
1235 * @param t         The current thread
1236 * @return Either READ_MESSAGE(-2) or 0 is successful
1237 */
1238static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1239        struct timeval curr_tv, pkt_tv;
1240        uint64_t next_release = t->tracetime_offset_usec;
1241        uint64_t curr_usec;
1242
1243        if (!t->tracetime_offset_usec) {
1244                const libtrace_packet_t *first_pkt;
1245                const struct timeval *sys_tv;
1246                int64_t initial_offset;
1247                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1248                if (!first_pkt)
1249                        return 0;
1250                pkt_tv = trace_get_timeval(first_pkt);
1251                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1252                /* In the unlikely case offset is 0, change it to 1 */
1253                if (stable)
1254                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1255                next_release = initial_offset;
1256        }
1257        /* next_release == offset */
1258        pkt_tv = trace_get_timeval(packet);
1259        next_release += tv_to_usec(&pkt_tv);
1260        gettimeofday(&curr_tv, NULL);
1261        curr_usec = tv_to_usec(&curr_tv);
1262        if (next_release > curr_usec) {
1263                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1264                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1265                fd_set rfds;
1266                FD_ZERO(&rfds);
1267                FD_SET(mesg_fd, &rfds);
1268                // We need to wait
1269                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1270                if (ret == 0) {
1271                        return 0;
1272                } else if (ret > 0) {
1273                        return READ_MESSAGE;
1274                } else {
1275                        assert(!"trace_delay_packet: Unexpected return from select");
1276                }
1277        }
1278        return 0;
1279}
1280
1281/* Discards packets that don't match the filter.
1282 * Discarded packets are emptied and then moved to the end of the packet list.
1283 *
1284 * @param trace       The trace format, containing the filter
1285 * @param packets     An array of packets
1286 * @param nb_packets  The number of valid items in packets
1287 *
1288 * @return The number of packets that passed the filter, which are moved to
1289 *          the start of the packets array
1290 */
1291static inline size_t filter_packets(libtrace_t *trace,
1292                                    libtrace_packet_t **packets,
1293                                    size_t nb_packets) {
1294        size_t offset = 0;
1295        size_t i;
1296
1297        for (i = 0; i < nb_packets; ++i) {
1298                // The filter needs the trace attached to receive the link type
1299                packets[i]->trace = trace;
1300                if (trace_apply_filter(trace->filter, packets[i])) {
1301                        libtrace_packet_t *tmp;
1302                        tmp = packets[offset];
1303                        packets[offset++] = packets[i];
1304                        packets[i] = tmp;
1305                } else {
1306                        trace_fin_packet(packets[i]);
1307                }
1308        }
1309
1310        return offset;
1311}
1312
1313/* Read a batch of packets from the trace into a buffer.
1314 * Note that this function will block until a packet is read (or EOF is reached)
1315 *
1316 * @param libtrace    The trace
1317 * @param t           The thread
1318 * @param packets     An array of packets
1319 * @param nb_packets  The number of empty packets in packets
1320 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1321 */
1322static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1323                                      libtrace_thread_t *t,
1324                                      libtrace_packet_t *packets[],
1325                                      size_t nb_packets) {
1326        int i;
1327        assert(nb_packets);
1328        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1329        if (trace_is_err(libtrace))
1330                return -1;
1331        if (!libtrace->started) {
1332                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1333                              "You must call libtrace_start() before trace_read_packet()\n");
1334                return -1;
1335        }
1336
1337        if (libtrace->format->pread_packets) {
1338                int ret;
1339                do {
1340                        ret=libtrace->format->pread_packets(libtrace, t,
1341                                                            packets,
1342                                                            nb_packets);
1343                        /* Error, EOF or message? */
1344                        if (ret <= 0) {
1345                                return ret;
1346                        }
1347                        if (libtrace->filter) {
1348                                int remaining;
1349                                remaining = filter_packets(libtrace,
1350                                                           packets, ret);
1351                                t->filtered_packets += ret - remaining;
1352                                ret = remaining;
1353                        }
1354                        for (i = 0; i < ret; ++i) {
1355                                /* We do not mark the packet against the trace,
1356                                 * before hand or after. After breaks DAG meta
1357                                 * packets and before is inefficient */
1358                                //packets[i]->trace = libtrace;
1359                                /* TODO IN FORMAT?? Like traditional libtrace */
1360                                if (libtrace->snaplen>0)
1361                                        trace_set_capture_length(packets[i],
1362                                                        libtrace->snaplen);
1363                                packets[i]->which_trace_start = libtrace->startcount;
1364                        }
1365                } while(ret == 0);
1366                return ret;
1367        }
1368        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1369                      "This format does not support reading packets\n");
1370        return ~0U;
1371}
1372
1373/* Restarts a parallel trace, this is called from trace_pstart.
1374 * The libtrace lock is held upon calling this function.
1375 * Typically with a parallel trace the threads are not
1376 * killed rather.
1377 */
1378static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1379                          libtrace_callback_set_t *per_packet_cbs, 
1380                          libtrace_callback_set_t *reporter_cbs) {
1381        int i, err = 0;
1382        if (libtrace->state != STATE_PAUSED) {
1383                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1384                        "trace(%s) is not currently paused",
1385                              libtrace->uridata);
1386                return -1;
1387        }
1388
1389        assert(libtrace_parallel);
1390        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1391
1392        /* Reset first packets */
1393        pthread_spin_lock(&libtrace->first_packets.lock);
1394        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1395                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1396                if (libtrace->first_packets.packets[i].packet) {
1397                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1398                        libtrace->first_packets.packets[i].packet = NULL;
1399                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1400                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1401                        libtrace->first_packets.count--;
1402                        libtrace->perpkt_threads[i].recorded_first = false;
1403                }
1404        }
1405        assert(libtrace->first_packets.count == 0);
1406        libtrace->first_packets.first = 0;
1407        pthread_spin_unlock(&libtrace->first_packets.lock);
1408
1409        /* Reset delay */
1410        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1411                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1412        }
1413
1414        /* Reset statistics */
1415        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1416                libtrace->perpkt_threads[i].accepted_packets = 0;
1417                libtrace->perpkt_threads[i].filtered_packets = 0;
1418        }
1419        libtrace->accepted_packets = 0;
1420        libtrace->filtered_packets = 0;
1421
1422        /* Update functions if requested */
1423        if(global_blob)
1424                libtrace->global_blob = global_blob;
1425
1426        if (per_packet_cbs) {
1427                if (libtrace->perpkt_cbs)
1428                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1429                libtrace->perpkt_cbs = trace_create_callback_set();
1430                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1431                                sizeof(libtrace_callback_set_t));
1432        }
1433
1434        if (reporter_cbs) {
1435                if (libtrace->reporter_cbs)
1436                        trace_destroy_callback_set(libtrace->reporter_cbs);
1437
1438                libtrace->reporter_cbs = trace_create_callback_set();
1439                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1440                                sizeof(libtrace_callback_set_t));
1441        }
1442
1443        if (trace_is_parallel(libtrace)) {
1444                err = libtrace->format->pstart_input(libtrace);
1445        } else {
1446                if (libtrace->format->start_input) {
1447                        err = libtrace->format->start_input(libtrace);
1448                }
1449        }
1450
1451        if (err == 0) {
1452                libtrace->started = true;
1453                libtrace->startcount ++;
1454                libtrace_change_state(libtrace, STATE_RUNNING, false);
1455        }
1456        return err;
1457}
1458
1459/**
1460 * @return the number of CPU cores on the machine. -1 if unknown.
1461 */
1462SIMPLE_FUNCTION static int get_nb_cores() {
1463        int numCPU;
1464#ifdef _SC_NPROCESSORS_ONLN
1465        /* Most systems do this now */
1466        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1467
1468#else
1469        int mib[] = {CTL_HW, HW_AVAILCPU};
1470        size_t len = sizeof(numCPU);
1471
1472        /* get the number of CPUs from the system */
1473        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1474#endif
1475        return numCPU <= 0 ? 1 : numCPU;
1476}
1477
1478/**
1479 * Verifies the configuration and sets default values for any values not
1480 * specified by the user.
1481 */
1482static void verify_configuration(libtrace_t *libtrace) {
1483
1484        if (libtrace->config.hasher_queue_size <= 0)
1485                libtrace->config.hasher_queue_size = 1000;
1486
1487        if (libtrace->config.perpkt_threads <= 0) {
1488                libtrace->perpkt_thread_count = get_nb_cores();
1489                if (libtrace->perpkt_thread_count <= 0)
1490                        // Lets just use one
1491                        libtrace->perpkt_thread_count = 1;
1492        } else {
1493                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1494        }
1495
1496        if (libtrace->config.reporter_thold <= 0)
1497                libtrace->config.reporter_thold = 100;
1498        if (libtrace->config.burst_size <= 0)
1499                libtrace->config.burst_size = 32;
1500        if (libtrace->config.thread_cache_size <= 0)
1501                libtrace->config.thread_cache_size = 64;
1502        if (libtrace->config.cache_size <= 0)
1503                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1504
1505        if (libtrace->config.cache_size <
1506                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1507                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1508
1509        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1510                libtrace->combiner = combiner_unordered;
1511
1512        /* Figure out if we are using a dedicated hasher thread? */
1513        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1514                libtrace->hasher_thread.type = THREAD_HASHER;
1515        }
1516}
1517
1518/**
1519 * Starts a libtrace_thread, including allocating memory for messaging.
1520 * Threads are expected to wait until the libtrace look is released.
1521 * Hence why we don't init structures until later.
1522 *
1523 * @param trace The trace the thread is associated with
1524 * @param t The thread that is filled when the thread is started
1525 * @param type The type of thread
1526 * @param start_routine The entry location of the thread
1527 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1528 * @param name For debugging purposes set the threads name (Optional)
1529 *
1530 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1531 *         In this situation the thread structure is zeroed.
1532 */
1533static int trace_start_thread(libtrace_t *trace,
1534                       libtrace_thread_t *t,
1535                       enum thread_types type,
1536                       void *(*start_routine) (void *),
1537                       int perpkt_num,
1538                       const char *name) {
1539#ifdef __linux__
1540        cpu_set_t cpus;
1541        int i;
1542#endif
1543        int ret;
1544        assert(t->type == THREAD_EMPTY);
1545        t->trace = trace;
1546        t->ret = NULL;
1547        t->user_data = NULL;
1548        t->type = type;
1549        t->state = THREAD_RUNNING;
1550
1551        assert(name);
1552
1553#ifdef __linux__
1554        CPU_ZERO(&cpus);
1555        for (i = 0; i < get_nb_cores(); i++)
1556                CPU_SET(i, &cpus);
1557
1558        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1559        if( ret == 0 ) {
1560                ret = pthread_setaffinity_np(t->tid, sizeof(cpus), &cpus);
1561        }
1562
1563#else
1564        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1565#endif
1566        if (ret != 0) {
1567                libtrace_zero_thread(t);
1568                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1569                return -1;
1570        }
1571        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1572        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1573                libtrace_ringbuffer_init(&t->rbuffer,
1574                                         trace->config.hasher_queue_size,
1575                                         trace->config.hasher_polling?
1576                                                 LIBTRACE_RINGBUFFER_POLLING:
1577                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1578        }
1579#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1580        if(name)
1581                pthread_setname_np(t->tid, name);
1582#endif
1583        t->perpkt_num = perpkt_num;
1584        return 0;
1585}
1586
1587/** Parses the environment variable LIBTRACE_CONF into the supplied
1588 * configuration structure.
1589 *
1590 * @param[in,out] libtrace The trace from which we determine the URI and set
1591 * the configuration.
1592 *
1593 * We search for 3 environment variables and apply them to the config in the
1594 * following order. Such that the first has the lowest priority.
1595 *
1596 * 1. LIBTRACE_CONF, The global environment configuration
1597 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1598 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1599 *
1600 * E.g.
1601 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1602 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1603 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1604 *
1605 * @note All environment variables names MUST only contian
1606 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1607 * outside of this range should be captilised if possible or replaced with an
1608 * underscore.
1609 */
1610static void parse_env_config (libtrace_t *libtrace) {
1611        char env_name[1024] = "LIBTRACE_CONF_";
1612        size_t len = strlen(env_name);
1613        size_t mark = 0;
1614        size_t i;
1615        char * env;
1616
1617        /* Make our compound string */
1618        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1619        len += strlen(libtrace->format->name);
1620        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1621        len += 1;
1622        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1623
1624        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1625        for (i = 0; env_name[i] != 0; ++i) {
1626                env_name[i] = toupper(env_name[i]);
1627                if(env_name[i] == ':') {
1628                        mark = i;
1629                }
1630                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1631                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1632                        env_name[i] = '_';
1633                }
1634        }
1635
1636        /* First apply global env settings LIBTRACE_CONF */
1637        env = getenv("LIBTRACE_CONF");
1638        if (env)
1639        {
1640                printf("Got env %s", env);
1641                trace_set_configuration(libtrace, env);
1642        }
1643
1644        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1645        if (mark != 0) {
1646                env_name[mark] = 0;
1647                env = getenv(env_name);
1648                if (env) {
1649                        trace_set_configuration(libtrace, env);
1650                }
1651                env_name[mark] = '_';
1652        }
1653
1654        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1655        env = getenv(env_name);
1656        if (env) {
1657                trace_set_configuration(libtrace, env);
1658        }
1659}
1660
1661DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1662        if (libtrace->state == STATE_NEW)
1663                return trace_supports_parallel(libtrace);
1664        return libtrace->pread == trace_pread_packet_wrapper;
1665}
1666
1667DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1668                           libtrace_callback_set_t *per_packet_cbs,
1669                           libtrace_callback_set_t *reporter_cbs) {
1670        int i;
1671        int ret = -1;
1672        char name[24];
1673        sigset_t sig_before, sig_block_all;
1674        assert(libtrace);
1675
1676        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1677        if (trace_is_err(libtrace)) {
1678                goto cleanup_none;
1679        }
1680
1681        if (libtrace->state == STATE_PAUSED) {
1682                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1683                                reporter_cbs);
1684                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1685                return ret;
1686        }
1687
1688        if (libtrace->state != STATE_NEW) {
1689                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1690                              "should be called on a NEW or PAUSED trace but "
1691                              "instead was called from %s",
1692                              get_trace_state_name(libtrace->state));
1693                goto cleanup_none;
1694        }
1695
1696        /* Store the user defined things against the trace */
1697        libtrace->global_blob = global_blob;
1698
1699        /* Save a copy of the callbacks in case the user tries to change them
1700         * on us later */
1701        if (!per_packet_cbs) {
1702                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1703                                "requires a non-NULL set of per packet "
1704                                "callbacks.");
1705                goto cleanup_none;
1706        }
1707
1708        if (per_packet_cbs->message_packet == NULL) {
1709                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1710                                "packet callbacks must include a handler "
1711                                "for a packet. Please set this using "
1712                                "trace_set_packet_cb().");
1713                goto cleanup_none;
1714        }
1715
1716        libtrace->perpkt_cbs = trace_create_callback_set();
1717        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1718       
1719        if (reporter_cbs) {
1720                libtrace->reporter_cbs = trace_create_callback_set();
1721                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1722        }
1723
1724       
1725
1726
1727        /* And zero other fields */
1728        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1729                libtrace->perpkt_thread_states[i] = 0;
1730        }
1731        libtrace->first_packets.first = 0;
1732        libtrace->first_packets.count = 0;
1733        libtrace->first_packets.packets = NULL;
1734        libtrace->perpkt_threads = NULL;
1735        /* Set a global which says we are using a parallel trace. This is
1736         * for backwards compatibility due to changes when destroying packets */
1737        libtrace_parallel = 1;
1738
1739        /* Parses configuration passed through environment variables */
1740        parse_env_config(libtrace);
1741        verify_configuration(libtrace);
1742
1743        ret = -1;
1744        /* Try start the format - we prefer parallel over single threaded, as
1745         * these formats should support messages better */
1746
1747        if (trace_supports_parallel(libtrace) &&
1748            !trace_has_dedicated_hasher(libtrace)) {
1749                ret = libtrace->format->pstart_input(libtrace);
1750                libtrace->pread = trace_pread_packet_wrapper;
1751        }
1752        if (ret != 0) {
1753                if (libtrace->format->start_input) {
1754                        ret = libtrace->format->start_input(libtrace);
1755                }
1756                if (libtrace->perpkt_thread_count > 1) {
1757                        libtrace->pread = trace_pread_packet_first_in_first_served;
1758                        /* Don't wait for a burst of packets if the format is
1759                         * live as this could block ring based formats and
1760                         * introduces delay. */
1761                        if (libtrace->format->info.live) {
1762                                libtrace->config.burst_size = 1;
1763                        }
1764                }
1765                else {
1766                        /* Use standard read_packet */
1767                        libtrace->pread = NULL;
1768                }
1769        }
1770
1771        if (ret < 0) {
1772                goto cleanup_none;
1773        }
1774
1775        /* --- Start all the threads we need --- */
1776        /* Disable signals because it is inherited by the threads we start */
1777        sigemptyset(&sig_block_all);
1778        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1779
1780        /* If we need a hasher thread start it
1781         * Special Case: If single threaded we don't need a hasher
1782         */
1783        if (trace_has_dedicated_hasher(libtrace)) {
1784                libtrace->hasher_thread.type = THREAD_EMPTY;
1785                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1786                                   THREAD_HASHER, hasher_entry, -1,
1787                                   "hasher-thread");
1788                if (ret != 0)
1789                        goto cleanup_started;
1790                libtrace->pread = trace_pread_packet_hasher_thread;
1791        } else {
1792                libtrace->hasher_thread.type = THREAD_EMPTY;
1793        }
1794
1795        /* Start up our perpkt threads */
1796        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1797                                          libtrace->perpkt_thread_count);
1798        if (!libtrace->perpkt_threads) {
1799                trace_set_err(libtrace, errno, "trace_pstart "
1800                              "failed to allocate memory.");
1801                goto cleanup_threads;
1802        }
1803        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1804                snprintf(name, sizeof(name), "perpkt-%d", i);
1805                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1806                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1807                                   THREAD_PERPKT, perpkt_threads_entry, i,
1808                                   name);
1809                if (ret != 0)
1810                        goto cleanup_threads;
1811        }
1812
1813        /* Start the reporter thread */
1814        if (reporter_cbs) {
1815                if (libtrace->combiner.initialise)
1816                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1817                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1818                                   THREAD_REPORTER, reporter_entry, -1,
1819                                   "reporter_thread");
1820                if (ret != 0)
1821                        goto cleanup_threads;
1822        }
1823
1824        /* Start the keepalive thread */
1825        if (libtrace->config.tick_interval > 0) {
1826                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1827                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1828                                   "keepalive_thread");
1829                if (ret != 0)
1830                        goto cleanup_threads;
1831        }
1832
1833        /* Init other data structures */
1834        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1835        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1836        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1837                                                 sizeof(*libtrace->first_packets.packets));
1838        if (libtrace->first_packets.packets == NULL) {
1839                trace_set_err(libtrace, errno, "trace_pstart "
1840                              "failed to allocate memory.");
1841                goto cleanup_threads;
1842        }
1843
1844        if (libtrace_ocache_init(&libtrace->packet_freelist,
1845                             (void* (*)()) trace_create_packet,
1846                             (void (*)(void *))trace_destroy_packet,
1847                             libtrace->config.thread_cache_size,
1848                             libtrace->config.cache_size * 4,
1849                             libtrace->config.fixed_count) != 0) {
1850                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1851                              "failed to allocate ocache.");
1852                goto cleanup_threads;
1853        }
1854
1855        /* Threads don't start */
1856        libtrace->started = true;
1857        libtrace->startcount ++;
1858        libtrace_change_state(libtrace, STATE_RUNNING, false);
1859
1860        ret = 0;
1861        goto success;
1862cleanup_threads:
1863        if (libtrace->first_packets.packets) {
1864                free(libtrace->first_packets.packets);
1865                libtrace->first_packets.packets = NULL;
1866        }
1867        libtrace_change_state(libtrace, STATE_ERROR, false);
1868        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1869        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1870                pthread_join(libtrace->hasher_thread.tid, NULL);
1871                libtrace_zero_thread(&libtrace->hasher_thread);
1872        }
1873
1874        if (libtrace->perpkt_threads) {
1875                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1876                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1877                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1878                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1879                        } else break;
1880                }
1881                free(libtrace->perpkt_threads);
1882                libtrace->perpkt_threads = NULL;
1883        }
1884
1885        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1886                pthread_join(libtrace->reporter_thread.tid, NULL);
1887                libtrace_zero_thread(&libtrace->reporter_thread);
1888        }
1889
1890        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1891                pthread_join(libtrace->keepalive_thread.tid, NULL);
1892                libtrace_zero_thread(&libtrace->keepalive_thread);
1893        }
1894        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1895        libtrace_change_state(libtrace, STATE_NEW, false);
1896        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1897        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1898cleanup_started:
1899        if (libtrace->pread == trace_pread_packet_wrapper) {
1900                if (libtrace->format->ppause_input)
1901                        libtrace->format->ppause_input(libtrace);
1902        } else {
1903                if (libtrace->format->pause_input)
1904                        libtrace->format->pause_input(libtrace);
1905        }
1906        ret = -1;
1907success:
1908        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1909cleanup_none:
1910        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1911        return ret;
1912}
1913
1914DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1915                fn_cb_starting handler) {
1916        cbset->message_starting = handler;
1917        return 0;
1918}
1919
1920DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1921                fn_cb_dataless handler) {
1922        cbset->message_pausing = handler;
1923        return 0;
1924}
1925
1926DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1927                fn_cb_dataless handler) {
1928        cbset->message_resuming = handler;
1929        return 0;
1930}
1931
1932DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1933                fn_cb_dataless handler) {
1934        cbset->message_stopping = handler;
1935        return 0;
1936}
1937
1938DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1939                fn_cb_packet handler) {
1940        cbset->message_packet = handler;
1941        return 0;
1942}
1943
1944DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1945                fn_cb_first_packet handler) {
1946        cbset->message_first_packet = handler;
1947        return 0;
1948}
1949
1950DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1951                fn_cb_tick handler) {
1952        cbset->message_tick_count = handler;
1953        return 0;
1954}
1955
1956DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1957                fn_cb_tick handler) {
1958        cbset->message_tick_interval = handler;
1959        return 0;
1960}
1961
1962DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1963                fn_cb_result handler) {
1964        cbset->message_result = handler;
1965        return 0;
1966}
1967
1968DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1969                fn_cb_usermessage handler) {
1970        cbset->message_user = handler;
1971        return 0;
1972}
1973
1974/*
1975 * Pauses a trace, this should only be called by the main thread
1976 * 1. Set started = false
1977 * 2. All perpkt threads are paused waiting on a condition var
1978 * 3. Then call ppause on the underlying format if found
1979 * 4. The traces state is paused
1980 *
1981 * Once done you should be able to modify the trace setup and call pstart again
1982 * TODO add support to change the number of threads.
1983 */
1984DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1985{
1986        libtrace_thread_t *t;
1987        int i;
1988        assert(libtrace);
1989
1990        t = get_thread_table(libtrace);
1991        // Check state from within the lock if we are going to change it
1992        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1993
1994        /* If we are already paused, just treat this as a NOOP */
1995        if (libtrace->state == STATE_PAUSED) {
1996                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1997                return 0;
1998        }
1999        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2000                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2001                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2002                return -1;
2003        }
2004
2005        libtrace_change_state(libtrace, STATE_PAUSING, false);
2006        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2007
2008        // Special case handle the hasher thread case
2009        if (trace_has_dedicated_hasher(libtrace)) {
2010                if (libtrace->config.debug_state)
2011                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2012                libtrace_message_t message = {0, {.uint64=0}, NULL};
2013                message.code = MESSAGE_DO_PAUSE;
2014                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2015                // Wait for it to pause
2016                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2017                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2018                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2019                }
2020                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2021                if (libtrace->config.debug_state)
2022                        fprintf(stderr, " DONE\n");
2023        }
2024
2025        if (libtrace->config.debug_state)
2026                fprintf(stderr, "Asking perpkt threads to pause ...");
2027        // Stop threads, skip this one if it's a perpkt
2028        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2029                if (&libtrace->perpkt_threads[i] != t) {
2030                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2031                        message.code = MESSAGE_DO_PAUSE;
2032                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2033                        if(trace_has_dedicated_hasher(libtrace)) {
2034                                // The hasher has stopped and other threads have messages waiting therefore
2035                                // If the queues are empty the other threads would have no data
2036                                // So send some message packets to simply ask the threads to check
2037                                // We are the only writer since hasher has paused
2038                                libtrace_packet_t *pkt;
2039                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2040                                pkt->error = READ_MESSAGE;
2041                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2042                        }
2043                } else {
2044                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2045                }
2046        }
2047
2048        if (t) {
2049                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2050                // We rely on the user to *not* return before starting the trace again
2051                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2052        }
2053
2054        // Wait for all threads to pause
2055        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2056        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2057                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2058        }
2059        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2060
2061        if (libtrace->config.debug_state)
2062                fprintf(stderr, " DONE\n");
2063
2064        // Deal with the reporter
2065        if (trace_has_reporter(libtrace)) {
2066                if (libtrace->config.debug_state)
2067                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2068                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2069                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2070                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2071               
2072                } else {
2073                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2074                        message.code = MESSAGE_DO_PAUSE;
2075                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2076                        // Wait for it to pause
2077                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2078                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2079                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2080                        }
2081                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2082                }
2083                if (libtrace->config.debug_state)
2084                        fprintf(stderr, " DONE\n");
2085        }
2086
2087        /* Cache values before we pause */
2088        if (libtrace->stats == NULL)
2089                libtrace->stats = trace_create_statistics();
2090        // Save the statistics against the trace
2091        trace_get_statistics(libtrace, NULL);
2092        if (trace_is_parallel(libtrace)) {
2093                libtrace->started = false;
2094                if (libtrace->format->ppause_input)
2095                        libtrace->format->ppause_input(libtrace);
2096                // TODO What happens if we don't have pause input??
2097        } else {
2098                int err;
2099                err = trace_pause(libtrace);
2100                // We should handle this a bit better
2101                if (err)
2102                        return err;
2103        }
2104
2105        // Only set as paused after the pause has been called on the trace
2106        libtrace_change_state(libtrace, STATE_PAUSED, true);
2107        return 0;
2108}
2109
2110/**
2111 * Stop trace finish prematurely as though it meet an EOF
2112 * This should only be called by the main thread
2113 * 1. Calls ppause
2114 * 2. Sends a message asking for threads to finish
2115 * 3. Releases threads which will pause
2116 */
2117DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2118{
2119        int i, err;
2120        libtrace_message_t message = {0, {.uint64=0}, NULL};
2121        assert(libtrace);
2122
2123        // Ensure all threads have paused and the underlying trace format has
2124        // been closed and all packets associated are cleaned up
2125        // Pause will do any state checks for us
2126        err = trace_ppause(libtrace);
2127        if (err)
2128                return err;
2129
2130        // Now send a message asking the threads to stop
2131        // This will be retrieved before trying to read another packet
2132
2133        message.code = MESSAGE_DO_STOP;
2134        trace_message_perpkts(libtrace, &message);
2135        if (trace_has_dedicated_hasher(libtrace))
2136                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2137
2138        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2139                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2140        }
2141
2142        /* Now release the threads and let them stop - when the threads finish
2143         * the state will be set to finished */
2144        libtrace_change_state(libtrace, STATE_FINISHING, true);
2145        return 0;
2146}
2147
2148DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2149        int ret = -1;
2150        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2151                return -1;
2152        }
2153
2154        // Save the requirements
2155        trace->hasher_type = type;
2156        if (hasher) {
2157                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2158                        if (trace->hasher_data) {
2159                                free(trace->hasher_data);
2160                        }
2161                }
2162                trace->hasher = hasher;
2163                trace->hasher_data = data;
2164                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2165        } else {
2166                trace->hasher = NULL;
2167                trace->hasher_data = NULL;
2168                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2169        }
2170
2171        // Try push this to hardware - NOTE hardware could do custom if
2172        // there is a more efficient way to apply it, in this case
2173        // it will simply grab the function out of libtrace_t
2174        if (trace_supports_parallel(trace) && trace->format->config_input)
2175                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2176
2177        if (ret == -1) {
2178                /* We have to deal with this ourself */
2179                if (!hasher) {
2180                        switch (type)
2181                        {
2182                                case HASHER_CUSTOM:
2183                                case HASHER_BALANCE:
2184                                        return 0;
2185                                case HASHER_BIDIRECTIONAL:
2186                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2187                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2188                                        toeplitz_init_config(trace->hasher_data, 1);
2189                                        return 0;
2190                                case HASHER_UNIDIRECTIONAL:
2191                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2192                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2193                                        toeplitz_init_config(trace->hasher_data, 0);
2194                                        return 0;
2195                        }
2196                        return -1;
2197                }
2198        } else {
2199                /* If the hasher is hardware we zero out the hasher and hasher
2200                 * data fields - only if we need a hasher do we do this */
2201                trace->hasher = NULL;
2202                trace->hasher_data = NULL;
2203        }
2204
2205        return 0;
2206}
2207
2208// Waits for all threads to finish
2209DLLEXPORT void trace_join(libtrace_t *libtrace) {
2210        int i;
2211
2212        /* Firstly wait for the perpkt threads to finish, since these are
2213         * user controlled */
2214        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2215                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2216                // So we must do our best effort to empty the queue - so
2217                // the producer (or any other threads) don't block.
2218                libtrace_packet_t * packet;
2219                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2220                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2221                        if (packet) // This could be NULL iff the perpkt finishes early
2222                                trace_destroy_packet(packet);
2223        }
2224
2225        /* Now the hasher */
2226        if (trace_has_dedicated_hasher(libtrace)) {
2227                pthread_join(libtrace->hasher_thread.tid, NULL);
2228                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2229        }
2230
2231        // Now that everything is finished nothing can be touching our
2232        // buffers so clean them up
2233        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2234                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2235                // if they lost timeslice before-during a write
2236                libtrace_packet_t * packet;
2237                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2238                        trace_destroy_packet(packet);
2239                if (trace_has_dedicated_hasher(libtrace)) {
2240                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2241                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2242                }
2243                // Cannot destroy vector yet, this happens with trace_destroy
2244        }
2245
2246        if (trace_has_reporter(libtrace)) {
2247                pthread_join(libtrace->reporter_thread.tid, NULL);
2248                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2249        }
2250
2251        // Wait for the tick (keepalive) thread if it has been started
2252        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2253                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2254                msg.code = MESSAGE_DO_STOP;
2255                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2256                pthread_join(libtrace->keepalive_thread.tid, NULL);
2257        }
2258
2259        libtrace_change_state(libtrace, STATE_JOINED, true);
2260        print_memory_stats();
2261}
2262
2263DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2264                                                libtrace_thread_t *t)
2265{
2266        int ret;
2267        if (t == NULL)
2268                t = get_thread_descriptor(libtrace);
2269        if (t == NULL)
2270                return -1;
2271        ret = libtrace_message_queue_count(&t->messages);
2272        return ret < 0 ? 0 : ret;
2273}
2274
2275DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2276                                          libtrace_thread_t *t,
2277                                          libtrace_message_t * message)
2278{
2279        int ret;
2280        if (t == NULL)
2281                t = get_thread_descriptor(libtrace);
2282        if (t == NULL)
2283                return -1;
2284        ret = libtrace_message_queue_get(&t->messages, message);
2285        return ret < 0 ? 0 : ret;
2286}
2287
2288DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2289                                              libtrace_thread_t *t,
2290                                              libtrace_message_t * message)
2291{
2292        if (t == NULL)
2293                t = get_thread_descriptor(libtrace);
2294        if (t == NULL)
2295                return -1;
2296        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2297                return 0;
2298        else
2299                return -1;
2300}
2301
2302DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2303{
2304        int ret;
2305        if (!message->sender)
2306                message->sender = get_thread_descriptor(libtrace);
2307
2308        ret = libtrace_message_queue_put(&t->messages, message);
2309        return ret < 0 ? 0 : ret;
2310}
2311
2312DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2313{
2314        if (!trace_has_reporter(libtrace) ||
2315            !(libtrace->reporter_thread.state == THREAD_RUNNING
2316              || libtrace->reporter_thread.state == THREAD_PAUSED))
2317                return -1;
2318
2319        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2320}
2321
2322DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2323{
2324        libtrace_message_t message = {0, {.uint64=0}, NULL};
2325        message.code = MESSAGE_POST_REPORTER;
2326        return trace_message_reporter(libtrace, (void *) &message);
2327}
2328
2329DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2330{
2331        int i;
2332        int missed = 0;
2333        if (message->sender == NULL)
2334                message->sender = get_thread_descriptor(libtrace);
2335        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2336                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2337                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2338                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2339                } else {
2340                        missed += 1;
2341                }
2342        }
2343        return -missed;
2344}
2345
2346/**
2347 * Publishes a result to the reduce queue
2348 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2349 */
2350DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2351        libtrace_result_t res;
2352        res.type = type;
2353        res.key = key;
2354        res.value = value;
2355        assert(libtrace->combiner.publish);
2356        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2357        return;
2358}
2359
2360DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2361        if (combiner) {
2362                trace->combiner = *combiner;
2363                trace->combiner.configuration = config;
2364        } else {
2365                // No combiner, so don't try use it
2366                memset(&trace->combiner, 0, sizeof(trace->combiner));
2367        }
2368}
2369
2370DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2371        return packet->order;
2372}
2373
2374DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2375        return packet->hash;
2376}
2377
2378DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2379        packet->order = order;
2380}
2381
2382DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2383        packet->hash = hash;
2384}
2385
2386DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2387        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2388}
2389
2390/**
2391 * @return True if the trace is not running such that it can be configured
2392 */
2393static inline bool trace_is_configurable(libtrace_t *trace) {
2394        return trace->state == STATE_NEW ||
2395                        trace->state == STATE_PAUSED;
2396}
2397
2398DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2399        // Only supported on new traces not paused traces
2400        if (trace->state != STATE_NEW) return -1;
2401
2402        /* TODO consider allowing an offset from the total number of cores i.e.
2403         * -1 reserve 1 core */
2404        if (nb >= 0) {
2405                trace->config.perpkt_threads = nb;
2406                return 0;
2407        } else {
2408                return -1;
2409        }
2410}
2411
2412DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2413        if (!trace_is_configurable(trace)) return -1;
2414
2415        trace->config.tick_interval = millisec;
2416        return 0;
2417}
2418
2419DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2420        if (!trace_is_configurable(trace)) return -1;
2421
2422        trace->config.tick_count = count;
2423        return 0;
2424}
2425
2426DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2427        if (!trace_is_configurable(trace)) return -1;
2428
2429        trace->tracetime = tracetime;
2430        return 0;
2431}
2432
2433DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2434        if (!trace_is_configurable(trace)) return -1;
2435
2436        trace->config.cache_size = size;
2437        return 0;
2438}
2439
2440DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2441        if (!trace_is_configurable(trace)) return -1;
2442
2443        trace->config.thread_cache_size = size;
2444        return 0;
2445}
2446
2447DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2448        if (!trace_is_configurable(trace)) return -1;
2449
2450        trace->config.fixed_count = fixed;
2451        return 0;
2452}
2453
2454DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2455        if (!trace_is_configurable(trace)) return -1;
2456
2457        trace->config.burst_size = size;
2458        return 0;
2459}
2460
2461DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2462        if (!trace_is_configurable(trace)) return -1;
2463
2464        trace->config.hasher_queue_size = size;
2465        return 0;
2466}
2467
2468DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2469        if (!trace_is_configurable(trace)) return -1;
2470
2471        trace->config.hasher_polling = polling;
2472        return 0;
2473}
2474
2475DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2476        if (!trace_is_configurable(trace)) return -1;
2477
2478        trace->config.reporter_polling = polling;
2479        return 0;
2480}
2481
2482DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2483        if (!trace_is_configurable(trace)) return -1;
2484
2485        trace->config.reporter_thold = thold;
2486        return 0;
2487}
2488
2489DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2490        if (!trace_is_configurable(trace)) return -1;
2491
2492        trace->config.debug_state = debug_state;
2493        return 0;
2494}
2495
2496static bool config_bool_parse(char *value, size_t nvalue) {
2497        if (strncmp(value, "true", nvalue) == 0)
2498                return true;
2499        else if (strncmp(value, "false", nvalue) == 0)
2500                return false;
2501        else
2502                return strtoll(value, NULL, 10) != 0;
2503}
2504
2505/* Note update documentation on trace_set_configuration */
2506static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2507        assert(key);
2508        assert(value);
2509        assert(uc);
2510        if (strncmp(key, "cache_size", nkey) == 0
2511            || strncmp(key, "cs", nkey) == 0) {
2512                uc->cache_size = strtoll(value, NULL, 10);
2513        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2514                   || strncmp(key, "tcs", nkey) == 0) {
2515                uc->thread_cache_size = strtoll(value, NULL, 10);
2516        } else if (strncmp(key, "fixed_count", nkey) == 0
2517                   || strncmp(key, "fc", nkey) == 0) {
2518                uc->fixed_count = config_bool_parse(value, nvalue);
2519        } else if (strncmp(key, "burst_size", nkey) == 0
2520                   || strncmp(key, "bs", nkey) == 0) {
2521                uc->burst_size = strtoll(value, NULL, 10);
2522        } else if (strncmp(key, "tick_interval", nkey) == 0
2523                   || strncmp(key, "ti", nkey) == 0) {
2524                uc->tick_interval = strtoll(value, NULL, 10);
2525        } else if (strncmp(key, "tick_count", nkey) == 0
2526                   || strncmp(key, "tc", nkey) == 0) {
2527                uc->tick_count = strtoll(value, NULL, 10);
2528        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2529                   || strncmp(key, "pt", nkey) == 0) {
2530                uc->perpkt_threads = strtoll(value, NULL, 10);
2531        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2532                   || strncmp(key, "hqs", nkey) == 0) {
2533                uc->hasher_queue_size = strtoll(value, NULL, 10);
2534        } else if (strncmp(key, "hasher_polling", nkey) == 0
2535                   || strncmp(key, "hp", nkey) == 0) {
2536                uc->hasher_polling = config_bool_parse(value, nvalue);
2537        } else if (strncmp(key, "reporter_polling", nkey) == 0
2538                   || strncmp(key, "rp", nkey) == 0) {
2539                uc->reporter_polling = config_bool_parse(value, nvalue);
2540        } else if (strncmp(key, "reporter_thold", nkey) == 0
2541                   || strncmp(key, "rt", nkey) == 0) {
2542                uc->reporter_thold = strtoll(value, NULL, 10);
2543        } else if (strncmp(key, "debug_state", nkey) == 0
2544                   || strncmp(key, "ds", nkey) == 0) {
2545                uc->debug_state = config_bool_parse(value, nvalue);
2546        } else {
2547                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2548        }
2549}
2550
2551DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2552        char *pch;
2553        char key[100];
2554        char value[100];
2555        char *dup;
2556        assert(str);
2557        assert(trace);
2558
2559        if (!trace_is_configurable(trace)) return -1;
2560
2561        dup = strdup(str);
2562        pch = strtok (dup," ,.-");
2563        while (pch != NULL)
2564        {
2565                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2566                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2567                } else {
2568                        fprintf(stderr, "Error parsing option %s\n", pch);
2569                }
2570                pch = strtok (NULL," ,.-");
2571        }
2572        free(dup);
2573
2574        return 0;
2575}
2576
2577DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2578        char line[1024];
2579        if (!trace_is_configurable(trace)) return -1;
2580
2581        while (fgets(line, sizeof(line), file) != NULL)
2582        {
2583                trace_set_configuration(trace, line);
2584        }
2585
2586        if(ferror(file))
2587                return -1;
2588        else
2589                return 0;
2590}
2591
2592DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2593        assert(packet);
2594        /* Always release any resources this might be holding */
2595        trace_fin_packet(packet);
2596        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2597}
2598
2599DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2600        pthread_mutex_lock(&(packet->ref_lock));
2601        if (packet->refcount < 0) {
2602                packet->refcount = 1;
2603        } else {
2604                packet->refcount ++;
2605        }
2606        pthread_mutex_unlock(&(packet->ref_lock));
2607}
2608
2609DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2610        pthread_mutex_lock(&(packet->ref_lock));
2611        packet->refcount --;
2612
2613        if (packet->refcount <= 0) {
2614                trace_free_packet(packet->trace, packet);
2615        }
2616        pthread_mutex_unlock(&(packet->ref_lock));
2617}
2618
2619
2620DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2621        if (libtrace->format)
2622                return &libtrace->format->info;
2623        else
2624                return NULL;
2625}
Note: See TracBrowser for help on using the repository browser.