source: lib/trace_parallel.c @ 5e3f16c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5e3f16c was 5e3f16c, checked in by Richard Sanger <rsanger@…>, 4 years ago

Fix for issue #39 - ring and int pstop() fails on older kernels when using threads

The problem here is that on old kernels without PACKET_FANOUT support
(added in v3.1) will only include the single threaded versions of int
and ring. When used with multiple threads the libtrace API will
fallback to using read rather than pread which does not check message
queues.

To fix this issue, in any format without pread support:

  • We check for new messages with each loop around read_packet as we fill the burst
  • Within read_packet we update the halt to include the pausing state
  • Use a seperate lock to the main lock when reading a burst of packets, otherwise trace_ppause has to wait for a burst to read.

This is not 100% perfect as a single packet might still need to be received
before a generic message can be received.
A proper fix in the future would be to move all format internals purely to the
parallel API.

  • Property mode set to 100644
File size: 82.4 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289/**
290 * Changes the overall traces state and signals the condition.
291 *
292 * @param trace A pointer to the trace
293 * @param new_state The new state of the trace
294 * @param need_lock Set to true if libtrace_lock is not held, otherwise
295 *        false in the case the lock is currently held by this thread.
296 */
297static inline void libtrace_change_state(libtrace_t *trace,
298        const enum trace_state new_state, const bool need_lock)
299{
300        UNUSED enum trace_state prev_state;
301        if (need_lock)
302                pthread_mutex_lock(&trace->libtrace_lock);
303        prev_state = trace->state;
304        trace->state = new_state;
305
306        if (trace->config.debug_state)
307                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
308                        trace->uridata, get_trace_state_name(prev_state),
309                        get_trace_state_name(trace->state));
310
311        pthread_cond_broadcast(&trace->perpkt_cond);
312        if (need_lock)
313                pthread_mutex_unlock(&trace->libtrace_lock);
314}
315
316/**
317 * Changes a thread's state and broadcasts the condition variable. This
318 * should always be done when the lock is held.
319 *
320 * Additionally for perpkt threads the state counts are updated.
321 *
322 * @param trace A pointer to the trace
323 * @param t A pointer to the thread to modify
324 * @param new_state The new state of the thread
325 * @param need_lock Set to true if libtrace_lock is not held, otherwise
326 *        false in the case the lock is currently held by this thread.
327 */
328static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
329        const enum thread_states new_state, const bool need_lock)
330{
331        enum thread_states prev_state;
332        if (need_lock)
333                pthread_mutex_lock(&trace->libtrace_lock);
334        prev_state = t->state;
335        t->state = new_state;
336        if (t->type == THREAD_PERPKT) {
337                --trace->perpkt_thread_states[prev_state];
338                ++trace->perpkt_thread_states[new_state];
339        }
340
341        if (trace->config.debug_state)
342                fprintf(stderr, "Thread %d state changed from %d to %d\n",
343                        (int) t->tid, prev_state, t->state);
344
345        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
346                libtrace_change_state(trace, STATE_FINISHED, false);
347
348        pthread_cond_broadcast(&trace->perpkt_cond);
349        if (need_lock)
350                pthread_mutex_unlock(&trace->libtrace_lock);
351}
352
353/**
354 * This is valid once a trace is initialised
355 *
356 * @return True if the format supports parallel threads.
357 */
358static inline bool trace_supports_parallel(libtrace_t *trace)
359{
360        assert(trace);
361        assert(trace->format);
362        if (trace->format->pstart_input)
363                return true;
364        else
365                return false;
366}
367
368void libtrace_zero_thread(libtrace_thread_t * t) {
369        t->accepted_packets = 0;
370        t->filtered_packets = 0;
371        t->recorded_first = false;
372        t->tracetime_offset_usec = 0;
373        t->user_data = 0;
374        t->format_data = 0;
375        libtrace_zero_ringbuffer(&t->rbuffer);
376        t->trace = NULL;
377        t->ret = NULL;
378        t->type = THREAD_EMPTY;
379        t->perpkt_num = -1;
380}
381
382// Ints are aligned int is atomic so safe to read and write at same time
383// However write must be locked, read doesn't (We never try read before written to table)
384libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
385        int i = 0;
386        pthread_t tid = pthread_self();
387
388        for (;i<libtrace->perpkt_thread_count ;++i) {
389                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
390                        return &libtrace->perpkt_threads[i];
391        }
392        return NULL;
393}
394
395static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
396        libtrace_thread_t *ret;
397        if (!(ret = get_thread_table(libtrace))) {
398                pthread_t tid = pthread_self();
399                // Check if we are reporter or something else
400                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
401                                pthread_equal(tid, libtrace->reporter_thread.tid))
402                        ret = &libtrace->reporter_thread;
403                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
404                         pthread_equal(tid, libtrace->hasher_thread.tid))
405                        ret = &libtrace->hasher_thread;
406                else
407                        ret = NULL;
408        }
409        return ret;
410}
411
412DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
413        // Duplicate the packet in standard malloc'd memory and free the
414        // original, This is a 1:1 exchange so the ocache count remains unchanged.
415        if (pkt->buf_control != TRACE_CTRL_PACKET) {
416                libtrace_packet_t *dup;
417                dup = trace_copy_packet(pkt);
418                /* Release the external buffer */
419                trace_fin_packet(pkt);
420                /* Copy the duplicated packet over the existing */
421                memcpy(pkt, dup, sizeof(libtrace_packet_t));
422                /* Free the packet structure */
423                free(dup);
424        }
425}
426
427/**
428 * Makes a libtrace_result_t safe, used when pausing a trace.
429 * This will call libtrace_make_packet_safe if the result is
430 * a packet.
431 */
432DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
433        if (res->type == RESULT_PACKET) {
434                libtrace_make_packet_safe(res->value.pkt);
435        }
436}
437
438/**
439 * Holds threads in a paused state, until released by broadcasting
440 * the condition mutex.
441 */
442static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
443        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
444        thread_change_state(trace, t, THREAD_PAUSED, false);
445        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
446                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
447        }
448        thread_change_state(trace, t, THREAD_RUNNING, false);
449        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
450}
451
452/**
453 * Sends a packet to the user, expects either a valid packet or a TICK packet.
454 *
455 * @param trace The trace
456 * @param t The current thread
457 * @param packet A pointer to the packet storage, which may be set to null upon
458 *               return, or a packet to be finished.
459 * @param tracetime If true packets are delayed to match with tracetime
460 * @return 0 is successful, otherwise if playing back in tracetime
461 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
462 *
463 * @note READ_MESSAGE will only be returned if tracetime is true.
464 */
465static inline int dispatch_packet(libtrace_t *trace,
466                                  libtrace_thread_t *t,
467                                  libtrace_packet_t **packet,
468                                  bool tracetime) {
469
470        if ((*packet)->error > 0) {
471                if (tracetime) {
472                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
473                                return READ_MESSAGE;
474                }
475                t->accepted_packets++;
476                if (trace->perpkt_cbs->message_packet)
477                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
478                trace_fin_packet(*packet);
479        } else {
480                assert((*packet)->error == READ_TICK);
481                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
482                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
483        }
484        return 0;
485}
486
487/**
488 * Sends a batch of packets to the user, expects either a valid packet or a
489 * TICK packet.
490 *
491 * @param trace The trace
492 * @param t The current thread
493 * @param packets [in,out] An array of packets, these may be null upon return
494 * @param nb_packets The total number of packets in the list
495 * @param empty [in,out] A pointer to an integer storing the first empty slot,
496 * upon return this is updated
497 * @param offset [in,out] The offset into the array, upon return this is updated
498 * @param tracetime If true packets are delayed to match with tracetime
499 * @return 0 is successful, otherwise if playing back in tracetime
500 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
501 *
502 * @note READ_MESSAGE will only be returned if tracetime is true.
503 */
504static inline int dispatch_packets(libtrace_t *trace,
505                                  libtrace_thread_t *t,
506                                  libtrace_packet_t *packets[],
507                                  int nb_packets, int *empty, int *offset,
508                                  bool tracetime) {
509        for (;*offset < nb_packets; ++*offset) {
510                int ret;
511                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
512                if (ret == 0) {
513                        /* Move full slots to front as we go */
514                        if (packets[*offset]) {
515                                if (*empty != *offset) {
516                                        packets[*empty] = packets[*offset];
517                                        packets[*offset] = NULL;
518                                }
519                                ++*empty;
520                        }
521                } else {
522                        /* Break early */
523                        assert(ret == READ_MESSAGE);
524                        return READ_MESSAGE;
525                }
526        }
527
528        return 0;
529}
530
531/**
532 * Pauses a per packet thread, messages will not be processed when the thread
533 * is paused.
534 *
535 * This process involves reading packets if a hasher thread is used. As such
536 * this function can fail to pause due to errors when reading in which case
537 * the thread should be stopped instead.
538 *
539 *
540 * @brief trace_perpkt_thread_pause
541 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
542 */
543static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
544                                     libtrace_packet_t *packets[],
545                                     int nb_packets, int *empty, int *offset) {
546        libtrace_packet_t * packet = NULL;
547
548        /* Let the user thread know we are going to pause */
549        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
550
551        /* Send through any remaining packets (or messages) without delay */
552
553        /* First send those packets already read, as fast as possible
554         * This should never fail or check for messages etc. */
555        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
556                                    offset, false), == 0);
557
558        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
559        /* If a hasher thread is running, empty input queues so we don't lose data */
560        if (trace_has_dedicated_hasher(trace)) {
561                // The hasher has stopped by this point, so the queue shouldn't be filling
562                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
563                        int ret = trace->pread(trace, t, &packet, 1);
564                        if (ret == 1) {
565                                if (packet->error > 0) {
566                                        store_first_packet(trace, packet, t);
567                                }
568                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
569                                if (packet == NULL)
570                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
571                        } else if (ret != READ_MESSAGE) {
572                                /* Ignore messages we pick these up next loop */
573                                assert (ret == READ_EOF || ret == READ_ERROR);
574                                /* Verify no packets are remaining */
575                                /* TODO refactor this sanity check out!! */
576                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
577                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
578                                        // No packets after this should have any data in them
579                                        assert(packet->error <= 0);
580                                }
581                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
582                                return -1;
583                        }
584                }
585        }
586        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
587
588        /* Now we do the actual pause, this returns when we resumed */
589        trace_thread_pause(trace, t);
590        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
591        return 1;
592}
593
594/**
595 * The is the entry point for our packet processing threads.
596 */
597static void* perpkt_threads_entry(void *data) {
598        libtrace_t *trace = (libtrace_t *)data;
599        libtrace_thread_t *t;
600        libtrace_message_t message = {0, {.uint64=0}, NULL};
601        libtrace_packet_t *packets[trace->config.burst_size];
602        size_t i;
603        //int ret;
604        /* The current reading position into the packets */
605        int offset = 0;
606        /* The number of packets last read */
607        int nb_packets = 0;
608        /* The offset to the first NULL packet upto offset */
609        int empty = 0;
610
611        /* Wait until trace_pstart has been completed */
612        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
613        t = get_thread_table(trace);
614        assert(t);
615        if (trace->state == STATE_ERROR) {
616                thread_change_state(trace, t, THREAD_FINISHED, false);
617                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
618                pthread_exit(NULL);
619        }
620        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
621
622        if (trace->format->pregister_thread) {
623                if (trace->format->pregister_thread(trace, t, 
624                                trace_is_parallel(trace)) < 0) {
625                        thread_change_state(trace, t, THREAD_FINISHED, false);
626                        pthread_exit(NULL);
627                }
628        }
629
630        /* Fill our buffer with empty packets */
631        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
632        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
633                              trace->config.burst_size,
634                              trace->config.burst_size);
635
636        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
637
638        /* Let the per_packet function know we have started */
639        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
640        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
641
642        for (;;) {
643
644                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
645                        int ret;
646                        switch (message.code) {
647                                case MESSAGE_DO_PAUSE: // This is internal
648                                        ret = trace_perpkt_thread_pause(trace, t,
649                                              packets, nb_packets, &empty, &offset);
650                                        if (ret == READ_EOF) {
651                                                goto eof;
652                                        } else if (ret == READ_ERROR) {
653                                                goto error;
654                                        }
655                                        assert(ret == 1);
656                                        continue;
657                                case MESSAGE_DO_STOP: // This is internal
658                                        goto eof;
659                        }
660                        send_message(trace, t, message.code, message.data, 
661                                        message.sender);
662                        /* Continue and the empty messages out before packets */
663                        continue;
664                }
665
666
667                /* Do we need to read a new set of packets MOST LIKELY we do */
668                if (offset == nb_packets) {
669                        /* Refill the packet buffer */
670                        if (empty != nb_packets) {
671                                // Refill the empty packets
672                                libtrace_ocache_alloc(&trace->packet_freelist,
673                                                      (void **) &packets[empty],
674                                                      nb_packets - empty,
675                                                      nb_packets - empty);
676                        }
677                        if (!trace->pread) {
678                                assert(packets[0]);
679                                nb_packets = trace_read_packet(trace, packets[0]);
680                                packets[0]->error = nb_packets;
681                                if (nb_packets > 0)
682                                        nb_packets = 1;
683                        } else {
684                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
685                        }
686                        offset = 0;
687                        empty = 0;
688                }
689
690                /* Handle error/message cases */
691                if (nb_packets > 0) {
692                        /* Store the first packet */
693                        if (packets[0]->error > 0) {
694                                store_first_packet(trace, packets[0], t);
695                        }
696                        dispatch_packets(trace, t, packets, nb_packets, &empty,
697                                         &offset, trace->tracetime);
698                } else {
699                        switch (nb_packets) {
700                        case READ_EOF:
701                                goto eof;
702                        case READ_ERROR:
703                                goto error;
704                        case READ_MESSAGE:
705                                nb_packets = 0;
706                                continue;
707                        default:
708                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
709                                goto error;
710                        }
711                }
712
713        }
714
715error:
716        message.code = MESSAGE_DO_STOP;
717        message.sender = t;
718        message.data.uint64 = 0;
719        trace_message_perpkts(trace, &message);
720eof:
721        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
722
723        // Let the per_packet function know we have stopped
724        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
725        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
726
727        // Free any remaining packets
728        for (i = 0; i < trace->config.burst_size; i++) {
729                if (packets[i]) {
730                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
731                        packets[i] = NULL;
732                }
733        }
734
735        thread_change_state(trace, t, THREAD_FINISHED, true);
736
737        /* Make sure the reporter sees we have finished */
738        if (trace_has_reporter(trace))
739                trace_post_reporter(trace);
740
741        // Release all ocache memory before unregistering with the format
742        // because this might(it does in DPDK) unlink the formats mempool
743        // causing destroy/finish packet to fail.
744        libtrace_ocache_unregister_thread(&trace->packet_freelist);
745        if (trace->format->punregister_thread) {
746                trace->format->punregister_thread(trace, t);
747        }
748        print_memory_stats();
749
750        pthread_exit(NULL);
751}
752
753/**
754 * The start point for our single threaded hasher thread, this will read
755 * and hash a packet from a data source and queue it against the correct
756 * core to process it.
757 */
758static void* hasher_entry(void *data) {
759        libtrace_t *trace = (libtrace_t *)data;
760        libtrace_thread_t * t;
761        int i;
762        libtrace_packet_t * packet;
763        libtrace_message_t message = {0, {.uint64=0}, NULL};
764        int pkt_skipped = 0;
765
766        assert(trace_has_dedicated_hasher(trace));
767        /* Wait until all threads are started and objects are initialised (ring buffers) */
768        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
769        t = &trace->hasher_thread;
770        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
771        if (trace->state == STATE_ERROR) {
772                thread_change_state(trace, t, THREAD_FINISHED, false);
773                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
774                pthread_exit(NULL);
775        }
776        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
777
778        /* We are reading but it is not the parallel API */
779        if (trace->format->pregister_thread) {
780                trace->format->pregister_thread(trace, t, true);
781        }
782
783        /* Read all packets in then hash and queue against the correct thread */
784        while (1) {
785                int thread;
786                if (!pkt_skipped)
787                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
788                assert(packet);
789
790                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
791                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
792                        switch(message.code) {
793                                case MESSAGE_DO_PAUSE:
794                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
795                                        thread_change_state(trace, t, THREAD_PAUSED, false);
796                                        pthread_cond_broadcast(&trace->perpkt_cond);
797                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
798                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
799                                        }
800                                        thread_change_state(trace, t, THREAD_RUNNING, false);
801                                        pthread_cond_broadcast(&trace->perpkt_cond);
802                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
803                                        break;
804                                case MESSAGE_DO_STOP:
805                                        assert(trace->started == false);
806                                        assert(trace->state == STATE_FINISHED);
807                                        /* Mark the current packet as EOF */
808                                        packet->error = 0;
809                                        break;
810                                default:
811                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
812                        }
813                        pkt_skipped = 1;
814                        continue;
815                }
816
817                if ((packet->error = trace_read_packet(trace, packet)) <1) {
818                        if (packet->error == READ_MESSAGE) {
819                                pkt_skipped = 1;
820                                continue;
821                        } else {
822                                break; /* We are EOF or error'd either way we stop  */
823                        }
824                }
825
826                /* We are guaranteed to have a hash function i.e. != NULL */
827                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
828                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
829                /* Blocking write to the correct queue - I'm the only writer */
830                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
831                        uint64_t order = trace_packet_get_order(packet);
832                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
833                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
834                                // Write ticks to everyone else
835                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
836                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
837                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
838                                for (i = 0; i < trace->perpkt_thread_count; i++) {
839                                        pkts[i]->error = READ_TICK;
840                                        trace_packet_set_order(pkts[i], order);
841                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
842                                }
843                        }
844                        pkt_skipped = 0;
845                } else {
846                        assert(!"Dropping a packet!!");
847                        pkt_skipped = 1; // Reuse that packet no one read it
848                }
849        }
850
851        /* Broadcast our last failed read to all threads */
852        for (i = 0; i < trace->perpkt_thread_count; i++) {
853                libtrace_packet_t * bcast;
854                if (i == trace->perpkt_thread_count - 1) {
855                        bcast = packet;
856                } else {
857                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
858                        bcast->error = packet->error;
859                }
860                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
861                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
862                        // Unlock early otherwise we could deadlock
863                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
864                }
865                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
866        }
867
868        // We don't need to free the packet
869        thread_change_state(trace, t, THREAD_FINISHED, true);
870
871        libtrace_ocache_unregister_thread(&trace->packet_freelist);
872        if (trace->format->punregister_thread) {
873                trace->format->punregister_thread(trace, t);
874        }
875        print_memory_stats();
876
877        // TODO remove from TTABLE t sometime
878        pthread_exit(NULL);
879}
880
881/* Our simplest case when a thread becomes ready it can obtain an exclusive
882 * lock to read packets from the underlying trace.
883 */
884static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
885                                                    libtrace_thread_t *t,
886                                                    libtrace_packet_t *packets[],
887                                                    size_t nb_packets) {
888        size_t i = 0;
889        //bool tick_hit = false;
890
891        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
892        /* Read nb_packets */
893        for (i = 0; i < nb_packets; ++i) {
894                if (libtrace_message_queue_count(&t->messages) > 0) {
895                        if ( i==0 ) {
896                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
897                                return READ_MESSAGE;
898                        } else {
899                                break;
900                        }
901                }
902                packets[i]->error = trace_read_packet(libtrace, packets[i]);
903
904                if (packets[i]->error <= 0) {
905                        /* We'll catch this next time if we have already got packets */
906                        if ( i==0 ) {
907                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
908                                return packets[i]->error;
909                        } else {
910                                break;
911                        }
912                }
913                /*
914                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
915                        tick_hit = true;
916                }*/
917        }
918        // Doing this inside the lock ensures the first packet is always
919        // recorded first
920        if (packets[0]->error > 0) {
921                store_first_packet(libtrace, packets[0], t);
922        }
923        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
924        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
925        if (tick_hit) {
926                libtrace_message_t tick;
927                tick.additional.uint64 = trace_packet_get_order(packets[i]);
928                tick.code = MESSAGE_TICK;
929                trace_send_message_to_perpkts(libtrace, &tick);
930        } */
931        return i;
932}
933
934/**
935 * For the case that we have a dedicated hasher thread
936 * 1. We read a packet from our buffer
937 * 2. Move that into the packet provided (packet)
938 */
939inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
940                                                   libtrace_thread_t *t,
941                                                   libtrace_packet_t *packets[],
942                                                   size_t nb_packets) {
943        size_t i;
944
945        /* We store the last error message here */
946        if (t->format_data) {
947                return ((libtrace_packet_t *)t->format_data)->error;
948        }
949
950        // Always grab at least one
951        if (packets[0]) // Recycle the old get the new
952                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
953        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
954
955        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
956                return packets[0]->error;
957        }
958
959        for (i = 1; i < nb_packets; i++) {
960                if (packets[i]) // Recycle the old get the new
961                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
962                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
963                        packets[i] = NULL;
964                        break;
965                }
966
967                /* We will return an error or EOF the next time around */
968                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
969                        /* The message case will be checked automatically -
970                           However other cases like EOF and error will only be
971                           sent once*/
972                        if (packets[i]->error != READ_MESSAGE) {
973                                assert(t->format_data == NULL);
974                                t->format_data = packets[i];
975                        }
976                        break;
977                }
978        }
979
980        return i;
981}
982
983/**
984 * For the first packet of each queue we keep a copy and note the system
985 * time it was received at.
986 *
987 * This is used for finding the first packet when playing back a trace
988 * in trace time. And can be used by real time applications to print
989 * results out every XXX seconds.
990 */
991void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
992{
993        if (!t->recorded_first) {
994                libtrace_message_t mesg = {0, {.uint64=0}, NULL};
995                struct timeval tv;
996                libtrace_packet_t * dup;
997
998                /* We mark system time against a copy of the packet */
999                gettimeofday(&tv, NULL);
1000                dup = trace_copy_packet(packet);
1001
1002                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1003                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1004                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1005                libtrace->first_packets.count++;
1006
1007                /* Now update the first */
1008                if (libtrace->first_packets.count == 1) {
1009                        /* We the first entry hence also the first known packet */
1010                        libtrace->first_packets.first = t->perpkt_num;
1011                } else {
1012                        /* Check if we are newer than the previous 'first' packet */
1013                        size_t first = libtrace->first_packets.first;
1014                        if (trace_get_seconds(dup) <
1015                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1016                                libtrace->first_packets.first = t->perpkt_num;
1017                }
1018                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1019
1020                mesg.code = MESSAGE_FIRST_PACKET;
1021                trace_message_reporter(libtrace, &mesg);
1022                trace_message_perpkts(libtrace, &mesg);
1023                t->recorded_first = true;
1024        }
1025}
1026
1027DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1028                                     libtrace_thread_t *t,
1029                                     const libtrace_packet_t **packet,
1030                                     const struct timeval **tv)
1031{
1032        void * tmp;
1033        int ret = 0;
1034
1035        if (t) {
1036                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1037                        return -1;
1038        }
1039
1040        /* Throw away these which we don't use */
1041        if (!packet)
1042                packet = (const libtrace_packet_t **) &tmp;
1043        if (!tv)
1044                tv = (const struct timeval **) &tmp;
1045
1046        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1047        if (t) {
1048                /* Get the requested thread */
1049                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1050                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1051        } else if (libtrace->first_packets.count) {
1052                /* Get the first packet across all threads */
1053                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1054                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1055                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1056                        ret = 1;
1057                } else {
1058                        struct timeval curr_tv;
1059                        // If a second has passed since the first entry we will assume this is the very first packet
1060                        gettimeofday(&curr_tv, NULL);
1061                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1062                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1063                                        ret = 1;
1064                                }
1065                        }
1066                }
1067        } else {
1068                *packet = NULL;
1069                *tv = NULL;
1070        }
1071        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1072        return ret;
1073}
1074
1075
1076DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1077{
1078        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1079}
1080
1081inline static struct timeval usec_to_tv(uint64_t usec)
1082{
1083        struct timeval tv;
1084        tv.tv_sec = usec / 1000000;
1085        tv.tv_usec = usec % 1000000;
1086        return tv;
1087}
1088
1089/** Similar to delay_tracetime but send messages to all threads periodically */
1090static void* reporter_entry(void *data) {
1091        libtrace_message_t message = {0, {.uint64=0}, NULL};
1092        libtrace_t *trace = (libtrace_t *)data;
1093        libtrace_thread_t *t = &trace->reporter_thread;
1094
1095        /* Wait until all threads are started */
1096        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1097        if (trace->state == STATE_ERROR) {
1098                thread_change_state(trace, t, THREAD_FINISHED, false);
1099                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1100                pthread_exit(NULL);
1101        }
1102        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1103
1104        if (trace->format->pregister_thread) {
1105                trace->format->pregister_thread(trace, t, false);
1106        }
1107
1108        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1109        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1110
1111        while (!trace_has_finished(trace)) {
1112                if (trace->config.reporter_polling) {
1113                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1114                                message.code = MESSAGE_POST_REPORTER;
1115                } else {
1116                        libtrace_message_queue_get(&t->messages, &message);
1117                }
1118                switch (message.code) {
1119                        // Check for results
1120                        case MESSAGE_POST_REPORTER:
1121                                trace->combiner.read(trace, &trace->combiner);
1122                                break;
1123                        case MESSAGE_DO_PAUSE:
1124                                assert(trace->combiner.pause);
1125                                trace->combiner.pause(trace, &trace->combiner);
1126                                send_message(trace, t, MESSAGE_PAUSING,
1127                                                (libtrace_generic_t) {0}, t);
1128                                trace_thread_pause(trace, t);
1129                                send_message(trace, t, MESSAGE_RESUMING,
1130                                                (libtrace_generic_t) {0}, t);
1131                                break;
1132                default:
1133                        send_message(trace, t, message.code, message.data,
1134                                        message.sender);
1135                }
1136        }
1137
1138        // Flush out whats left now all our threads have finished
1139        trace->combiner.read_final(trace, &trace->combiner);
1140
1141        // GOODBYE
1142        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1143        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1144
1145        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1146        print_memory_stats();
1147        return NULL;
1148}
1149
1150/** Similar to delay_tracetime but send messages to all threads periodically */
1151static void* keepalive_entry(void *data) {
1152        struct timeval prev, next;
1153        libtrace_message_t message = {0, {.uint64=0}, NULL};
1154        libtrace_t *trace = (libtrace_t *)data;
1155        uint64_t next_release;
1156        libtrace_thread_t *t = &trace->keepalive_thread;
1157
1158        /* Wait until all threads are started */
1159        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1160        if (trace->state == STATE_ERROR) {
1161                thread_change_state(trace, t, THREAD_FINISHED, false);
1162                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1163                pthread_exit(NULL);
1164        }
1165        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1166
1167        gettimeofday(&prev, NULL);
1168        message.code = MESSAGE_TICK_INTERVAL;
1169
1170        while (trace->state != STATE_FINISHED) {
1171                fd_set rfds;
1172                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1173                gettimeofday(&next, NULL);
1174                if (next_release > tv_to_usec(&next)) {
1175                        next = usec_to_tv(next_release - tv_to_usec(&next));
1176                        // Wait for timeout or a message
1177                        FD_ZERO(&rfds);
1178                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1179                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1180                                libtrace_message_t msg;
1181                                libtrace_message_queue_get(&t->messages, &msg);
1182                                assert(msg.code == MESSAGE_DO_STOP);
1183                                goto done;
1184                        }
1185                }
1186                prev = usec_to_tv(next_release);
1187                if (trace->state == STATE_RUNNING) {
1188                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1189                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1190                        trace_message_perpkts(trace, &message);
1191                }
1192        }
1193done:
1194
1195        thread_change_state(trace, t, THREAD_FINISHED, true);
1196        return NULL;
1197}
1198
1199/**
1200 * Delays a packets playback so the playback will be in trace time.
1201 * This may break early if a message becomes available.
1202 *
1203 * Requires the first packet for this thread to be received.
1204 * @param libtrace  The trace
1205 * @param packet    The packet to delay
1206 * @param t         The current thread
1207 * @return Either READ_MESSAGE(-2) or 0 is successful
1208 */
1209static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1210        struct timeval curr_tv, pkt_tv;
1211        uint64_t next_release = t->tracetime_offset_usec;
1212        uint64_t curr_usec;
1213
1214        if (!t->tracetime_offset_usec) {
1215                const libtrace_packet_t *first_pkt;
1216                const struct timeval *sys_tv;
1217                int64_t initial_offset;
1218                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1219                assert(first_pkt);
1220                pkt_tv = trace_get_timeval(first_pkt);
1221                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1222                /* In the unlikely case offset is 0, change it to 1 */
1223                if (stable)
1224                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1225                next_release = initial_offset;
1226        }
1227        /* next_release == offset */
1228        pkt_tv = trace_get_timeval(packet);
1229        next_release += tv_to_usec(&pkt_tv);
1230        gettimeofday(&curr_tv, NULL);
1231        curr_usec = tv_to_usec(&curr_tv);
1232        if (next_release > curr_usec) {
1233                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1234                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1235                fd_set rfds;
1236                FD_ZERO(&rfds);
1237                FD_SET(mesg_fd, &rfds);
1238                // We need to wait
1239                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1240                if (ret == 0) {
1241                        return 0;
1242                } else if (ret > 0) {
1243                        return READ_MESSAGE;
1244                } else {
1245                        assert(!"trace_delay_packet: Unexpected return from select");
1246                }
1247        }
1248        return 0;
1249}
1250
1251/* Discards packets that don't match the filter.
1252 * Discarded packets are emptied and then moved to the end of the packet list.
1253 *
1254 * @param trace       The trace format, containing the filter
1255 * @param packets     An array of packets
1256 * @param nb_packets  The number of valid items in packets
1257 *
1258 * @return The number of packets that passed the filter, which are moved to
1259 *          the start of the packets array
1260 */
1261static inline size_t filter_packets(libtrace_t *trace,
1262                                    libtrace_packet_t **packets,
1263                                    size_t nb_packets) {
1264        size_t offset = 0;
1265        size_t i;
1266
1267        for (i = 0; i < nb_packets; ++i) {
1268                // The filter needs the trace attached to receive the link type
1269                packets[i]->trace = trace;
1270                if (trace_apply_filter(trace->filter, packets[i])) {
1271                        libtrace_packet_t *tmp;
1272                        tmp = packets[offset];
1273                        packets[offset++] = packets[i];
1274                        packets[i] = tmp;
1275                } else {
1276                        trace_fin_packet(packets[i]);
1277                }
1278        }
1279
1280        return offset;
1281}
1282
1283/* Read a batch of packets from the trace into a buffer.
1284 * Note that this function will block until a packet is read (or EOF is reached)
1285 *
1286 * @param libtrace    The trace
1287 * @param t           The thread
1288 * @param packets     An array of packets
1289 * @param nb_packets  The number of empty packets in packets
1290 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1291 */
1292static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1293                                      libtrace_thread_t *t,
1294                                      libtrace_packet_t *packets[],
1295                                      size_t nb_packets) {
1296        int i;
1297        assert(nb_packets);
1298        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1299        if (trace_is_err(libtrace))
1300                return -1;
1301        if (!libtrace->started) {
1302                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1303                              "You must call libtrace_start() before trace_read_packet()\n");
1304                return -1;
1305        }
1306
1307        if (libtrace->format->pread_packets) {
1308                int ret;
1309                for (i = 0; i < (int) nb_packets; ++i) {
1310                        assert(i[packets]);
1311                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1312                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1313                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1314                                              "Packet passed to trace_read_packet() is invalid\n");
1315                                return -1;
1316                        }
1317                }
1318                do {
1319                        ret=libtrace->format->pread_packets(libtrace, t,
1320                                                            packets,
1321                                                            nb_packets);
1322                        /* Error, EOF or message? */
1323                        if (ret <= 0) {
1324                                return ret;
1325                        }
1326
1327                        if (libtrace->filter) {
1328                                int remaining;
1329                                remaining = filter_packets(libtrace,
1330                                                           packets, ret);
1331                                t->filtered_packets += ret - remaining;
1332                                ret = remaining;
1333                        }
1334                        for (i = 0; i < ret; ++i) {
1335                                /* We do not mark the packet against the trace,
1336                                 * before hand or after. After breaks DAG meta
1337                                 * packets and before is inefficient */
1338                                //packets[i]->trace = libtrace;
1339                                /* TODO IN FORMAT?? Like traditional libtrace */
1340                                if (libtrace->snaplen>0)
1341                                        trace_set_capture_length(packets[i],
1342                                                        libtrace->snaplen);
1343                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1344                        }
1345                } while(ret == 0);
1346                return ret;
1347        }
1348        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1349                      "This format does not support reading packets\n");
1350        return ~0U;
1351}
1352
1353/* Restarts a parallel trace, this is called from trace_pstart.
1354 * The libtrace lock is held upon calling this function.
1355 * Typically with a parallel trace the threads are not
1356 * killed rather.
1357 */
1358static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1359                          libtrace_callback_set_t *per_packet_cbs, 
1360                          libtrace_callback_set_t *reporter_cbs) {
1361        int i, err = 0;
1362        if (libtrace->state != STATE_PAUSED) {
1363                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1364                        "trace(%s) is not currently paused",
1365                              libtrace->uridata);
1366                return -1;
1367        }
1368
1369        assert(libtrace_parallel);
1370        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1371
1372        /* Reset first packets */
1373        pthread_spin_lock(&libtrace->first_packets.lock);
1374        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1375                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1376                if (libtrace->first_packets.packets[i].packet) {
1377                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1378                        libtrace->first_packets.packets[i].packet = NULL;
1379                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1380                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1381                        libtrace->first_packets.count--;
1382                        libtrace->perpkt_threads[i].recorded_first = false;
1383                }
1384        }
1385        assert(libtrace->first_packets.count == 0);
1386        libtrace->first_packets.first = 0;
1387        pthread_spin_unlock(&libtrace->first_packets.lock);
1388
1389        /* Reset delay */
1390        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1391                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1392        }
1393
1394        /* Reset statistics */
1395        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1396                libtrace->perpkt_threads[i].accepted_packets = 0;
1397                libtrace->perpkt_threads[i].filtered_packets = 0;
1398        }
1399        libtrace->accepted_packets = 0;
1400        libtrace->filtered_packets = 0;
1401
1402        /* Update functions if requested */
1403        if(global_blob)
1404                libtrace->global_blob = global_blob;
1405
1406        if (per_packet_cbs) {
1407                if (libtrace->perpkt_cbs)
1408                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1409                libtrace->perpkt_cbs = trace_create_callback_set();
1410                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1411                                sizeof(libtrace_callback_set_t));
1412        }
1413
1414        if (reporter_cbs) {
1415                if (libtrace->reporter_cbs)
1416                        trace_destroy_callback_set(libtrace->reporter_cbs);
1417
1418                libtrace->reporter_cbs = trace_create_callback_set();
1419                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1420                                sizeof(libtrace_callback_set_t));
1421        }
1422
1423        if (trace_is_parallel(libtrace)) {
1424                err = libtrace->format->pstart_input(libtrace);
1425        } else {
1426                if (libtrace->format->start_input) {
1427                        err = libtrace->format->start_input(libtrace);
1428                }
1429        }
1430
1431        if (err == 0) {
1432                libtrace->started = true;
1433                libtrace_change_state(libtrace, STATE_RUNNING, false);
1434        }
1435        return err;
1436}
1437
1438/**
1439 * @return the number of CPU cores on the machine. -1 if unknown.
1440 */
1441SIMPLE_FUNCTION static int get_nb_cores() {
1442        int numCPU;
1443#ifdef _SC_NPROCESSORS_ONLN
1444        /* Most systems do this now */
1445        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1446
1447#else
1448        int mib[] = {CTL_HW, HW_AVAILCPU};
1449        size_t len = sizeof(numCPU);
1450
1451        /* get the number of CPUs from the system */
1452        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1453#endif
1454        return numCPU <= 0 ? 1 : numCPU;
1455}
1456
1457/**
1458 * Verifies the configuration and sets default values for any values not
1459 * specified by the user.
1460 */
1461static void verify_configuration(libtrace_t *libtrace) {
1462
1463        if (libtrace->config.hasher_queue_size <= 0)
1464                libtrace->config.hasher_queue_size = 1000;
1465
1466        if (libtrace->config.perpkt_threads <= 0) {
1467                libtrace->perpkt_thread_count = get_nb_cores();
1468                if (libtrace->perpkt_thread_count <= 0)
1469                        // Lets just use one
1470                        libtrace->perpkt_thread_count = 1;
1471        } else {
1472                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1473        }
1474
1475        if (libtrace->config.reporter_thold <= 0)
1476                libtrace->config.reporter_thold = 100;
1477        if (libtrace->config.burst_size <= 0)
1478                libtrace->config.burst_size = 32;
1479        if (libtrace->config.thread_cache_size <= 0)
1480                libtrace->config.thread_cache_size = 64;
1481        if (libtrace->config.cache_size <= 0)
1482                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1483
1484        if (libtrace->config.cache_size <
1485                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1486                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1487
1488        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1489                libtrace->combiner = combiner_unordered;
1490
1491        /* Figure out if we are using a dedicated hasher thread? */
1492        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1493                libtrace->hasher_thread.type = THREAD_HASHER;
1494        }
1495}
1496
1497/**
1498 * Starts a libtrace_thread, including allocating memory for messaging.
1499 * Threads are expected to wait until the libtrace look is released.
1500 * Hence why we don't init structures until later.
1501 *
1502 * @param trace The trace the thread is associated with
1503 * @param t The thread that is filled when the thread is started
1504 * @param type The type of thread
1505 * @param start_routine The entry location of the thread
1506 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1507 * @param name For debugging purposes set the threads name (Optional)
1508 *
1509 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1510 *         In this situation the thread structure is zeroed.
1511 */
1512static int trace_start_thread(libtrace_t *trace,
1513                       libtrace_thread_t *t,
1514                       enum thread_types type,
1515                       void *(*start_routine) (void *),
1516                       int perpkt_num,
1517                       const char *name) {
1518#ifdef __linux__
1519        pthread_attr_t attrib;
1520        cpu_set_t cpus;
1521        int i;
1522#endif
1523        int ret;
1524        assert(t->type == THREAD_EMPTY);
1525        t->trace = trace;
1526        t->ret = NULL;
1527        t->user_data = NULL;
1528        t->type = type;
1529        t->state = THREAD_RUNNING;
1530
1531        assert(name);
1532
1533#ifdef __linux__
1534        CPU_ZERO(&cpus);
1535        for (i = 0; i < get_nb_cores(); i++)
1536                CPU_SET(i, &cpus);
1537        pthread_attr_init(&attrib);
1538        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1539        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1540        pthread_attr_destroy(&attrib);
1541#else
1542        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1543#endif
1544        if (ret != 0) {
1545                libtrace_zero_thread(t);
1546                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1547                return -1;
1548        }
1549        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1550        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1551                libtrace_ringbuffer_init(&t->rbuffer,
1552                                         trace->config.hasher_queue_size,
1553                                         trace->config.hasher_polling?
1554                                                 LIBTRACE_RINGBUFFER_POLLING:
1555                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1556        }
1557#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1558        if(name)
1559                pthread_setname_np(t->tid, name);
1560#endif
1561        t->perpkt_num = perpkt_num;
1562        return 0;
1563}
1564
1565/** Parses the environment variable LIBTRACE_CONF into the supplied
1566 * configuration structure.
1567 *
1568 * @param[in,out] libtrace The trace from which we determine the URI and set
1569 * the configuration.
1570 *
1571 * We search for 3 environment variables and apply them to the config in the
1572 * following order. Such that the first has the lowest priority.
1573 *
1574 * 1. LIBTRACE_CONF, The global environment configuration
1575 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1576 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1577 *
1578 * E.g.
1579 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1580 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1581 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1582 *
1583 * @note All environment variables names MUST only contian
1584 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1585 * outside of this range should be captilised if possible or replaced with an
1586 * underscore.
1587 */
1588static void parse_env_config (libtrace_t *libtrace) {
1589        char env_name[1024] = "LIBTRACE_CONF_";
1590        size_t len = strlen(env_name);
1591        size_t mark = 0;
1592        size_t i;
1593        char * env;
1594
1595        /* Make our compound string */
1596        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1597        len += strlen(libtrace->format->name);
1598        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1599        len += 1;
1600        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1601
1602        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1603        for (i = 0; env_name[i] != 0; ++i) {
1604                env_name[i] = toupper(env_name[i]);
1605                if(env_name[i] == ':') {
1606                        mark = i;
1607                }
1608                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1609                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1610                        env_name[i] = '_';
1611                }
1612        }
1613
1614        /* First apply global env settings LIBTRACE_CONF */
1615        env = getenv("LIBTRACE_CONF");
1616        if (env)
1617        {
1618                printf("Got env %s", env);
1619                trace_set_configuration(libtrace, env);
1620        }
1621
1622        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1623        if (mark != 0) {
1624                env_name[mark] = 0;
1625                env = getenv(env_name);
1626                if (env) {
1627                        trace_set_configuration(libtrace, env);
1628                }
1629                env_name[mark] = '_';
1630        }
1631
1632        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1633        env = getenv(env_name);
1634        if (env) {
1635                trace_set_configuration(libtrace, env);
1636        }
1637}
1638
1639DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1640        if (libtrace->state == STATE_NEW)
1641                return trace_supports_parallel(libtrace);
1642        return libtrace->pread == trace_pread_packet_wrapper;
1643}
1644
1645DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1646                           libtrace_callback_set_t *per_packet_cbs,
1647                           libtrace_callback_set_t *reporter_cbs) {
1648        int i;
1649        int ret = -1;
1650        char name[16];
1651        sigset_t sig_before, sig_block_all;
1652        assert(libtrace);
1653
1654        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1655        if (trace_is_err(libtrace)) {
1656                goto cleanup_none;
1657        }
1658
1659        if (libtrace->state == STATE_PAUSED) {
1660                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1661                                reporter_cbs);
1662                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1663                return ret;
1664        }
1665
1666        if (libtrace->state != STATE_NEW) {
1667                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1668                              "should be called on a NEW or PAUSED trace but "
1669                              "instead was called from %s",
1670                              get_trace_state_name(libtrace->state));
1671                goto cleanup_none;
1672        }
1673
1674        /* Store the user defined things against the trace */
1675        libtrace->global_blob = global_blob;
1676
1677        /* Save a copy of the callbacks in case the user tries to change them
1678         * on us later */
1679        if (!per_packet_cbs) {
1680                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1681                                "requires a non-NULL set of per packet "
1682                                "callbacks.");
1683                goto cleanup_none;
1684        }
1685
1686        if (per_packet_cbs->message_packet == NULL) {
1687                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1688                                "packet callbacks must include a handler "
1689                                "for a packet. Please set this using "
1690                                "trace_set_packet_cb().");
1691                goto cleanup_none;
1692        }
1693
1694        libtrace->perpkt_cbs = trace_create_callback_set();
1695        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1696       
1697        if (reporter_cbs) {
1698                libtrace->reporter_cbs = trace_create_callback_set();
1699                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1700        }
1701
1702       
1703
1704
1705        /* And zero other fields */
1706        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1707                libtrace->perpkt_thread_states[i] = 0;
1708        }
1709        libtrace->first_packets.first = 0;
1710        libtrace->first_packets.count = 0;
1711        libtrace->first_packets.packets = NULL;
1712        libtrace->perpkt_threads = NULL;
1713        /* Set a global which says we are using a parallel trace. This is
1714         * for backwards compatibility due to changes when destroying packets */
1715        libtrace_parallel = 1;
1716
1717        /* Parses configuration passed through environment variables */
1718        parse_env_config(libtrace);
1719        verify_configuration(libtrace);
1720
1721        ret = -1;
1722        /* Try start the format - we prefer parallel over single threaded, as
1723         * these formats should support messages better */
1724        if (trace_supports_parallel(libtrace) &&
1725            !trace_has_dedicated_hasher(libtrace)) {
1726                ret = libtrace->format->pstart_input(libtrace);
1727                libtrace->pread = trace_pread_packet_wrapper;
1728        }
1729        if (ret != 0) {
1730                if (libtrace->format->start_input) {
1731                        ret = libtrace->format->start_input(libtrace);
1732                }
1733                if (libtrace->perpkt_thread_count > 1)
1734                        libtrace->pread = trace_pread_packet_first_in_first_served;
1735                else
1736                        /* Use standard read_packet */
1737                        libtrace->pread = NULL;
1738        }
1739
1740        if (ret != 0) {
1741                goto cleanup_none;
1742        }
1743
1744        /* --- Start all the threads we need --- */
1745        /* Disable signals because it is inherited by the threads we start */
1746        sigemptyset(&sig_block_all);
1747        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1748
1749        /* If we need a hasher thread start it
1750         * Special Case: If single threaded we don't need a hasher
1751         */
1752        if (trace_has_dedicated_hasher(libtrace)) {
1753                libtrace->hasher_thread.type = THREAD_EMPTY;
1754                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1755                                   THREAD_HASHER, hasher_entry, -1,
1756                                   "hasher-thread");
1757                if (ret != 0)
1758                        goto cleanup_started;
1759                libtrace->pread = trace_pread_packet_hasher_thread;
1760        } else {
1761                libtrace->hasher_thread.type = THREAD_EMPTY;
1762        }
1763
1764        /* Start up our perpkt threads */
1765        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1766                                          libtrace->perpkt_thread_count);
1767        if (!libtrace->perpkt_threads) {
1768                trace_set_err(libtrace, errno, "trace_pstart "
1769                              "failed to allocate memory.");
1770                goto cleanup_threads;
1771        }
1772        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1773                snprintf(name, sizeof(name), "perpkt-%d", i);
1774                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1775                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1776                                   THREAD_PERPKT, perpkt_threads_entry, i,
1777                                   name);
1778                if (ret != 0)
1779                        goto cleanup_threads;
1780        }
1781
1782        /* Start the reporter thread */
1783        if (reporter_cbs) {
1784                if (libtrace->combiner.initialise)
1785                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1786                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1787                                   THREAD_REPORTER, reporter_entry, -1,
1788                                   "reporter_thread");
1789                if (ret != 0)
1790                        goto cleanup_threads;
1791        }
1792
1793        /* Start the keepalive thread */
1794        if (libtrace->config.tick_interval > 0) {
1795                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1796                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1797                                   "keepalive_thread");
1798                if (ret != 0)
1799                        goto cleanup_threads;
1800        }
1801
1802        /* Init other data structures */
1803        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1804        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1805        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1806                                                 sizeof(*libtrace->first_packets.packets));
1807        if (libtrace->first_packets.packets == NULL) {
1808                trace_set_err(libtrace, errno, "trace_pstart "
1809                              "failed to allocate memory.");
1810                goto cleanup_threads;
1811        }
1812
1813        if (libtrace_ocache_init(&libtrace->packet_freelist,
1814                             (void* (*)()) trace_create_packet,
1815                             (void (*)(void *))trace_destroy_packet,
1816                             libtrace->config.thread_cache_size,
1817                             libtrace->config.cache_size * 4,
1818                             libtrace->config.fixed_count) != 0) {
1819                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1820                              "failed to allocate ocache.");
1821                goto cleanup_threads;
1822        }
1823
1824        /* Threads don't start */
1825        libtrace->started = true;
1826        libtrace_change_state(libtrace, STATE_RUNNING, false);
1827
1828        ret = 0;
1829        goto success;
1830cleanup_threads:
1831        if (libtrace->first_packets.packets) {
1832                free(libtrace->first_packets.packets);
1833                libtrace->first_packets.packets = NULL;
1834        }
1835        libtrace_change_state(libtrace, STATE_ERROR, false);
1836        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1837        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1838                pthread_join(libtrace->hasher_thread.tid, NULL);
1839                libtrace_zero_thread(&libtrace->hasher_thread);
1840        }
1841
1842        if (libtrace->perpkt_threads) {
1843                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1844                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1845                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1846                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1847                        } else break;
1848                }
1849                free(libtrace->perpkt_threads);
1850                libtrace->perpkt_threads = NULL;
1851        }
1852
1853        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1854                pthread_join(libtrace->reporter_thread.tid, NULL);
1855                libtrace_zero_thread(&libtrace->reporter_thread);
1856        }
1857
1858        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1859                pthread_join(libtrace->keepalive_thread.tid, NULL);
1860                libtrace_zero_thread(&libtrace->keepalive_thread);
1861        }
1862        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1863        libtrace_change_state(libtrace, STATE_NEW, false);
1864        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1865        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1866cleanup_started:
1867        if (libtrace->pread == trace_pread_packet_wrapper) {
1868                if (libtrace->format->ppause_input)
1869                        libtrace->format->ppause_input(libtrace);
1870        } else {
1871                if (libtrace->format->pause_input)
1872                        libtrace->format->pause_input(libtrace);
1873        }
1874        ret = -1;
1875success:
1876        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1877cleanup_none:
1878        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1879        return ret;
1880}
1881
1882DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1883                fn_cb_starting handler) {
1884        cbset->message_starting = handler;
1885        return 0;
1886}
1887
1888DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1889                fn_cb_dataless handler) {
1890        cbset->message_pausing = handler;
1891        return 0;
1892}
1893
1894DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1895                fn_cb_dataless handler) {
1896        cbset->message_resuming = handler;
1897        return 0;
1898}
1899
1900DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1901                fn_cb_dataless handler) {
1902        cbset->message_stopping = handler;
1903        return 0;
1904}
1905
1906DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1907                fn_cb_packet handler) {
1908        cbset->message_packet = handler;
1909        return 0;
1910}
1911
1912DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1913                fn_cb_first_packet handler) {
1914        cbset->message_first_packet = handler;
1915        return 0;
1916}
1917
1918DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1919                fn_cb_tick handler) {
1920        cbset->message_tick_count = handler;
1921        return 0;
1922}
1923
1924DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1925                fn_cb_tick handler) {
1926        cbset->message_tick_interval = handler;
1927        return 0;
1928}
1929
1930DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1931                fn_cb_result handler) {
1932        cbset->message_result = handler;
1933        return 0;
1934}
1935
1936DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1937                fn_cb_usermessage handler) {
1938        cbset->message_user = handler;
1939        return 0;
1940}
1941
1942/*
1943 * Pauses a trace, this should only be called by the main thread
1944 * 1. Set started = false
1945 * 2. All perpkt threads are paused waiting on a condition var
1946 * 3. Then call ppause on the underlying format if found
1947 * 4. The traces state is paused
1948 *
1949 * Once done you should be able to modify the trace setup and call pstart again
1950 * TODO add support to change the number of threads.
1951 */
1952DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1953{
1954        libtrace_thread_t *t;
1955        int i;
1956        assert(libtrace);
1957
1958        t = get_thread_table(libtrace);
1959        // Check state from within the lock if we are going to change it
1960        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1961
1962        /* If we are already paused, just treat this as a NOOP */
1963        if (libtrace->state == STATE_PAUSED) {
1964                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1965                return 0;
1966        }
1967        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1968                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1969                return -1;
1970        }
1971
1972        libtrace_change_state(libtrace, STATE_PAUSING, false);
1973        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1974
1975        // Special case handle the hasher thread case
1976        if (trace_has_dedicated_hasher(libtrace)) {
1977                if (libtrace->config.debug_state)
1978                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1979                libtrace_message_t message = {0, {.uint64=0}, NULL};
1980                message.code = MESSAGE_DO_PAUSE;
1981                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1982                // Wait for it to pause
1983                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1984                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1985                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1986                }
1987                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1988                if (libtrace->config.debug_state)
1989                        fprintf(stderr, " DONE\n");
1990        }
1991
1992        if (libtrace->config.debug_state)
1993                fprintf(stderr, "Asking perpkt threads to pause ...");
1994        // Stop threads, skip this one if it's a perpkt
1995        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1996                if (&libtrace->perpkt_threads[i] != t) {
1997                        libtrace_message_t message = {0, {.uint64=0}, NULL};
1998                        message.code = MESSAGE_DO_PAUSE;
1999                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2000                        if(trace_has_dedicated_hasher(libtrace)) {
2001                                // The hasher has stopped and other threads have messages waiting therefore
2002                                // If the queues are empty the other threads would have no data
2003                                // So send some message packets to simply ask the threads to check
2004                                // We are the only writer since hasher has paused
2005                                libtrace_packet_t *pkt;
2006                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2007                                pkt->error = READ_MESSAGE;
2008                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2009                        }
2010                } else {
2011                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2012                }
2013        }
2014
2015        if (t) {
2016                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2017                // We rely on the user to *not* return before starting the trace again
2018                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2019        }
2020
2021        // Wait for all threads to pause
2022        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2023        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2024                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2025        }
2026        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2027
2028        if (libtrace->config.debug_state)
2029                fprintf(stderr, " DONE\n");
2030
2031        // Deal with the reporter
2032        if (trace_has_reporter(libtrace)) {
2033                if (libtrace->config.debug_state)
2034                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2035                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2036                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2037                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2038               
2039                } else {
2040                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2041                        message.code = MESSAGE_DO_PAUSE;
2042                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2043                        // Wait for it to pause
2044                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2045                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2046                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2047                        }
2048                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2049                }
2050                if (libtrace->config.debug_state)
2051                        fprintf(stderr, " DONE\n");
2052        }
2053
2054        /* Cache values before we pause */
2055        if (libtrace->stats == NULL)
2056                libtrace->stats = trace_create_statistics();
2057        // Save the statistics against the trace
2058        trace_get_statistics(libtrace, NULL);
2059        if (trace_is_parallel(libtrace)) {
2060                libtrace->started = false;
2061                if (libtrace->format->ppause_input)
2062                        libtrace->format->ppause_input(libtrace);
2063                // TODO What happens if we don't have pause input??
2064        } else {
2065                int err;
2066                err = trace_pause(libtrace);
2067                // We should handle this a bit better
2068                if (err)
2069                        return err;
2070        }
2071
2072        // Only set as paused after the pause has been called on the trace
2073        libtrace_change_state(libtrace, STATE_PAUSED, true);
2074        return 0;
2075}
2076
2077/**
2078 * Stop trace finish prematurely as though it meet an EOF
2079 * This should only be called by the main thread
2080 * 1. Calls ppause
2081 * 2. Sends a message asking for threads to finish
2082 * 3. Releases threads which will pause
2083 */
2084DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2085{
2086        int i, err;
2087        libtrace_message_t message = {0, {.uint64=0}, NULL};
2088        assert(libtrace);
2089
2090        // Ensure all threads have paused and the underlying trace format has
2091        // been closed and all packets associated are cleaned up
2092        // Pause will do any state checks for us
2093        err = trace_ppause(libtrace);
2094        if (err)
2095                return err;
2096
2097        // Now send a message asking the threads to stop
2098        // This will be retrieved before trying to read another packet
2099
2100        message.code = MESSAGE_DO_STOP;
2101        trace_message_perpkts(libtrace, &message);
2102        if (trace_has_dedicated_hasher(libtrace))
2103                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2104
2105        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2106                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2107        }
2108
2109        /* Now release the threads and let them stop - when the threads finish
2110         * the state will be set to finished */
2111        libtrace_change_state(libtrace, STATE_FINISHING, true);
2112        return 0;
2113}
2114
2115DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2116        int ret = -1;
2117        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2118                return -1;
2119        }
2120
2121        // Save the requirements
2122        trace->hasher_type = type;
2123        if (hasher) {
2124                trace->hasher = hasher;
2125                trace->hasher_data = data;
2126        } else {
2127                trace->hasher = NULL;
2128                trace->hasher_data = NULL;
2129        }
2130
2131        // Try push this to hardware - NOTE hardware could do custom if
2132        // there is a more efficient way to apply it, in this case
2133        // it will simply grab the function out of libtrace_t
2134        if (trace_supports_parallel(trace) && trace->format->config_input)
2135                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2136
2137        if (ret == -1) {
2138                /* We have to deal with this ourself */
2139                if (!hasher) {
2140                        switch (type)
2141                        {
2142                                case HASHER_CUSTOM:
2143                                case HASHER_BALANCE:
2144                                        return 0;
2145                                case HASHER_BIDIRECTIONAL:
2146                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2147                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2148                                        toeplitz_init_config(trace->hasher_data, 1);
2149                                        return 0;
2150                                case HASHER_UNIDIRECTIONAL:
2151                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2152                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2153                                        toeplitz_init_config(trace->hasher_data, 0);
2154                                        return 0;
2155                        }
2156                        return -1;
2157                }
2158        } else {
2159                /* If the hasher is hardware we zero out the hasher and hasher
2160                 * data fields - only if we need a hasher do we do this */
2161                trace->hasher = NULL;
2162                trace->hasher_data = NULL;
2163        }
2164
2165        return 0;
2166}
2167
2168// Waits for all threads to finish
2169DLLEXPORT void trace_join(libtrace_t *libtrace) {
2170        int i;
2171
2172        /* Firstly wait for the perpkt threads to finish, since these are
2173         * user controlled */
2174        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2175                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2176                // So we must do our best effort to empty the queue - so
2177                // the producer (or any other threads) don't block.
2178                libtrace_packet_t * packet;
2179                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2180                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2181                        if (packet) // This could be NULL iff the perpkt finishes early
2182                                trace_destroy_packet(packet);
2183        }
2184
2185        /* Now the hasher */
2186        if (trace_has_dedicated_hasher(libtrace)) {
2187                pthread_join(libtrace->hasher_thread.tid, NULL);
2188                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2189        }
2190
2191        // Now that everything is finished nothing can be touching our
2192        // buffers so clean them up
2193        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2194                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2195                // if they lost timeslice before-during a write
2196                libtrace_packet_t * packet;
2197                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2198                        trace_destroy_packet(packet);
2199                if (trace_has_dedicated_hasher(libtrace)) {
2200                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2201                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2202                }
2203                // Cannot destroy vector yet, this happens with trace_destroy
2204        }
2205
2206        if (trace_has_reporter(libtrace)) {
2207                pthread_join(libtrace->reporter_thread.tid, NULL);
2208                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2209        }
2210
2211        // Wait for the tick (keepalive) thread if it has been started
2212        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2213                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2214                msg.code = MESSAGE_DO_STOP;
2215                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2216                pthread_join(libtrace->keepalive_thread.tid, NULL);
2217        }
2218
2219        libtrace_change_state(libtrace, STATE_JOINED, true);
2220        print_memory_stats();
2221}
2222
2223DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2224                                                libtrace_thread_t *t)
2225{
2226        int ret;
2227        if (t == NULL)
2228                t = get_thread_descriptor(libtrace);
2229        if (t == NULL)
2230                return -1;
2231        ret = libtrace_message_queue_count(&t->messages);
2232        return ret < 0 ? 0 : ret;
2233}
2234
2235DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2236                                          libtrace_thread_t *t,
2237                                          libtrace_message_t * message)
2238{
2239        int ret;
2240        if (t == NULL)
2241                t = get_thread_descriptor(libtrace);
2242        if (t == NULL)
2243                return -1;
2244        ret = libtrace_message_queue_get(&t->messages, message);
2245        return ret < 0 ? 0 : ret;
2246}
2247
2248DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2249                                              libtrace_thread_t *t,
2250                                              libtrace_message_t * message)
2251{
2252        if (t == NULL)
2253                t = get_thread_descriptor(libtrace);
2254        if (t == NULL)
2255                return -1;
2256        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2257                return 0;
2258        else
2259                return -1;
2260}
2261
2262DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2263{
2264        int ret;
2265        if (!message->sender)
2266                message->sender = get_thread_descriptor(libtrace);
2267
2268        ret = libtrace_message_queue_put(&t->messages, message);
2269        return ret < 0 ? 0 : ret;
2270}
2271
2272DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2273{
2274        if (!trace_has_reporter(libtrace) ||
2275            !(libtrace->reporter_thread.state == THREAD_RUNNING
2276              || libtrace->reporter_thread.state == THREAD_PAUSED))
2277                return -1;
2278
2279        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2280}
2281
2282DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2283{
2284        libtrace_message_t message = {0, {.uint64=0}, NULL};
2285        message.code = MESSAGE_POST_REPORTER;
2286        return trace_message_reporter(libtrace, (void *) &message);
2287}
2288
2289DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2290{
2291        int i;
2292        int missed = 0;
2293        if (message->sender == NULL)
2294                message->sender = get_thread_descriptor(libtrace);
2295        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2296                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2297                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2298                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2299                } else {
2300                        missed += 1;
2301                }
2302        }
2303        return -missed;
2304}
2305
2306/**
2307 * Publishes a result to the reduce queue
2308 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2309 */
2310DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2311        libtrace_result_t res;
2312        res.type = type;
2313        res.key = key;
2314        res.value = value;
2315        assert(libtrace->combiner.publish);
2316        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2317        return;
2318}
2319
2320DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2321        if (combiner) {
2322                trace->combiner = *combiner;
2323                trace->combiner.configuration = config;
2324        } else {
2325                // No combiner, so don't try use it
2326                memset(&trace->combiner, 0, sizeof(trace->combiner));
2327        }
2328}
2329
2330DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2331        return packet->order;
2332}
2333
2334DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2335        return packet->hash;
2336}
2337
2338DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2339        packet->order = order;
2340}
2341
2342DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2343        packet->hash = hash;
2344}
2345
2346DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2347        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2348}
2349
2350/**
2351 * @return True if the trace is not running such that it can be configured
2352 */
2353static inline bool trace_is_configurable(libtrace_t *trace) {
2354        return trace->state == STATE_NEW ||
2355                        trace->state == STATE_PAUSED;
2356}
2357
2358DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2359        if (!trace_is_configurable(trace)) return -1;
2360
2361        /* TODO consider allowing an offset from the total number of cores i.e.
2362         * -1 reserve 1 core */
2363        if (nb >= 0) {
2364                trace->config.perpkt_threads = nb;
2365                return 0;
2366        } else {
2367                return -1;
2368        }
2369}
2370
2371DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2372        if (!trace_is_configurable(trace)) return -1;
2373
2374        trace->config.tick_interval = millisec;
2375        return 0;
2376}
2377
2378DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2379        if (!trace_is_configurable(trace)) return -1;
2380
2381        trace->config.tick_count = count;
2382        return 0;
2383}
2384
2385DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2386        if (!trace_is_configurable(trace)) return -1;
2387
2388        trace->tracetime = tracetime;
2389        return 0;
2390}
2391
2392DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2393        if (!trace_is_configurable(trace)) return -1;
2394
2395        trace->config.cache_size = size;
2396        return 0;
2397}
2398
2399DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2400        if (!trace_is_configurable(trace)) return -1;
2401
2402        trace->config.thread_cache_size = size;
2403        return 0;
2404}
2405
2406DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2407        if (!trace_is_configurable(trace)) return -1;
2408
2409        trace->config.fixed_count = fixed;
2410        return 0;
2411}
2412
2413DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2414        if (!trace_is_configurable(trace)) return -1;
2415
2416        trace->config.burst_size = size;
2417        return 0;
2418}
2419
2420DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2421        if (!trace_is_configurable(trace)) return -1;
2422
2423        trace->config.hasher_queue_size = size;
2424        return 0;
2425}
2426
2427DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2428        if (!trace_is_configurable(trace)) return -1;
2429
2430        trace->config.hasher_polling = polling;
2431        return 0;
2432}
2433
2434DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2435        if (!trace_is_configurable(trace)) return -1;
2436
2437        trace->config.reporter_polling = polling;
2438        return 0;
2439}
2440
2441DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2442        if (!trace_is_configurable(trace)) return -1;
2443
2444        trace->config.reporter_thold = thold;
2445        return 0;
2446}
2447
2448DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2449        if (!trace_is_configurable(trace)) return -1;
2450
2451        trace->config.debug_state = debug_state;
2452        return 0;
2453}
2454
2455static bool config_bool_parse(char *value, size_t nvalue) {
2456        if (strncmp(value, "true", nvalue) == 0)
2457                return true;
2458        else if (strncmp(value, "false", nvalue) == 0)
2459                return false;
2460        else
2461                return strtoll(value, NULL, 10) != 0;
2462}
2463
2464/* Note update documentation on trace_set_configuration */
2465static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2466        assert(key);
2467        assert(value);
2468        assert(uc);
2469        if (strncmp(key, "cache_size", nkey) == 0
2470            || strncmp(key, "cs", nkey) == 0) {
2471                uc->cache_size = strtoll(value, NULL, 10);
2472        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2473                   || strncmp(key, "tcs", nkey) == 0) {
2474                uc->thread_cache_size = strtoll(value, NULL, 10);
2475        } else if (strncmp(key, "fixed_count", nkey) == 0
2476                   || strncmp(key, "fc", nkey) == 0) {
2477                uc->fixed_count = config_bool_parse(value, nvalue);
2478        } else if (strncmp(key, "burst_size", nkey) == 0
2479                   || strncmp(key, "bs", nkey) == 0) {
2480                uc->burst_size = strtoll(value, NULL, 10);
2481        } else if (strncmp(key, "tick_interval", nkey) == 0
2482                   || strncmp(key, "ti", nkey) == 0) {
2483                uc->tick_interval = strtoll(value, NULL, 10);
2484        } else if (strncmp(key, "tick_count", nkey) == 0
2485                   || strncmp(key, "tc", nkey) == 0) {
2486                uc->tick_count = strtoll(value, NULL, 10);
2487        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2488                   || strncmp(key, "pt", nkey) == 0) {
2489                uc->perpkt_threads = strtoll(value, NULL, 10);
2490        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2491                   || strncmp(key, "hqs", nkey) == 0) {
2492                uc->hasher_queue_size = strtoll(value, NULL, 10);
2493        } else if (strncmp(key, "hasher_polling", nkey) == 0
2494                   || strncmp(key, "hp", nkey) == 0) {
2495                uc->hasher_polling = config_bool_parse(value, nvalue);
2496        } else if (strncmp(key, "reporter_polling", nkey) == 0
2497                   || strncmp(key, "rp", nkey) == 0) {
2498                uc->reporter_polling = config_bool_parse(value, nvalue);
2499        } else if (strncmp(key, "reporter_thold", nkey) == 0
2500                   || strncmp(key, "rt", nkey) == 0) {
2501                uc->reporter_thold = strtoll(value, NULL, 10);
2502        } else if (strncmp(key, "debug_state", nkey) == 0
2503                   || strncmp(key, "ds", nkey) == 0) {
2504                uc->debug_state = config_bool_parse(value, nvalue);
2505        } else {
2506                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2507        }
2508}
2509
2510DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2511        char *pch;
2512        char key[100];
2513        char value[100];
2514        char *dup;
2515        assert(str);
2516        assert(trace);
2517
2518        if (!trace_is_configurable(trace)) return -1;
2519
2520        dup = strdup(str);
2521        pch = strtok (dup," ,.-");
2522        while (pch != NULL)
2523        {
2524                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2525                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2526                } else {
2527                        fprintf(stderr, "Error parsing option %s\n", pch);
2528                }
2529                pch = strtok (NULL," ,.-");
2530        }
2531        free(dup);
2532
2533        return 0;
2534}
2535
2536DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2537        char line[1024];
2538        if (!trace_is_configurable(trace)) return -1;
2539
2540        while (fgets(line, sizeof(line), file) != NULL)
2541        {
2542                trace_set_configuration(trace, line);
2543        }
2544
2545        if(ferror(file))
2546                return -1;
2547        else
2548                return 0;
2549}
2550
2551DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2552        assert(packet);
2553        /* Always release any resources this might be holding */
2554        trace_fin_packet(packet);
2555        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2556}
2557
2558DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2559        if (libtrace->format)
2560                return &libtrace->format->info;
2561        else
2562                return NULL;
2563}
Note: See TracBrowser for help on using the repository browser.