source: lib/trace_parallel.c @ c1205bd

cachetimestampsdevelopetsiliverc-4.0.4ringdecrementfixringperformance
Last change on this file since c1205bd was c1205bd, checked in by Shane Alcock <salcock@…>, 3 years ago

Add packet reference counting API

This API allows users to keep track of references to libtrace
packet structures that are not being released by the per packet
processing threads, e.g. those passed to the reporter or to
any user-created pthreads.

If you only have one reference to a packet at any given time,
you won't need this API. However, if you have multiple
concurrent references, this API will allow you to ensure that the
packet is correctly freed only once all references have been
finished with.

  • Property mode set to 100644
File size: 84.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
290        return thread->perpkt_num;
291}
292
293/**
294 * Changes the overall traces state and signals the condition.
295 *
296 * @param trace A pointer to the trace
297 * @param new_state The new state of the trace
298 * @param need_lock Set to true if libtrace_lock is not held, otherwise
299 *        false in the case the lock is currently held by this thread.
300 */
301static inline void libtrace_change_state(libtrace_t *trace,
302        const enum trace_state new_state, const bool need_lock)
303{
304        UNUSED enum trace_state prev_state;
305        if (need_lock)
306                pthread_mutex_lock(&trace->libtrace_lock);
307        prev_state = trace->state;
308        trace->state = new_state;
309
310        if (trace->config.debug_state)
311                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
312                        trace->uridata, get_trace_state_name(prev_state),
313                        get_trace_state_name(trace->state));
314
315        pthread_cond_broadcast(&trace->perpkt_cond);
316        if (need_lock)
317                pthread_mutex_unlock(&trace->libtrace_lock);
318}
319
320/**
321 * Changes a thread's state and broadcasts the condition variable. This
322 * should always be done when the lock is held.
323 *
324 * Additionally for perpkt threads the state counts are updated.
325 *
326 * @param trace A pointer to the trace
327 * @param t A pointer to the thread to modify
328 * @param new_state The new state of the thread
329 * @param need_lock Set to true if libtrace_lock is not held, otherwise
330 *        false in the case the lock is currently held by this thread.
331 */
332static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
333        const enum thread_states new_state, const bool need_lock)
334{
335        enum thread_states prev_state;
336        if (need_lock)
337                pthread_mutex_lock(&trace->libtrace_lock);
338        prev_state = t->state;
339        t->state = new_state;
340        if (t->type == THREAD_PERPKT) {
341                --trace->perpkt_thread_states[prev_state];
342                ++trace->perpkt_thread_states[new_state];
343        }
344
345        if (trace->config.debug_state)
346                fprintf(stderr, "Thread %d state changed from %d to %d\n",
347                        (int) t->tid, prev_state, t->state);
348
349        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
350                /* Make sure we save our final stats in case someone wants
351                 * them at the end of their program.
352                 */
353
354                trace_get_statistics(trace, NULL);
355                libtrace_change_state(trace, STATE_FINISHED, false);
356        }
357
358        pthread_cond_broadcast(&trace->perpkt_cond);
359        if (need_lock)
360                pthread_mutex_unlock(&trace->libtrace_lock);
361}
362
363/**
364 * This is valid once a trace is initialised
365 *
366 * @return True if the format supports parallel threads.
367 */
368static inline bool trace_supports_parallel(libtrace_t *trace)
369{
370        assert(trace);
371        assert(trace->format);
372        if (trace->format->pstart_input)
373                return true;
374        else
375                return false;
376}
377
378void libtrace_zero_thread(libtrace_thread_t * t) {
379        t->accepted_packets = 0;
380        t->filtered_packets = 0;
381        t->recorded_first = false;
382        t->tracetime_offset_usec = 0;
383        t->user_data = 0;
384        t->format_data = 0;
385        libtrace_zero_ringbuffer(&t->rbuffer);
386        t->trace = NULL;
387        t->ret = NULL;
388        t->type = THREAD_EMPTY;
389        t->perpkt_num = -1;
390}
391
392// Ints are aligned int is atomic so safe to read and write at same time
393// However write must be locked, read doesn't (We never try read before written to table)
394libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
395        int i = 0;
396        pthread_t tid = pthread_self();
397
398        for (;i<libtrace->perpkt_thread_count ;++i) {
399                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
400                        return &libtrace->perpkt_threads[i];
401        }
402        return NULL;
403}
404
405static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
406        libtrace_thread_t *ret;
407        if (!(ret = get_thread_table(libtrace))) {
408                pthread_t tid = pthread_self();
409                // Check if we are reporter or something else
410                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
411                                pthread_equal(tid, libtrace->reporter_thread.tid))
412                        ret = &libtrace->reporter_thread;
413                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
414                         pthread_equal(tid, libtrace->hasher_thread.tid))
415                        ret = &libtrace->hasher_thread;
416                else
417                        ret = NULL;
418        }
419        return ret;
420}
421
422DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
423        // Duplicate the packet in standard malloc'd memory and free the
424        // original, This is a 1:1 exchange so the ocache count remains unchanged.
425        if (pkt->buf_control != TRACE_CTRL_PACKET) {
426                libtrace_packet_t *dup;
427                dup = trace_copy_packet(pkt);
428                /* Release the external buffer */
429                trace_fin_packet(pkt);
430                /* Copy the duplicated packet over the existing */
431                memcpy(pkt, dup, sizeof(libtrace_packet_t));
432                /* Free the packet structure */
433                free(dup);
434        }
435}
436
437/**
438 * Makes a libtrace_result_t safe, used when pausing a trace.
439 * This will call libtrace_make_packet_safe if the result is
440 * a packet.
441 */
442DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
443        if (res->type == RESULT_PACKET) {
444                libtrace_make_packet_safe(res->value.pkt);
445        }
446}
447
448/**
449 * Holds threads in a paused state, until released by broadcasting
450 * the condition mutex.
451 */
452static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
453        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
454        thread_change_state(trace, t, THREAD_PAUSED, false);
455        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
456                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
457        }
458        thread_change_state(trace, t, THREAD_RUNNING, false);
459        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
460}
461
462/**
463 * Sends a packet to the user, expects either a valid packet or a TICK packet.
464 *
465 * @param trace The trace
466 * @param t The current thread
467 * @param packet A pointer to the packet storage, which may be set to null upon
468 *               return, or a packet to be finished.
469 * @param tracetime If true packets are delayed to match with tracetime
470 * @return 0 is successful, otherwise if playing back in tracetime
471 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
472 *
473 * @note READ_MESSAGE will only be returned if tracetime is true.
474 */
475static inline int dispatch_packet(libtrace_t *trace,
476                                  libtrace_thread_t *t,
477                                  libtrace_packet_t **packet,
478                                  bool tracetime) {
479
480        if ((*packet)->error > 0) {
481                if (tracetime) {
482                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
483                                return READ_MESSAGE;
484                }
485                if (!IS_LIBTRACE_META_PACKET((*packet))) {
486                        t->accepted_packets++;
487                }
488                if (trace->perpkt_cbs->message_packet)
489                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
490                trace_fin_packet(*packet);
491        } else {
492                assert((*packet)->error == READ_TICK);
493                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
494                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
495        }
496        return 0;
497}
498
499/**
500 * Sends a batch of packets to the user, expects either a valid packet or a
501 * TICK packet.
502 *
503 * @param trace The trace
504 * @param t The current thread
505 * @param packets [in,out] An array of packets, these may be null upon return
506 * @param nb_packets The total number of packets in the list
507 * @param empty [in,out] A pointer to an integer storing the first empty slot,
508 * upon return this is updated
509 * @param offset [in,out] The offset into the array, upon return this is updated
510 * @param tracetime If true packets are delayed to match with tracetime
511 * @return 0 is successful, otherwise if playing back in tracetime
512 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
513 *
514 * @note READ_MESSAGE will only be returned if tracetime is true.
515 */
516static inline int dispatch_packets(libtrace_t *trace,
517                                  libtrace_thread_t *t,
518                                  libtrace_packet_t *packets[],
519                                  int nb_packets, int *empty, int *offset,
520                                  bool tracetime) {
521        for (;*offset < nb_packets; ++*offset) {
522                int ret;
523                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
524                if (ret == 0) {
525                        /* Move full slots to front as we go */
526                        if (packets[*offset]) {
527                                if (*empty != *offset) {
528                                        packets[*empty] = packets[*offset];
529                                        packets[*offset] = NULL;
530                                }
531                                ++*empty;
532                        }
533                } else {
534                        /* Break early */
535                        assert(ret == READ_MESSAGE);
536                        return READ_MESSAGE;
537                }
538        }
539
540        return 0;
541}
542
543/**
544 * Pauses a per packet thread, messages will not be processed when the thread
545 * is paused.
546 *
547 * This process involves reading packets if a hasher thread is used. As such
548 * this function can fail to pause due to errors when reading in which case
549 * the thread should be stopped instead.
550 *
551 *
552 * @brief trace_perpkt_thread_pause
553 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
554 */
555static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
556                                     libtrace_packet_t *packets[],
557                                     int nb_packets, int *empty, int *offset) {
558        libtrace_packet_t * packet = NULL;
559
560        /* Let the user thread know we are going to pause */
561        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
562
563        /* Send through any remaining packets (or messages) without delay */
564
565        /* First send those packets already read, as fast as possible
566         * This should never fail or check for messages etc. */
567        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
568                                    offset, false), == 0);
569
570        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
571        /* If a hasher thread is running, empty input queues so we don't lose data */
572        if (trace_has_dedicated_hasher(trace)) {
573                // The hasher has stopped by this point, so the queue shouldn't be filling
574                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
575                        int ret = trace->pread(trace, t, &packet, 1);
576                        if (ret == 1) {
577                                if (packet->error > 0) {
578                                        store_first_packet(trace, packet, t);
579                                }
580                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
581                                if (packet == NULL)
582                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
583                        } else if (ret != READ_MESSAGE) {
584                                /* Ignore messages we pick these up next loop */
585                                assert (ret == READ_EOF || ret == READ_ERROR);
586                                /* Verify no packets are remaining */
587                                /* TODO refactor this sanity check out!! */
588                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
589                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
590                                        // No packets after this should have any data in them
591                                        assert(packet->error <= 0);
592                                }
593                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
594                                return -1;
595                        }
596                }
597        }
598        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
599
600        /* Now we do the actual pause, this returns when we resumed */
601        trace_thread_pause(trace, t);
602        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
603        return 1;
604}
605
606/**
607 * The is the entry point for our packet processing threads.
608 */
609static void* perpkt_threads_entry(void *data) {
610        libtrace_t *trace = (libtrace_t *)data;
611        libtrace_thread_t *t;
612        libtrace_message_t message = {0, {.uint64=0}, NULL};
613        libtrace_packet_t *packets[trace->config.burst_size];
614        size_t i;
615        //int ret;
616        /* The current reading position into the packets */
617        int offset = 0;
618        /* The number of packets last read */
619        int nb_packets = 0;
620        /* The offset to the first NULL packet upto offset */
621        int empty = 0;
622        int j;
623
624        /* Wait until trace_pstart has been completed */
625        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
626        t = get_thread_table(trace);
627        assert(t);
628        if (trace->state == STATE_ERROR) {
629                thread_change_state(trace, t, THREAD_FINISHED, false);
630                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
631                pthread_exit(NULL);
632        }
633        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
634
635        if (trace->format->pregister_thread) {
636                if (trace->format->pregister_thread(trace, t, 
637                                trace_is_parallel(trace)) < 0) {
638                        thread_change_state(trace, t, THREAD_FINISHED, false);
639                        pthread_exit(NULL);
640                }
641        }
642
643        /* Fill our buffer with empty packets */
644        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
645        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
646                              trace->config.burst_size,
647                              trace->config.burst_size);
648
649        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
650
651        /* Let the per_packet function know we have started */
652        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
653        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
654
655        for (;;) {
656
657                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
658                        int ret;
659                        switch (message.code) {
660                                case MESSAGE_DO_PAUSE: // This is internal
661                                        ret = trace_perpkt_thread_pause(trace, t,
662                                              packets, nb_packets, &empty, &offset);
663                                        if (ret == READ_EOF) {
664                                                goto eof;
665                                        } else if (ret == READ_ERROR) {
666                                                goto error;
667                                        }
668                                        assert(ret == 1);
669                                        continue;
670                                case MESSAGE_DO_STOP: // This is internal
671                                        goto eof;
672                        }
673                        send_message(trace, t, message.code, message.data, 
674                                        message.sender);
675                        /* Continue and the empty messages out before packets */
676                        continue;
677                }
678
679
680                /* Do we need to read a new set of packets MOST LIKELY we do */
681                if (offset == nb_packets) {
682                        /* Refill the packet buffer */
683                        if (empty != nb_packets) {
684                                // Refill the empty packets
685                                libtrace_ocache_alloc(&trace->packet_freelist,
686                                                      (void **) &packets[empty],
687                                                      nb_packets - empty,
688                                                      nb_packets - empty);
689                        }
690                        if (!trace->pread) {
691                                assert(packets[0]);
692                                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
693                                nb_packets = trace_read_packet(trace, packets[0]);
694                                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
695                                packets[0]->error = nb_packets;
696                                if (nb_packets > 0)
697                                        nb_packets = 1;
698                        } else {
699                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
700                        }
701                        offset = 0;
702                        empty = 0;
703                }
704
705                /* Handle error/message cases */
706                if (nb_packets > 0) {
707                        /* Store the first non-meta packet */
708                        for (j = 0; j < nb_packets; j++) {
709                                if (t->recorded_first)
710                                        break;
711                                if (packets[j]->error > 0) {
712                                        store_first_packet(trace, packets[j], t);
713                                }
714                        }
715                        dispatch_packets(trace, t, packets, nb_packets, &empty,
716                                         &offset, trace->tracetime);
717                } else {
718                        switch (nb_packets) {
719                        case READ_EOF:
720                                goto eof;
721                        case READ_ERROR:
722                                goto error;
723                        case READ_MESSAGE:
724                                nb_packets = 0;
725                                continue;
726                        default:
727                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
728                                goto error;
729                        }
730                }
731
732        }
733
734error:
735        message.code = MESSAGE_DO_STOP;
736        message.sender = t;
737        message.data.uint64 = 0;
738        trace_message_perpkts(trace, &message);
739eof:
740        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
741
742        // Let the per_packet function know we have stopped
743        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
744        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
745
746        // Free any remaining packets
747        for (i = 0; i < trace->config.burst_size; i++) {
748                if (packets[i]) {
749                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
750                        packets[i] = NULL;
751                }
752        }
753
754        thread_change_state(trace, t, THREAD_FINISHED, true);
755
756        /* Make sure the reporter sees we have finished */
757        if (trace_has_reporter(trace))
758                trace_post_reporter(trace);
759
760        // Release all ocache memory before unregistering with the format
761        // because this might(it does in DPDK) unlink the formats mempool
762        // causing destroy/finish packet to fail.
763        libtrace_ocache_unregister_thread(&trace->packet_freelist);
764        if (trace->format->punregister_thread) {
765                trace->format->punregister_thread(trace, t);
766        }
767        print_memory_stats();
768
769        pthread_exit(NULL);
770}
771
772/**
773 * The start point for our single threaded hasher thread, this will read
774 * and hash a packet from a data source and queue it against the correct
775 * core to process it.
776 */
777static void* hasher_entry(void *data) {
778        libtrace_t *trace = (libtrace_t *)data;
779        libtrace_thread_t * t;
780        int i;
781        libtrace_packet_t * packet;
782        libtrace_message_t message = {0, {.uint64=0}, NULL};
783        int pkt_skipped = 0;
784
785        assert(trace_has_dedicated_hasher(trace));
786        /* Wait until all threads are started and objects are initialised (ring buffers) */
787        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
788        t = &trace->hasher_thread;
789        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
790        if (trace->state == STATE_ERROR) {
791                thread_change_state(trace, t, THREAD_FINISHED, false);
792                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
793                pthread_exit(NULL);
794        }
795        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
796
797        /* We are reading but it is not the parallel API */
798        if (trace->format->pregister_thread) {
799                trace->format->pregister_thread(trace, t, true);
800        }
801
802        /* Read all packets in then hash and queue against the correct thread */
803        while (1) {
804                int thread;
805                if (!pkt_skipped)
806                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
807                assert(packet);
808
809                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
810                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
811                        switch(message.code) {
812                                case MESSAGE_DO_PAUSE:
813                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
814                                        thread_change_state(trace, t, THREAD_PAUSED, false);
815                                        pthread_cond_broadcast(&trace->perpkt_cond);
816                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
817                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
818                                        }
819                                        thread_change_state(trace, t, THREAD_RUNNING, false);
820                                        pthread_cond_broadcast(&trace->perpkt_cond);
821                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
822                                        break;
823                                case MESSAGE_DO_STOP:
824                                        /* Either FINISHED or FINISHING */
825                                        assert(trace->started == false);
826                                        /* Mark the current packet as EOF */
827                                        packet->error = 0;
828                                        goto hasher_eof;
829                                default:
830                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
831                        }
832                        pkt_skipped = 1;
833                        continue;
834                }
835
836                if ((packet->error = trace_read_packet(trace, packet)) <1) {
837                        if (packet->error == READ_MESSAGE) {
838                                pkt_skipped = 1;
839                                continue;
840                        } else {
841                                break; /* We are EOF or error'd either way we stop  */
842                        }
843                }
844
845                /* We are guaranteed to have a hash function i.e. != NULL */
846                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
847                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
848                /* Blocking write to the correct queue - I'm the only writer */
849                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
850                        uint64_t order = trace_packet_get_order(packet);
851                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
852                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
853                                // Write ticks to everyone else
854                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
855                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
856                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
857                                for (i = 0; i < trace->perpkt_thread_count; i++) {
858                                        pkts[i]->error = READ_TICK;
859                                        trace_packet_set_order(pkts[i], order);
860                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
861                                }
862                        }
863                        pkt_skipped = 0;
864                } else {
865                        assert(!"Dropping a packet!!");
866                        pkt_skipped = 1; // Reuse that packet no one read it
867                }
868        }
869hasher_eof:
870        /* Broadcast our last failed read to all threads */
871        for (i = 0; i < trace->perpkt_thread_count; i++) {
872                libtrace_packet_t * bcast;
873                if (i == trace->perpkt_thread_count - 1) {
874                        bcast = packet;
875                } else {
876                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
877                        bcast->error = packet->error;
878                }
879                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
880                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
881                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
882                } else {
883                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
884                }
885                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
886        }
887
888        // We don't need to free the packet
889        thread_change_state(trace, t, THREAD_FINISHED, true);
890
891        libtrace_ocache_unregister_thread(&trace->packet_freelist);
892        if (trace->format->punregister_thread) {
893                trace->format->punregister_thread(trace, t);
894        }
895        print_memory_stats();
896
897        // TODO remove from TTABLE t sometime
898        pthread_exit(NULL);
899}
900
901/* Our simplest case when a thread becomes ready it can obtain an exclusive
902 * lock to read packets from the underlying trace.
903 */
904static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
905                                                    libtrace_thread_t *t,
906                                                    libtrace_packet_t *packets[],
907                                                    size_t nb_packets) {
908        size_t i = 0;
909        //bool tick_hit = false;
910
911        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
912        /* Read nb_packets */
913        for (i = 0; i < nb_packets; ++i) {
914                if (libtrace_message_queue_count(&t->messages) > 0) {
915                        if ( i==0 ) {
916                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
917                                return READ_MESSAGE;
918                        } else {
919                                break;
920                        }
921                }
922                packets[i]->error = trace_read_packet(libtrace, packets[i]);
923
924                if (packets[i]->error <= 0) {
925                        /* We'll catch this next time if we have already got packets */
926                        if ( i==0 ) {
927                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
928                                return packets[i]->error;
929                        } else {
930                                break;
931                        }
932                }
933                /*
934                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
935                        tick_hit = true;
936                }*/
937
938                // Doing this inside the lock ensures the first packet is
939                // always recorded first
940                if (!t->recorded_first && packets[0]->error > 0) {
941                        store_first_packet(libtrace, packets[0], t);
942                }
943        }
944        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
945        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
946        if (tick_hit) {
947                libtrace_message_t tick;
948                tick.additional.uint64 = trace_packet_get_order(packets[i]);
949                tick.code = MESSAGE_TICK;
950                trace_send_message_to_perpkts(libtrace, &tick);
951        } */
952        return i;
953}
954
955/**
956 * For the case that we have a dedicated hasher thread
957 * 1. We read a packet from our buffer
958 * 2. Move that into the packet provided (packet)
959 */
960inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
961                                                   libtrace_thread_t *t,
962                                                   libtrace_packet_t *packets[],
963                                                   size_t nb_packets) {
964        size_t i;
965
966        /* We store the last error message here */
967        if (t->format_data) {
968                return ((libtrace_packet_t *)t->format_data)->error;
969        }
970
971        // Always grab at least one
972        if (packets[0]) // Recycle the old get the new
973                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
974        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
975
976        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
977                return packets[0]->error;
978        }
979
980        for (i = 1; i < nb_packets; i++) {
981                if (packets[i]) // Recycle the old get the new
982                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
983                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
984                        packets[i] = NULL;
985                        break;
986                }
987
988                /* We will return an error or EOF the next time around */
989                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
990                        /* The message case will be checked automatically -
991                           However other cases like EOF and error will only be
992                           sent once*/
993                        if (packets[i]->error != READ_MESSAGE) {
994                                assert(t->format_data == NULL);
995                                t->format_data = packets[i];
996                        }
997                        break;
998                }
999        }
1000
1001        return i;
1002}
1003
1004/**
1005 * For the first packet of each queue we keep a copy and note the system
1006 * time it was received at.
1007 *
1008 * This is used for finding the first packet when playing back a trace
1009 * in trace time. And can be used by real time applications to print
1010 * results out every XXX seconds.
1011 */
1012void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1013{
1014
1015        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1016        struct timeval tv;
1017        libtrace_packet_t * dup;
1018
1019        if (t->recorded_first) {
1020                return;
1021        }
1022
1023        if (IS_LIBTRACE_META_PACKET(packet)) {
1024                return;
1025        }
1026
1027        /* We mark system time against a copy of the packet */
1028        gettimeofday(&tv, NULL);
1029        dup = trace_copy_packet(packet);
1030
1031        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1032        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1033        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1034        libtrace->first_packets.count++;
1035
1036        /* Now update the first */
1037        if (libtrace->first_packets.count == 1) {
1038                /* We the first entry hence also the first known packet */
1039                libtrace->first_packets.first = t->perpkt_num;
1040        } else {
1041                /* Check if we are newer than the previous 'first' packet */
1042                size_t first = libtrace->first_packets.first;
1043                struct timeval cur_ts = trace_get_timeval(dup);
1044                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1045                if (timercmp(&cur_ts, &first_ts, <))
1046                        libtrace->first_packets.first = t->perpkt_num;
1047        }
1048        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1049
1050        mesg.code = MESSAGE_FIRST_PACKET;
1051        trace_message_reporter(libtrace, &mesg);
1052        trace_message_perpkts(libtrace, &mesg);
1053        t->recorded_first = true;
1054}
1055
1056DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1057                                     libtrace_thread_t *t,
1058                                     const libtrace_packet_t **packet,
1059                                     const struct timeval **tv)
1060{
1061        void * tmp;
1062        int ret = 0;
1063
1064        if (t) {
1065                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1066                        return -1;
1067        }
1068
1069        /* Throw away these which we don't use */
1070        if (!packet)
1071                packet = (const libtrace_packet_t **) &tmp;
1072        if (!tv)
1073                tv = (const struct timeval **) &tmp;
1074
1075        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1076        if (t) {
1077                /* Get the requested thread */
1078                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1079                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1080        } else if (libtrace->first_packets.count) {
1081                /* Get the first packet across all threads */
1082                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1083                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1084                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1085                        ret = 1;
1086                } else {
1087                        struct timeval curr_tv;
1088                        // If a second has passed since the first entry we will assume this is the very first packet
1089                        gettimeofday(&curr_tv, NULL);
1090                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1091                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1092                                        ret = 1;
1093                                }
1094                        }
1095                }
1096        } else {
1097                *packet = NULL;
1098                *tv = NULL;
1099        }
1100        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1101        return ret;
1102}
1103
1104
1105DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1106{
1107        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1108}
1109
1110inline static struct timeval usec_to_tv(uint64_t usec)
1111{
1112        struct timeval tv;
1113        tv.tv_sec = usec / 1000000;
1114        tv.tv_usec = usec % 1000000;
1115        return tv;
1116}
1117
1118/** Similar to delay_tracetime but send messages to all threads periodically */
1119static void* reporter_entry(void *data) {
1120        libtrace_message_t message = {0, {.uint64=0}, NULL};
1121        libtrace_t *trace = (libtrace_t *)data;
1122        libtrace_thread_t *t = &trace->reporter_thread;
1123
1124        /* Wait until all threads are started */
1125        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1126        if (trace->state == STATE_ERROR) {
1127                thread_change_state(trace, t, THREAD_FINISHED, false);
1128                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1129                pthread_exit(NULL);
1130        }
1131        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1132
1133        if (trace->format->pregister_thread) {
1134                trace->format->pregister_thread(trace, t, false);
1135        }
1136
1137        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1138        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1139
1140        while (!trace_has_finished(trace)) {
1141                if (trace->config.reporter_polling) {
1142                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1143                                message.code = MESSAGE_POST_REPORTER;
1144                } else {
1145                        libtrace_message_queue_get(&t->messages, &message);
1146                }
1147                switch (message.code) {
1148                        // Check for results
1149                        case MESSAGE_POST_REPORTER:
1150                                trace->combiner.read(trace, &trace->combiner);
1151                                break;
1152                        case MESSAGE_DO_PAUSE:
1153                                assert(trace->combiner.pause);
1154                                trace->combiner.pause(trace, &trace->combiner);
1155                                send_message(trace, t, MESSAGE_PAUSING,
1156                                                (libtrace_generic_t) {0}, t);
1157                                trace_thread_pause(trace, t);
1158                                send_message(trace, t, MESSAGE_RESUMING,
1159                                                (libtrace_generic_t) {0}, t);
1160                                break;
1161                default:
1162                        send_message(trace, t, message.code, message.data,
1163                                        message.sender);
1164                }
1165        }
1166
1167        // Flush out whats left now all our threads have finished
1168        trace->combiner.read_final(trace, &trace->combiner);
1169
1170        // GOODBYE
1171        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1172        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1173
1174        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1175        print_memory_stats();
1176        return NULL;
1177}
1178
1179/** Similar to delay_tracetime but send messages to all threads periodically */
1180static void* keepalive_entry(void *data) {
1181        struct timeval prev, next;
1182        libtrace_message_t message = {0, {.uint64=0}, NULL};
1183        libtrace_t *trace = (libtrace_t *)data;
1184        uint64_t next_release;
1185        libtrace_thread_t *t = &trace->keepalive_thread;
1186
1187        /* Wait until all threads are started */
1188        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1189        if (trace->state == STATE_ERROR) {
1190                thread_change_state(trace, t, THREAD_FINISHED, false);
1191                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1192                pthread_exit(NULL);
1193        }
1194        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1195
1196        gettimeofday(&prev, NULL);
1197        message.code = MESSAGE_TICK_INTERVAL;
1198
1199        while (trace->state != STATE_FINISHED) {
1200                fd_set rfds;
1201                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1202                gettimeofday(&next, NULL);
1203                if (next_release > tv_to_usec(&next)) {
1204                        next = usec_to_tv(next_release - tv_to_usec(&next));
1205                        // Wait for timeout or a message
1206                        FD_ZERO(&rfds);
1207                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1208                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1209                                libtrace_message_t msg;
1210                                libtrace_message_queue_get(&t->messages, &msg);
1211                                assert(msg.code == MESSAGE_DO_STOP);
1212                                goto done;
1213                        }
1214                }
1215                prev = usec_to_tv(next_release);
1216                if (trace->state == STATE_RUNNING) {
1217                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1218                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1219                        trace_message_perpkts(trace, &message);
1220                }
1221        }
1222done:
1223
1224        thread_change_state(trace, t, THREAD_FINISHED, true);
1225        return NULL;
1226}
1227
1228/**
1229 * Delays a packets playback so the playback will be in trace time.
1230 * This may break early if a message becomes available.
1231 *
1232 * Requires the first packet for this thread to be received.
1233 * @param libtrace  The trace
1234 * @param packet    The packet to delay
1235 * @param t         The current thread
1236 * @return Either READ_MESSAGE(-2) or 0 is successful
1237 */
1238static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1239        struct timeval curr_tv, pkt_tv;
1240        uint64_t next_release = t->tracetime_offset_usec;
1241        uint64_t curr_usec;
1242
1243        if (!t->tracetime_offset_usec) {
1244                const libtrace_packet_t *first_pkt;
1245                const struct timeval *sys_tv;
1246                int64_t initial_offset;
1247                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1248                if (!first_pkt)
1249                        return 0;
1250                pkt_tv = trace_get_timeval(first_pkt);
1251                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1252                /* In the unlikely case offset is 0, change it to 1 */
1253                if (stable)
1254                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1255                next_release = initial_offset;
1256        }
1257        /* next_release == offset */
1258        pkt_tv = trace_get_timeval(packet);
1259        next_release += tv_to_usec(&pkt_tv);
1260        gettimeofday(&curr_tv, NULL);
1261        curr_usec = tv_to_usec(&curr_tv);
1262        if (next_release > curr_usec) {
1263                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1264                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1265                fd_set rfds;
1266                FD_ZERO(&rfds);
1267                FD_SET(mesg_fd, &rfds);
1268                // We need to wait
1269                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1270                if (ret == 0) {
1271                        return 0;
1272                } else if (ret > 0) {
1273                        return READ_MESSAGE;
1274                } else {
1275                        assert(!"trace_delay_packet: Unexpected return from select");
1276                }
1277        }
1278        return 0;
1279}
1280
1281/* Discards packets that don't match the filter.
1282 * Discarded packets are emptied and then moved to the end of the packet list.
1283 *
1284 * @param trace       The trace format, containing the filter
1285 * @param packets     An array of packets
1286 * @param nb_packets  The number of valid items in packets
1287 *
1288 * @return The number of packets that passed the filter, which are moved to
1289 *          the start of the packets array
1290 */
1291static inline size_t filter_packets(libtrace_t *trace,
1292                                    libtrace_packet_t **packets,
1293                                    size_t nb_packets) {
1294        size_t offset = 0;
1295        size_t i;
1296
1297        for (i = 0; i < nb_packets; ++i) {
1298                // The filter needs the trace attached to receive the link type
1299                packets[i]->trace = trace;
1300                if (trace_apply_filter(trace->filter, packets[i])) {
1301                        libtrace_packet_t *tmp;
1302                        tmp = packets[offset];
1303                        packets[offset++] = packets[i];
1304                        packets[i] = tmp;
1305                } else {
1306                        trace_fin_packet(packets[i]);
1307                }
1308        }
1309
1310        return offset;
1311}
1312
1313/* Read a batch of packets from the trace into a buffer.
1314 * Note that this function will block until a packet is read (or EOF is reached)
1315 *
1316 * @param libtrace    The trace
1317 * @param t           The thread
1318 * @param packets     An array of packets
1319 * @param nb_packets  The number of empty packets in packets
1320 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1321 */
1322static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1323                                      libtrace_thread_t *t,
1324                                      libtrace_packet_t *packets[],
1325                                      size_t nb_packets) {
1326        int i;
1327        assert(nb_packets);
1328        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1329        if (trace_is_err(libtrace))
1330                return -1;
1331        if (!libtrace->started) {
1332                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1333                              "You must call libtrace_start() before trace_read_packet()\n");
1334                return -1;
1335        }
1336
1337        if (libtrace->format->pread_packets) {
1338                int ret;
1339                for (i = 0; i < (int) nb_packets; ++i) {
1340                        assert(i[packets]);
1341                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1342                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1343                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1344                                              "Packet passed to trace_read_packet() is invalid\n");
1345                                return -1;
1346                        }
1347                }
1348                do {
1349                        ret=libtrace->format->pread_packets(libtrace, t,
1350                                                            packets,
1351                                                            nb_packets);
1352                        /* Error, EOF or message? */
1353                        if (ret <= 0) {
1354                                return ret;
1355                        }
1356
1357                        if (libtrace->filter) {
1358                                int remaining;
1359                                remaining = filter_packets(libtrace,
1360                                                           packets, ret);
1361                                t->filtered_packets += ret - remaining;
1362                                ret = remaining;
1363                        }
1364                        for (i = 0; i < ret; ++i) {
1365                                /* We do not mark the packet against the trace,
1366                                 * before hand or after. After breaks DAG meta
1367                                 * packets and before is inefficient */
1368                                //packets[i]->trace = libtrace;
1369                                /* TODO IN FORMAT?? Like traditional libtrace */
1370                                if (libtrace->snaplen>0)
1371                                        trace_set_capture_length(packets[i],
1372                                                        libtrace->snaplen);
1373                        }
1374                } while(ret == 0);
1375                return ret;
1376        }
1377        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1378                      "This format does not support reading packets\n");
1379        return ~0U;
1380}
1381
1382/* Restarts a parallel trace, this is called from trace_pstart.
1383 * The libtrace lock is held upon calling this function.
1384 * Typically with a parallel trace the threads are not
1385 * killed rather.
1386 */
1387static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1388                          libtrace_callback_set_t *per_packet_cbs, 
1389                          libtrace_callback_set_t *reporter_cbs) {
1390        int i, err = 0;
1391        if (libtrace->state != STATE_PAUSED) {
1392                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1393                        "trace(%s) is not currently paused",
1394                              libtrace->uridata);
1395                return -1;
1396        }
1397
1398        assert(libtrace_parallel);
1399        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1400
1401        /* Reset first packets */
1402        pthread_spin_lock(&libtrace->first_packets.lock);
1403        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1404                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1405                if (libtrace->first_packets.packets[i].packet) {
1406                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1407                        libtrace->first_packets.packets[i].packet = NULL;
1408                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1409                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1410                        libtrace->first_packets.count--;
1411                        libtrace->perpkt_threads[i].recorded_first = false;
1412                }
1413        }
1414        assert(libtrace->first_packets.count == 0);
1415        libtrace->first_packets.first = 0;
1416        pthread_spin_unlock(&libtrace->first_packets.lock);
1417
1418        /* Reset delay */
1419        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1420                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1421        }
1422
1423        /* Reset statistics */
1424        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1425                libtrace->perpkt_threads[i].accepted_packets = 0;
1426                libtrace->perpkt_threads[i].filtered_packets = 0;
1427        }
1428        libtrace->accepted_packets = 0;
1429        libtrace->filtered_packets = 0;
1430
1431        /* Update functions if requested */
1432        if(global_blob)
1433                libtrace->global_blob = global_blob;
1434
1435        if (per_packet_cbs) {
1436                if (libtrace->perpkt_cbs)
1437                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1438                libtrace->perpkt_cbs = trace_create_callback_set();
1439                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1440                                sizeof(libtrace_callback_set_t));
1441        }
1442
1443        if (reporter_cbs) {
1444                if (libtrace->reporter_cbs)
1445                        trace_destroy_callback_set(libtrace->reporter_cbs);
1446
1447                libtrace->reporter_cbs = trace_create_callback_set();
1448                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1449                                sizeof(libtrace_callback_set_t));
1450        }
1451
1452        if (trace_is_parallel(libtrace)) {
1453                err = libtrace->format->pstart_input(libtrace);
1454        } else {
1455                if (libtrace->format->start_input) {
1456                        err = libtrace->format->start_input(libtrace);
1457                }
1458        }
1459
1460        if (err == 0) {
1461                libtrace->started = true;
1462                libtrace_change_state(libtrace, STATE_RUNNING, false);
1463        }
1464        return err;
1465}
1466
1467/**
1468 * @return the number of CPU cores on the machine. -1 if unknown.
1469 */
1470SIMPLE_FUNCTION static int get_nb_cores() {
1471        int numCPU;
1472#ifdef _SC_NPROCESSORS_ONLN
1473        /* Most systems do this now */
1474        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1475
1476#else
1477        int mib[] = {CTL_HW, HW_AVAILCPU};
1478        size_t len = sizeof(numCPU);
1479
1480        /* get the number of CPUs from the system */
1481        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1482#endif
1483        return numCPU <= 0 ? 1 : numCPU;
1484}
1485
1486/**
1487 * Verifies the configuration and sets default values for any values not
1488 * specified by the user.
1489 */
1490static void verify_configuration(libtrace_t *libtrace) {
1491
1492        if (libtrace->config.hasher_queue_size <= 0)
1493                libtrace->config.hasher_queue_size = 1000;
1494
1495        if (libtrace->config.perpkt_threads <= 0) {
1496                libtrace->perpkt_thread_count = get_nb_cores();
1497                if (libtrace->perpkt_thread_count <= 0)
1498                        // Lets just use one
1499                        libtrace->perpkt_thread_count = 1;
1500        } else {
1501                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1502        }
1503
1504        if (libtrace->config.reporter_thold <= 0)
1505                libtrace->config.reporter_thold = 100;
1506        if (libtrace->config.burst_size <= 0)
1507                libtrace->config.burst_size = 32;
1508        if (libtrace->config.thread_cache_size <= 0)
1509                libtrace->config.thread_cache_size = 64;
1510        if (libtrace->config.cache_size <= 0)
1511                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1512
1513        if (libtrace->config.cache_size <
1514                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1515                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1516
1517        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1518                libtrace->combiner = combiner_unordered;
1519
1520        /* Figure out if we are using a dedicated hasher thread? */
1521        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1522                libtrace->hasher_thread.type = THREAD_HASHER;
1523        }
1524}
1525
1526/**
1527 * Starts a libtrace_thread, including allocating memory for messaging.
1528 * Threads are expected to wait until the libtrace look is released.
1529 * Hence why we don't init structures until later.
1530 *
1531 * @param trace The trace the thread is associated with
1532 * @param t The thread that is filled when the thread is started
1533 * @param type The type of thread
1534 * @param start_routine The entry location of the thread
1535 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1536 * @param name For debugging purposes set the threads name (Optional)
1537 *
1538 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1539 *         In this situation the thread structure is zeroed.
1540 */
1541static int trace_start_thread(libtrace_t *trace,
1542                       libtrace_thread_t *t,
1543                       enum thread_types type,
1544                       void *(*start_routine) (void *),
1545                       int perpkt_num,
1546                       const char *name) {
1547#ifdef __linux__
1548        pthread_attr_t attrib;
1549        cpu_set_t cpus;
1550        int i;
1551#endif
1552        int ret;
1553        assert(t->type == THREAD_EMPTY);
1554        t->trace = trace;
1555        t->ret = NULL;
1556        t->user_data = NULL;
1557        t->type = type;
1558        t->state = THREAD_RUNNING;
1559
1560        assert(name);
1561
1562#ifdef __linux__
1563        CPU_ZERO(&cpus);
1564        for (i = 0; i < get_nb_cores(); i++)
1565                CPU_SET(i, &cpus);
1566        pthread_attr_init(&attrib);
1567        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1568        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1569        pthread_attr_destroy(&attrib);
1570#else
1571        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1572#endif
1573        if (ret != 0) {
1574                libtrace_zero_thread(t);
1575                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1576                return -1;
1577        }
1578        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1579        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1580                libtrace_ringbuffer_init(&t->rbuffer,
1581                                         trace->config.hasher_queue_size,
1582                                         trace->config.hasher_polling?
1583                                                 LIBTRACE_RINGBUFFER_POLLING:
1584                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1585        }
1586#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1587        if(name)
1588                pthread_setname_np(t->tid, name);
1589#endif
1590        t->perpkt_num = perpkt_num;
1591        return 0;
1592}
1593
1594/** Parses the environment variable LIBTRACE_CONF into the supplied
1595 * configuration structure.
1596 *
1597 * @param[in,out] libtrace The trace from which we determine the URI and set
1598 * the configuration.
1599 *
1600 * We search for 3 environment variables and apply them to the config in the
1601 * following order. Such that the first has the lowest priority.
1602 *
1603 * 1. LIBTRACE_CONF, The global environment configuration
1604 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1605 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1606 *
1607 * E.g.
1608 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1609 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1610 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1611 *
1612 * @note All environment variables names MUST only contian
1613 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1614 * outside of this range should be captilised if possible or replaced with an
1615 * underscore.
1616 */
1617static void parse_env_config (libtrace_t *libtrace) {
1618        char env_name[1024] = "LIBTRACE_CONF_";
1619        size_t len = strlen(env_name);
1620        size_t mark = 0;
1621        size_t i;
1622        char * env;
1623
1624        /* Make our compound string */
1625        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1626        len += strlen(libtrace->format->name);
1627        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1628        len += 1;
1629        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1630
1631        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1632        for (i = 0; env_name[i] != 0; ++i) {
1633                env_name[i] = toupper(env_name[i]);
1634                if(env_name[i] == ':') {
1635                        mark = i;
1636                }
1637                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1638                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1639                        env_name[i] = '_';
1640                }
1641        }
1642
1643        /* First apply global env settings LIBTRACE_CONF */
1644        env = getenv("LIBTRACE_CONF");
1645        if (env)
1646        {
1647                printf("Got env %s", env);
1648                trace_set_configuration(libtrace, env);
1649        }
1650
1651        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1652        if (mark != 0) {
1653                env_name[mark] = 0;
1654                env = getenv(env_name);
1655                if (env) {
1656                        trace_set_configuration(libtrace, env);
1657                }
1658                env_name[mark] = '_';
1659        }
1660
1661        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1662        env = getenv(env_name);
1663        if (env) {
1664                trace_set_configuration(libtrace, env);
1665        }
1666}
1667
1668DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1669        if (libtrace->state == STATE_NEW)
1670                return trace_supports_parallel(libtrace);
1671        return libtrace->pread == trace_pread_packet_wrapper;
1672}
1673
1674DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1675                           libtrace_callback_set_t *per_packet_cbs,
1676                           libtrace_callback_set_t *reporter_cbs) {
1677        int i;
1678        int ret = -1;
1679        char name[24];
1680        sigset_t sig_before, sig_block_all;
1681        assert(libtrace);
1682
1683        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1684        if (trace_is_err(libtrace)) {
1685                goto cleanup_none;
1686        }
1687
1688        if (libtrace->state == STATE_PAUSED) {
1689                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1690                                reporter_cbs);
1691                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1692                return ret;
1693        }
1694
1695        if (libtrace->state != STATE_NEW) {
1696                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1697                              "should be called on a NEW or PAUSED trace but "
1698                              "instead was called from %s",
1699                              get_trace_state_name(libtrace->state));
1700                goto cleanup_none;
1701        }
1702
1703        /* Store the user defined things against the trace */
1704        libtrace->global_blob = global_blob;
1705
1706        /* Save a copy of the callbacks in case the user tries to change them
1707         * on us later */
1708        if (!per_packet_cbs) {
1709                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1710                                "requires a non-NULL set of per packet "
1711                                "callbacks.");
1712                goto cleanup_none;
1713        }
1714
1715        if (per_packet_cbs->message_packet == NULL) {
1716                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1717                                "packet callbacks must include a handler "
1718                                "for a packet. Please set this using "
1719                                "trace_set_packet_cb().");
1720                goto cleanup_none;
1721        }
1722
1723        libtrace->perpkt_cbs = trace_create_callback_set();
1724        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1725       
1726        if (reporter_cbs) {
1727                libtrace->reporter_cbs = trace_create_callback_set();
1728                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1729        }
1730
1731       
1732
1733
1734        /* And zero other fields */
1735        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1736                libtrace->perpkt_thread_states[i] = 0;
1737        }
1738        libtrace->first_packets.first = 0;
1739        libtrace->first_packets.count = 0;
1740        libtrace->first_packets.packets = NULL;
1741        libtrace->perpkt_threads = NULL;
1742        /* Set a global which says we are using a parallel trace. This is
1743         * for backwards compatibility due to changes when destroying packets */
1744        libtrace_parallel = 1;
1745
1746        /* Parses configuration passed through environment variables */
1747        parse_env_config(libtrace);
1748        verify_configuration(libtrace);
1749
1750        ret = -1;
1751        /* Try start the format - we prefer parallel over single threaded, as
1752         * these formats should support messages better */
1753
1754        if (trace_supports_parallel(libtrace) &&
1755            !trace_has_dedicated_hasher(libtrace)) {
1756                ret = libtrace->format->pstart_input(libtrace);
1757                libtrace->pread = trace_pread_packet_wrapper;
1758        }
1759        if (ret != 0) {
1760                if (libtrace->format->start_input) {
1761                        ret = libtrace->format->start_input(libtrace);
1762                }
1763                if (libtrace->perpkt_thread_count > 1) {
1764                        libtrace->pread = trace_pread_packet_first_in_first_served;
1765                        /* Don't wait for a burst of packets if the format is
1766                         * live as this could block ring based formats and
1767                         * introduces delay. */
1768                        if (libtrace->format->info.live) {
1769                                libtrace->config.burst_size = 1;
1770                        }
1771                }
1772                else {
1773                        /* Use standard read_packet */
1774                        libtrace->pread = NULL;
1775                }
1776        }
1777
1778        if (ret != 0) {
1779                goto cleanup_none;
1780        }
1781
1782        /* --- Start all the threads we need --- */
1783        /* Disable signals because it is inherited by the threads we start */
1784        sigemptyset(&sig_block_all);
1785        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1786
1787        /* If we need a hasher thread start it
1788         * Special Case: If single threaded we don't need a hasher
1789         */
1790        if (trace_has_dedicated_hasher(libtrace)) {
1791                libtrace->hasher_thread.type = THREAD_EMPTY;
1792                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1793                                   THREAD_HASHER, hasher_entry, -1,
1794                                   "hasher-thread");
1795                if (ret != 0)
1796                        goto cleanup_started;
1797                libtrace->pread = trace_pread_packet_hasher_thread;
1798        } else {
1799                libtrace->hasher_thread.type = THREAD_EMPTY;
1800        }
1801
1802        /* Start up our perpkt threads */
1803        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1804                                          libtrace->perpkt_thread_count);
1805        if (!libtrace->perpkt_threads) {
1806                trace_set_err(libtrace, errno, "trace_pstart "
1807                              "failed to allocate memory.");
1808                goto cleanup_threads;
1809        }
1810        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1811                snprintf(name, sizeof(name), "perpkt-%d", i);
1812                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1813                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1814                                   THREAD_PERPKT, perpkt_threads_entry, i,
1815                                   name);
1816                if (ret != 0)
1817                        goto cleanup_threads;
1818        }
1819
1820        /* Start the reporter thread */
1821        if (reporter_cbs) {
1822                if (libtrace->combiner.initialise)
1823                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1824                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1825                                   THREAD_REPORTER, reporter_entry, -1,
1826                                   "reporter_thread");
1827                if (ret != 0)
1828                        goto cleanup_threads;
1829        }
1830
1831        /* Start the keepalive thread */
1832        if (libtrace->config.tick_interval > 0) {
1833                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1834                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1835                                   "keepalive_thread");
1836                if (ret != 0)
1837                        goto cleanup_threads;
1838        }
1839
1840        /* Init other data structures */
1841        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1842        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1843        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1844                                                 sizeof(*libtrace->first_packets.packets));
1845        if (libtrace->first_packets.packets == NULL) {
1846                trace_set_err(libtrace, errno, "trace_pstart "
1847                              "failed to allocate memory.");
1848                goto cleanup_threads;
1849        }
1850
1851        if (libtrace_ocache_init(&libtrace->packet_freelist,
1852                             (void* (*)()) trace_create_packet,
1853                             (void (*)(void *))trace_destroy_packet,
1854                             libtrace->config.thread_cache_size,
1855                             libtrace->config.cache_size * 4,
1856                             libtrace->config.fixed_count) != 0) {
1857                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1858                              "failed to allocate ocache.");
1859                goto cleanup_threads;
1860        }
1861
1862        /* Threads don't start */
1863        libtrace->started = true;
1864        libtrace_change_state(libtrace, STATE_RUNNING, false);
1865
1866        ret = 0;
1867        goto success;
1868cleanup_threads:
1869        if (libtrace->first_packets.packets) {
1870                free(libtrace->first_packets.packets);
1871                libtrace->first_packets.packets = NULL;
1872        }
1873        libtrace_change_state(libtrace, STATE_ERROR, false);
1874        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1875        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1876                pthread_join(libtrace->hasher_thread.tid, NULL);
1877                libtrace_zero_thread(&libtrace->hasher_thread);
1878        }
1879
1880        if (libtrace->perpkt_threads) {
1881                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1882                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1883                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1884                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1885                        } else break;
1886                }
1887                free(libtrace->perpkt_threads);
1888                libtrace->perpkt_threads = NULL;
1889        }
1890
1891        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1892                pthread_join(libtrace->reporter_thread.tid, NULL);
1893                libtrace_zero_thread(&libtrace->reporter_thread);
1894        }
1895
1896        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1897                pthread_join(libtrace->keepalive_thread.tid, NULL);
1898                libtrace_zero_thread(&libtrace->keepalive_thread);
1899        }
1900        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1901        libtrace_change_state(libtrace, STATE_NEW, false);
1902        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1903        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1904cleanup_started:
1905        if (libtrace->pread == trace_pread_packet_wrapper) {
1906                if (libtrace->format->ppause_input)
1907                        libtrace->format->ppause_input(libtrace);
1908        } else {
1909                if (libtrace->format->pause_input)
1910                        libtrace->format->pause_input(libtrace);
1911        }
1912        ret = -1;
1913success:
1914        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1915cleanup_none:
1916        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1917        return ret;
1918}
1919
1920DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1921                fn_cb_starting handler) {
1922        cbset->message_starting = handler;
1923        return 0;
1924}
1925
1926DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1927                fn_cb_dataless handler) {
1928        cbset->message_pausing = handler;
1929        return 0;
1930}
1931
1932DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1933                fn_cb_dataless handler) {
1934        cbset->message_resuming = handler;
1935        return 0;
1936}
1937
1938DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1939                fn_cb_dataless handler) {
1940        cbset->message_stopping = handler;
1941        return 0;
1942}
1943
1944DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1945                fn_cb_packet handler) {
1946        cbset->message_packet = handler;
1947        return 0;
1948}
1949
1950DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1951                fn_cb_first_packet handler) {
1952        cbset->message_first_packet = handler;
1953        return 0;
1954}
1955
1956DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1957                fn_cb_tick handler) {
1958        cbset->message_tick_count = handler;
1959        return 0;
1960}
1961
1962DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1963                fn_cb_tick handler) {
1964        cbset->message_tick_interval = handler;
1965        return 0;
1966}
1967
1968DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1969                fn_cb_result handler) {
1970        cbset->message_result = handler;
1971        return 0;
1972}
1973
1974DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1975                fn_cb_usermessage handler) {
1976        cbset->message_user = handler;
1977        return 0;
1978}
1979
1980/*
1981 * Pauses a trace, this should only be called by the main thread
1982 * 1. Set started = false
1983 * 2. All perpkt threads are paused waiting on a condition var
1984 * 3. Then call ppause on the underlying format if found
1985 * 4. The traces state is paused
1986 *
1987 * Once done you should be able to modify the trace setup and call pstart again
1988 * TODO add support to change the number of threads.
1989 */
1990DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1991{
1992        libtrace_thread_t *t;
1993        int i;
1994        assert(libtrace);
1995
1996        t = get_thread_table(libtrace);
1997        // Check state from within the lock if we are going to change it
1998        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1999
2000        /* If we are already paused, just treat this as a NOOP */
2001        if (libtrace->state == STATE_PAUSED) {
2002                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2003                return 0;
2004        }
2005        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2006                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2007                return -1;
2008        }
2009
2010        libtrace_change_state(libtrace, STATE_PAUSING, false);
2011        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2012
2013        // Special case handle the hasher thread case
2014        if (trace_has_dedicated_hasher(libtrace)) {
2015                if (libtrace->config.debug_state)
2016                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2017                libtrace_message_t message = {0, {.uint64=0}, NULL};
2018                message.code = MESSAGE_DO_PAUSE;
2019                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2020                // Wait for it to pause
2021                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2022                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2023                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2024                }
2025                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2026                if (libtrace->config.debug_state)
2027                        fprintf(stderr, " DONE\n");
2028        }
2029
2030        if (libtrace->config.debug_state)
2031                fprintf(stderr, "Asking perpkt threads to pause ...");
2032        // Stop threads, skip this one if it's a perpkt
2033        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2034                if (&libtrace->perpkt_threads[i] != t) {
2035                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2036                        message.code = MESSAGE_DO_PAUSE;
2037                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2038                        if(trace_has_dedicated_hasher(libtrace)) {
2039                                // The hasher has stopped and other threads have messages waiting therefore
2040                                // If the queues are empty the other threads would have no data
2041                                // So send some message packets to simply ask the threads to check
2042                                // We are the only writer since hasher has paused
2043                                libtrace_packet_t *pkt;
2044                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2045                                pkt->error = READ_MESSAGE;
2046                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2047                        }
2048                } else {
2049                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2050                }
2051        }
2052
2053        if (t) {
2054                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2055                // We rely on the user to *not* return before starting the trace again
2056                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2057        }
2058
2059        // Wait for all threads to pause
2060        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2061        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2062                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2063        }
2064        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2065
2066        if (libtrace->config.debug_state)
2067                fprintf(stderr, " DONE\n");
2068
2069        // Deal with the reporter
2070        if (trace_has_reporter(libtrace)) {
2071                if (libtrace->config.debug_state)
2072                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2073                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2074                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2075                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2076               
2077                } else {
2078                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2079                        message.code = MESSAGE_DO_PAUSE;
2080                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2081                        // Wait for it to pause
2082                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2083                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2084                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2085                        }
2086                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2087                }
2088                if (libtrace->config.debug_state)
2089                        fprintf(stderr, " DONE\n");
2090        }
2091
2092        /* Cache values before we pause */
2093        if (libtrace->stats == NULL)
2094                libtrace->stats = trace_create_statistics();
2095        // Save the statistics against the trace
2096        trace_get_statistics(libtrace, NULL);
2097        if (trace_is_parallel(libtrace)) {
2098                libtrace->started = false;
2099                if (libtrace->format->ppause_input)
2100                        libtrace->format->ppause_input(libtrace);
2101                // TODO What happens if we don't have pause input??
2102        } else {
2103                int err;
2104                err = trace_pause(libtrace);
2105                // We should handle this a bit better
2106                if (err)
2107                        return err;
2108        }
2109
2110        // Only set as paused after the pause has been called on the trace
2111        libtrace_change_state(libtrace, STATE_PAUSED, true);
2112        return 0;
2113}
2114
2115/**
2116 * Stop trace finish prematurely as though it meet an EOF
2117 * This should only be called by the main thread
2118 * 1. Calls ppause
2119 * 2. Sends a message asking for threads to finish
2120 * 3. Releases threads which will pause
2121 */
2122DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2123{
2124        int i, err;
2125        libtrace_message_t message = {0, {.uint64=0}, NULL};
2126        assert(libtrace);
2127
2128        // Ensure all threads have paused and the underlying trace format has
2129        // been closed and all packets associated are cleaned up
2130        // Pause will do any state checks for us
2131        err = trace_ppause(libtrace);
2132        if (err)
2133                return err;
2134
2135        // Now send a message asking the threads to stop
2136        // This will be retrieved before trying to read another packet
2137
2138        message.code = MESSAGE_DO_STOP;
2139        trace_message_perpkts(libtrace, &message);
2140        if (trace_has_dedicated_hasher(libtrace))
2141                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2142
2143        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2144                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2145        }
2146
2147        /* Now release the threads and let them stop - when the threads finish
2148         * the state will be set to finished */
2149        libtrace_change_state(libtrace, STATE_FINISHING, true);
2150        return 0;
2151}
2152
2153DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2154        int ret = -1;
2155        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2156                return -1;
2157        }
2158
2159        // Save the requirements
2160        trace->hasher_type = type;
2161        if (hasher) {
2162                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2163                        if (trace->hasher_data) {
2164                                free(trace->hasher_data);
2165                        }
2166                }
2167                trace->hasher = hasher;
2168                trace->hasher_data = data;
2169                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2170        } else {
2171                trace->hasher = NULL;
2172                trace->hasher_data = NULL;
2173                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2174        }
2175
2176        // Try push this to hardware - NOTE hardware could do custom if
2177        // there is a more efficient way to apply it, in this case
2178        // it will simply grab the function out of libtrace_t
2179        if (trace_supports_parallel(trace) && trace->format->config_input)
2180                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2181
2182        if (ret == -1) {
2183                /* We have to deal with this ourself */
2184                if (!hasher) {
2185                        switch (type)
2186                        {
2187                                case HASHER_CUSTOM:
2188                                case HASHER_BALANCE:
2189                                        return 0;
2190                                case HASHER_BIDIRECTIONAL:
2191                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2192                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2193                                        toeplitz_init_config(trace->hasher_data, 1);
2194                                        return 0;
2195                                case HASHER_UNIDIRECTIONAL:
2196                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2197                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2198                                        toeplitz_init_config(trace->hasher_data, 0);
2199                                        return 0;
2200                        }
2201                        return -1;
2202                }
2203        } else {
2204                /* If the hasher is hardware we zero out the hasher and hasher
2205                 * data fields - only if we need a hasher do we do this */
2206                trace->hasher = NULL;
2207                trace->hasher_data = NULL;
2208        }
2209
2210        return 0;
2211}
2212
2213// Waits for all threads to finish
2214DLLEXPORT void trace_join(libtrace_t *libtrace) {
2215        int i;
2216
2217        /* Firstly wait for the perpkt threads to finish, since these are
2218         * user controlled */
2219        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2220                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2221                // So we must do our best effort to empty the queue - so
2222                // the producer (or any other threads) don't block.
2223                libtrace_packet_t * packet;
2224                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2225                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2226                        if (packet) // This could be NULL iff the perpkt finishes early
2227                                trace_destroy_packet(packet);
2228        }
2229
2230        /* Now the hasher */
2231        if (trace_has_dedicated_hasher(libtrace)) {
2232                pthread_join(libtrace->hasher_thread.tid, NULL);
2233                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2234        }
2235
2236        // Now that everything is finished nothing can be touching our
2237        // buffers so clean them up
2238        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2239                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2240                // if they lost timeslice before-during a write
2241                libtrace_packet_t * packet;
2242                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2243                        trace_destroy_packet(packet);
2244                if (trace_has_dedicated_hasher(libtrace)) {
2245                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2246                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2247                }
2248                // Cannot destroy vector yet, this happens with trace_destroy
2249        }
2250
2251        if (trace_has_reporter(libtrace)) {
2252                pthread_join(libtrace->reporter_thread.tid, NULL);
2253                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2254        }
2255
2256        // Wait for the tick (keepalive) thread if it has been started
2257        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2258                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2259                msg.code = MESSAGE_DO_STOP;
2260                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2261                pthread_join(libtrace->keepalive_thread.tid, NULL);
2262        }
2263
2264        libtrace_change_state(libtrace, STATE_JOINED, true);
2265        print_memory_stats();
2266}
2267
2268DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2269                                                libtrace_thread_t *t)
2270{
2271        int ret;
2272        if (t == NULL)
2273                t = get_thread_descriptor(libtrace);
2274        if (t == NULL)
2275                return -1;
2276        ret = libtrace_message_queue_count(&t->messages);
2277        return ret < 0 ? 0 : ret;
2278}
2279
2280DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2281                                          libtrace_thread_t *t,
2282                                          libtrace_message_t * message)
2283{
2284        int ret;
2285        if (t == NULL)
2286                t = get_thread_descriptor(libtrace);
2287        if (t == NULL)
2288                return -1;
2289        ret = libtrace_message_queue_get(&t->messages, message);
2290        return ret < 0 ? 0 : ret;
2291}
2292
2293DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2294                                              libtrace_thread_t *t,
2295                                              libtrace_message_t * message)
2296{
2297        if (t == NULL)
2298                t = get_thread_descriptor(libtrace);
2299        if (t == NULL)
2300                return -1;
2301        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2302                return 0;
2303        else
2304                return -1;
2305}
2306
2307DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2308{
2309        int ret;
2310        if (!message->sender)
2311                message->sender = get_thread_descriptor(libtrace);
2312
2313        ret = libtrace_message_queue_put(&t->messages, message);
2314        return ret < 0 ? 0 : ret;
2315}
2316
2317DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2318{
2319        if (!trace_has_reporter(libtrace) ||
2320            !(libtrace->reporter_thread.state == THREAD_RUNNING
2321              || libtrace->reporter_thread.state == THREAD_PAUSED))
2322                return -1;
2323
2324        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2325}
2326
2327DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2328{
2329        libtrace_message_t message = {0, {.uint64=0}, NULL};
2330        message.code = MESSAGE_POST_REPORTER;
2331        return trace_message_reporter(libtrace, (void *) &message);
2332}
2333
2334DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2335{
2336        int i;
2337        int missed = 0;
2338        if (message->sender == NULL)
2339                message->sender = get_thread_descriptor(libtrace);
2340        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2341                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2342                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2343                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2344                } else {
2345                        missed += 1;
2346                }
2347        }
2348        return -missed;
2349}
2350
2351/**
2352 * Publishes a result to the reduce queue
2353 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2354 */
2355DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2356        libtrace_result_t res;
2357        res.type = type;
2358        res.key = key;
2359        res.value = value;
2360        assert(libtrace->combiner.publish);
2361        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2362        return;
2363}
2364
2365DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2366        if (combiner) {
2367                trace->combiner = *combiner;
2368                trace->combiner.configuration = config;
2369        } else {
2370                // No combiner, so don't try use it
2371                memset(&trace->combiner, 0, sizeof(trace->combiner));
2372        }
2373}
2374
2375DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2376        return packet->order;
2377}
2378
2379DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2380        return packet->hash;
2381}
2382
2383DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2384        packet->order = order;
2385}
2386
2387DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2388        packet->hash = hash;
2389}
2390
2391DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2392        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2393}
2394
2395/**
2396 * @return True if the trace is not running such that it can be configured
2397 */
2398static inline bool trace_is_configurable(libtrace_t *trace) {
2399        return trace->state == STATE_NEW ||
2400                        trace->state == STATE_PAUSED;
2401}
2402
2403DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2404        if (!trace_is_configurable(trace)) return -1;
2405
2406        /* TODO consider allowing an offset from the total number of cores i.e.
2407         * -1 reserve 1 core */
2408        if (nb >= 0) {
2409                trace->config.perpkt_threads = nb;
2410                return 0;
2411        } else {
2412                return -1;
2413        }
2414}
2415
2416DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2417        if (!trace_is_configurable(trace)) return -1;
2418
2419        trace->config.tick_interval = millisec;
2420        return 0;
2421}
2422
2423DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2424        if (!trace_is_configurable(trace)) return -1;
2425
2426        trace->config.tick_count = count;
2427        return 0;
2428}
2429
2430DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2431        if (!trace_is_configurable(trace)) return -1;
2432
2433        trace->tracetime = tracetime;
2434        return 0;
2435}
2436
2437DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2438        if (!trace_is_configurable(trace)) return -1;
2439
2440        trace->config.cache_size = size;
2441        return 0;
2442}
2443
2444DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2445        if (!trace_is_configurable(trace)) return -1;
2446
2447        trace->config.thread_cache_size = size;
2448        return 0;
2449}
2450
2451DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2452        if (!trace_is_configurable(trace)) return -1;
2453
2454        trace->config.fixed_count = fixed;
2455        return 0;
2456}
2457
2458DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2459        if (!trace_is_configurable(trace)) return -1;
2460
2461        trace->config.burst_size = size;
2462        return 0;
2463}
2464
2465DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2466        if (!trace_is_configurable(trace)) return -1;
2467
2468        trace->config.hasher_queue_size = size;
2469        return 0;
2470}
2471
2472DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2473        if (!trace_is_configurable(trace)) return -1;
2474
2475        trace->config.hasher_polling = polling;
2476        return 0;
2477}
2478
2479DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2480        if (!trace_is_configurable(trace)) return -1;
2481
2482        trace->config.reporter_polling = polling;
2483        return 0;
2484}
2485
2486DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2487        if (!trace_is_configurable(trace)) return -1;
2488
2489        trace->config.reporter_thold = thold;
2490        return 0;
2491}
2492
2493DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2494        if (!trace_is_configurable(trace)) return -1;
2495
2496        trace->config.debug_state = debug_state;
2497        return 0;
2498}
2499
2500static bool config_bool_parse(char *value, size_t nvalue) {
2501        if (strncmp(value, "true", nvalue) == 0)
2502                return true;
2503        else if (strncmp(value, "false", nvalue) == 0)
2504                return false;
2505        else
2506                return strtoll(value, NULL, 10) != 0;
2507}
2508
2509/* Note update documentation on trace_set_configuration */
2510static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2511        assert(key);
2512        assert(value);
2513        assert(uc);
2514        if (strncmp(key, "cache_size", nkey) == 0
2515            || strncmp(key, "cs", nkey) == 0) {
2516                uc->cache_size = strtoll(value, NULL, 10);
2517        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2518                   || strncmp(key, "tcs", nkey) == 0) {
2519                uc->thread_cache_size = strtoll(value, NULL, 10);
2520        } else if (strncmp(key, "fixed_count", nkey) == 0
2521                   || strncmp(key, "fc", nkey) == 0) {
2522                uc->fixed_count = config_bool_parse(value, nvalue);
2523        } else if (strncmp(key, "burst_size", nkey) == 0
2524                   || strncmp(key, "bs", nkey) == 0) {
2525                uc->burst_size = strtoll(value, NULL, 10);
2526        } else if (strncmp(key, "tick_interval", nkey) == 0
2527                   || strncmp(key, "ti", nkey) == 0) {
2528                uc->tick_interval = strtoll(value, NULL, 10);
2529        } else if (strncmp(key, "tick_count", nkey) == 0
2530                   || strncmp(key, "tc", nkey) == 0) {
2531                uc->tick_count = strtoll(value, NULL, 10);
2532        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2533                   || strncmp(key, "pt", nkey) == 0) {
2534                uc->perpkt_threads = strtoll(value, NULL, 10);
2535        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2536                   || strncmp(key, "hqs", nkey) == 0) {
2537                uc->hasher_queue_size = strtoll(value, NULL, 10);
2538        } else if (strncmp(key, "hasher_polling", nkey) == 0
2539                   || strncmp(key, "hp", nkey) == 0) {
2540                uc->hasher_polling = config_bool_parse(value, nvalue);
2541        } else if (strncmp(key, "reporter_polling", nkey) == 0
2542                   || strncmp(key, "rp", nkey) == 0) {
2543                uc->reporter_polling = config_bool_parse(value, nvalue);
2544        } else if (strncmp(key, "reporter_thold", nkey) == 0
2545                   || strncmp(key, "rt", nkey) == 0) {
2546                uc->reporter_thold = strtoll(value, NULL, 10);
2547        } else if (strncmp(key, "debug_state", nkey) == 0
2548                   || strncmp(key, "ds", nkey) == 0) {
2549                uc->debug_state = config_bool_parse(value, nvalue);
2550        } else {
2551                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2552        }
2553}
2554
2555DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2556        char *pch;
2557        char key[100];
2558        char value[100];
2559        char *dup;
2560        assert(str);
2561        assert(trace);
2562
2563        if (!trace_is_configurable(trace)) return -1;
2564
2565        dup = strdup(str);
2566        pch = strtok (dup," ,.-");
2567        while (pch != NULL)
2568        {
2569                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2570                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2571                } else {
2572                        fprintf(stderr, "Error parsing option %s\n", pch);
2573                }
2574                pch = strtok (NULL," ,.-");
2575        }
2576        free(dup);
2577
2578        return 0;
2579}
2580
2581DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2582        char line[1024];
2583        if (!trace_is_configurable(trace)) return -1;
2584
2585        while (fgets(line, sizeof(line), file) != NULL)
2586        {
2587                trace_set_configuration(trace, line);
2588        }
2589
2590        if(ferror(file))
2591                return -1;
2592        else
2593                return 0;
2594}
2595
2596DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2597        assert(packet);
2598        /* Always release any resources this might be holding */
2599        trace_fin_packet(packet);
2600        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2601}
2602
2603DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2604        pthread_mutex_lock(&(packet->ref_lock));
2605        if (packet->refcount < 0) {
2606                packet->refcount = 1;
2607        } else {
2608                packet->refcount ++;
2609        }
2610        pthread_mutex_unlock(&(packet->ref_lock));
2611}
2612
2613DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2614        pthread_mutex_lock(&(packet->ref_lock));
2615        packet->refcount --;
2616
2617        if (packet->refcount <= 0) {
2618                trace_free_packet(packet->trace, packet);
2619        }
2620        pthread_mutex_unlock(&(packet->ref_lock));
2621}
2622
2623
2624DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2625        if (libtrace->format)
2626                return &libtrace->format->info;
2627        else
2628                return NULL;
2629}
Note: See TracBrowser for help on using the repository browser.