source: lib/trace_parallel.c @ 8a237c7

cachetimestampsdeveloprc-4.0.4ringdecrementfixringperformance
Last change on this file since 8a237c7 was 8a237c7, checked in by Shane Alcock <salcock@…>, 3 years ago

Fix uninitialised memory complaints when sending first packet msg.

Valgrind doesn't like it when we write unpacked structures without
explicitly setting all bytes (including the padding ones) to zero
beforehand.

  • Property mode set to 100644
File size: 85.0 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
290        return thread->perpkt_num;
291}
292
293/**
294 * Changes the overall traces state and signals the condition.
295 *
296 * @param trace A pointer to the trace
297 * @param new_state The new state of the trace
298 * @param need_lock Set to true if libtrace_lock is not held, otherwise
299 *        false in the case the lock is currently held by this thread.
300 */
301static inline void libtrace_change_state(libtrace_t *trace,
302        const enum trace_state new_state, const bool need_lock)
303{
304        UNUSED enum trace_state prev_state;
305        if (need_lock)
306                pthread_mutex_lock(&trace->libtrace_lock);
307        prev_state = trace->state;
308        trace->state = new_state;
309
310        if (trace->config.debug_state)
311                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
312                        trace->uridata, get_trace_state_name(prev_state),
313                        get_trace_state_name(trace->state));
314
315        pthread_cond_broadcast(&trace->perpkt_cond);
316        if (need_lock)
317                pthread_mutex_unlock(&trace->libtrace_lock);
318}
319
320/**
321 * Changes a thread's state and broadcasts the condition variable. This
322 * should always be done when the lock is held.
323 *
324 * Additionally for perpkt threads the state counts are updated.
325 *
326 * @param trace A pointer to the trace
327 * @param t A pointer to the thread to modify
328 * @param new_state The new state of the thread
329 * @param need_lock Set to true if libtrace_lock is not held, otherwise
330 *        false in the case the lock is currently held by this thread.
331 */
332static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
333        const enum thread_states new_state, const bool need_lock)
334{
335        enum thread_states prev_state;
336        if (need_lock)
337                pthread_mutex_lock(&trace->libtrace_lock);
338        prev_state = t->state;
339        t->state = new_state;
340        if (t->type == THREAD_PERPKT) {
341                --trace->perpkt_thread_states[prev_state];
342                ++trace->perpkt_thread_states[new_state];
343        }
344
345        if (trace->config.debug_state)
346                fprintf(stderr, "Thread %d state changed from %d to %d\n",
347                        (int) t->tid, prev_state, t->state);
348
349        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
350                /* Make sure we save our final stats in case someone wants
351                 * them at the end of their program.
352                 */
353
354                trace_get_statistics(trace, NULL);
355                libtrace_change_state(trace, STATE_FINISHED, false);
356        }
357
358        pthread_cond_broadcast(&trace->perpkt_cond);
359        if (need_lock)
360                pthread_mutex_unlock(&trace->libtrace_lock);
361}
362
363/**
364 * This is valid once a trace is initialised
365 *
366 * @return True if the format supports parallel threads.
367 */
368static inline bool trace_supports_parallel(libtrace_t *trace)
369{
370        assert(trace);
371        assert(trace->format);
372        if (trace->format->pstart_input)
373                return true;
374        else
375                return false;
376}
377
378void libtrace_zero_thread(libtrace_thread_t * t) {
379        t->accepted_packets = 0;
380        t->filtered_packets = 0;
381        t->recorded_first = false;
382        t->tracetime_offset_usec = 0;
383        t->user_data = 0;
384        t->format_data = 0;
385        libtrace_zero_ringbuffer(&t->rbuffer);
386        t->trace = NULL;
387        t->ret = NULL;
388        t->type = THREAD_EMPTY;
389        t->perpkt_num = -1;
390}
391
392// Ints are aligned int is atomic so safe to read and write at same time
393// However write must be locked, read doesn't (We never try read before written to table)
394libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
395        int i = 0;
396        pthread_t tid = pthread_self();
397
398        for (;i<libtrace->perpkt_thread_count ;++i) {
399                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
400                        return &libtrace->perpkt_threads[i];
401        }
402        return NULL;
403}
404
405static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
406        libtrace_thread_t *ret;
407        if (!(ret = get_thread_table(libtrace))) {
408                pthread_t tid = pthread_self();
409                // Check if we are reporter or something else
410                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
411                                pthread_equal(tid, libtrace->reporter_thread.tid))
412                        ret = &libtrace->reporter_thread;
413                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
414                         pthread_equal(tid, libtrace->hasher_thread.tid))
415                        ret = &libtrace->hasher_thread;
416                else
417                        ret = NULL;
418        }
419        return ret;
420}
421
422DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
423        // Duplicate the packet in standard malloc'd memory and free the
424        // original, This is a 1:1 exchange so the ocache count remains unchanged.
425        if (pkt->buf_control != TRACE_CTRL_PACKET) {
426                libtrace_packet_t *dup;
427                dup = trace_copy_packet(pkt);
428                /* Release the external buffer */
429                trace_fin_packet(pkt);
430                /* Copy the duplicated packet over the existing */
431                memcpy(pkt, dup, sizeof(libtrace_packet_t));
432                /* Free the packet structure */
433                free(dup);
434        }
435}
436
437/**
438 * Makes a libtrace_result_t safe, used when pausing a trace.
439 * This will call libtrace_make_packet_safe if the result is
440 * a packet.
441 */
442DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
443        if (res->type == RESULT_PACKET) {
444                libtrace_make_packet_safe(res->value.pkt);
445        }
446}
447
448/**
449 * Holds threads in a paused state, until released by broadcasting
450 * the condition mutex.
451 */
452static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
453        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
454        thread_change_state(trace, t, THREAD_PAUSED, false);
455        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
456                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
457        }
458        thread_change_state(trace, t, THREAD_RUNNING, false);
459        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
460}
461
462/**
463 * Sends a packet to the user, expects either a valid packet or a TICK packet.
464 *
465 * @param trace The trace
466 * @param t The current thread
467 * @param packet A pointer to the packet storage, which may be set to null upon
468 *               return, or a packet to be finished.
469 * @param tracetime If true packets are delayed to match with tracetime
470 * @return 0 is successful, otherwise if playing back in tracetime
471 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
472 *
473 * @note READ_MESSAGE will only be returned if tracetime is true.
474 */
475static inline int dispatch_packet(libtrace_t *trace,
476                                  libtrace_thread_t *t,
477                                  libtrace_packet_t **packet,
478                                  bool tracetime) {
479
480        if ((*packet)->error > 0) {
481                if (tracetime) {
482                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
483                                return READ_MESSAGE;
484                }
485                if (!IS_LIBTRACE_META_PACKET((*packet))) {
486                        t->accepted_packets++;
487                }
488                if (trace->perpkt_cbs->message_packet)
489                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
490                trace_fin_packet(*packet);
491        } else {
492                assert((*packet)->error == READ_TICK);
493                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
494                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
495        }
496        return 0;
497}
498
499/**
500 * Sends a batch of packets to the user, expects either a valid packet or a
501 * TICK packet.
502 *
503 * @param trace The trace
504 * @param t The current thread
505 * @param packets [in,out] An array of packets, these may be null upon return
506 * @param nb_packets The total number of packets in the list
507 * @param empty [in,out] A pointer to an integer storing the first empty slot,
508 * upon return this is updated
509 * @param offset [in,out] The offset into the array, upon return this is updated
510 * @param tracetime If true packets are delayed to match with tracetime
511 * @return 0 is successful, otherwise if playing back in tracetime
512 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
513 *
514 * @note READ_MESSAGE will only be returned if tracetime is true.
515 */
516static inline int dispatch_packets(libtrace_t *trace,
517                                  libtrace_thread_t *t,
518                                  libtrace_packet_t *packets[],
519                                  int nb_packets, int *empty, int *offset,
520                                  bool tracetime) {
521        for (;*offset < nb_packets; ++*offset) {
522                int ret;
523                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
524                if (ret == 0) {
525                        /* Move full slots to front as we go */
526                        if (packets[*offset]) {
527                                if (*empty != *offset) {
528                                        packets[*empty] = packets[*offset];
529                                        packets[*offset] = NULL;
530                                }
531                                ++*empty;
532                        }
533                } else {
534                        /* Break early */
535                        assert(ret == READ_MESSAGE);
536                        return READ_MESSAGE;
537                }
538        }
539
540        return 0;
541}
542
543/**
544 * Pauses a per packet thread, messages will not be processed when the thread
545 * is paused.
546 *
547 * This process involves reading packets if a hasher thread is used. As such
548 * this function can fail to pause due to errors when reading in which case
549 * the thread should be stopped instead.
550 *
551 *
552 * @brief trace_perpkt_thread_pause
553 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
554 */
555static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
556                                     libtrace_packet_t *packets[],
557                                     int nb_packets, int *empty, int *offset) {
558        libtrace_packet_t * packet = NULL;
559
560        /* Let the user thread know we are going to pause */
561        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
562
563        /* Send through any remaining packets (or messages) without delay */
564
565        /* First send those packets already read, as fast as possible
566         * This should never fail or check for messages etc. */
567        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
568                                    offset, false), == 0);
569
570        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
571        /* If a hasher thread is running, empty input queues so we don't lose data */
572        if (trace_has_dedicated_hasher(trace)) {
573                // The hasher has stopped by this point, so the queue shouldn't be filling
574                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
575                        int ret = trace->pread(trace, t, &packet, 1);
576                        if (ret == 1) {
577                                if (packet->error > 0) {
578                                        store_first_packet(trace, packet, t);
579                                }
580                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
581                                if (packet == NULL)
582                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
583                        } else if (ret != READ_MESSAGE) {
584                                /* Ignore messages we pick these up next loop */
585                                assert (ret == READ_EOF || ret == READ_ERROR);
586                                /* Verify no packets are remaining */
587                                /* TODO refactor this sanity check out!! */
588                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
589                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
590                                        // No packets after this should have any data in them
591                                        assert(packet->error <= 0);
592                                }
593                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
594                                return -1;
595                        }
596                }
597        }
598        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
599
600        /* Now we do the actual pause, this returns when we resumed */
601        trace_thread_pause(trace, t);
602        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
603        return 1;
604}
605
606/**
607 * The is the entry point for our packet processing threads.
608 */
609static void* perpkt_threads_entry(void *data) {
610        libtrace_t *trace = (libtrace_t *)data;
611        libtrace_thread_t *t;
612        libtrace_message_t message = {0, {.uint64=0}, NULL};
613        libtrace_packet_t *packets[trace->config.burst_size];
614        size_t i;
615        //int ret;
616        /* The current reading position into the packets */
617        int offset = 0;
618        /* The number of packets last read */
619        int nb_packets = 0;
620        /* The offset to the first NULL packet upto offset */
621        int empty = 0;
622        int j;
623
624        /* Wait until trace_pstart has been completed */
625        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
626        t = get_thread_table(trace);
627        assert(t);
628        if (trace->state == STATE_ERROR) {
629                thread_change_state(trace, t, THREAD_FINISHED, false);
630                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
631                pthread_exit(NULL);
632        }
633        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
634
635        if (trace->format->pregister_thread) {
636                if (trace->format->pregister_thread(trace, t, 
637                                trace_is_parallel(trace)) < 0) {
638                        thread_change_state(trace, t, THREAD_FINISHED, false);
639                        pthread_exit(NULL);
640                }
641        }
642
643        /* Fill our buffer with empty packets */
644        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
645        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
646                              trace->config.burst_size,
647                              trace->config.burst_size);
648
649        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
650
651        /* Let the per_packet function know we have started */
652        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
653        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
654
655        for (;;) {
656
657                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
658                        int ret;
659                        switch (message.code) {
660                                case MESSAGE_DO_PAUSE: // This is internal
661                                        ret = trace_perpkt_thread_pause(trace, t,
662                                              packets, nb_packets, &empty, &offset);
663                                        if (ret == READ_EOF) {
664                                                goto eof;
665                                        } else if (ret == READ_ERROR) {
666                                                goto error;
667                                        }
668                                        assert(ret == 1);
669                                        continue;
670                                case MESSAGE_DO_STOP: // This is internal
671                                        goto eof;
672                        }
673                        send_message(trace, t, message.code, message.data, 
674                                        message.sender);
675                        /* Continue and the empty messages out before packets */
676                        continue;
677                }
678
679
680                /* Do we need to read a new set of packets MOST LIKELY we do */
681                if (offset == nb_packets) {
682                        /* Refill the packet buffer */
683                        if (empty != nb_packets) {
684                                // Refill the empty packets
685                                libtrace_ocache_alloc(&trace->packet_freelist,
686                                                      (void **) &packets[empty],
687                                                      nb_packets - empty,
688                                                      nb_packets - empty);
689                        }
690                        if (!trace->pread) {
691                                assert(packets[0]);
692                                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
693                                nb_packets = trace_read_packet(trace, packets[0]);
694                                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
695                                packets[0]->error = nb_packets;
696                                if (nb_packets > 0)
697                                        nb_packets = 1;
698                        } else {
699                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
700                        }
701                        offset = 0;
702                        empty = 0;
703                }
704
705                /* Handle error/message cases */
706                if (nb_packets > 0) {
707                        /* Store the first non-meta packet */
708                        for (j = 0; j < nb_packets; j++) {
709                                if (t->recorded_first)
710                                        break;
711                                if (packets[j]->error > 0) {
712                                        store_first_packet(trace, packets[j], t);
713                                }
714                        }
715                        dispatch_packets(trace, t, packets, nb_packets, &empty,
716                                         &offset, trace->tracetime);
717                } else {
718                        switch (nb_packets) {
719                        case READ_EOF:
720                                goto eof;
721                        case READ_ERROR:
722                                goto error;
723                        case READ_MESSAGE:
724                                nb_packets = 0;
725                                continue;
726                        default:
727                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
728                                goto error;
729                        }
730                }
731
732        }
733
734error:
735        message.code = MESSAGE_DO_STOP;
736        message.sender = t;
737        message.data.uint64 = 0;
738        trace_message_perpkts(trace, &message);
739eof:
740        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
741
742        // Let the per_packet function know we have stopped
743        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
744        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
745
746        // Free any remaining packets
747        for (i = 0; i < trace->config.burst_size; i++) {
748                if (packets[i]) {
749                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
750                        packets[i] = NULL;
751                }
752        }
753
754        thread_change_state(trace, t, THREAD_FINISHED, true);
755
756        /* Make sure the reporter sees we have finished */
757        if (trace_has_reporter(trace))
758                trace_post_reporter(trace);
759
760        // Release all ocache memory before unregistering with the format
761        // because this might(it does in DPDK) unlink the formats mempool
762        // causing destroy/finish packet to fail.
763        libtrace_ocache_unregister_thread(&trace->packet_freelist);
764        if (trace->format->punregister_thread) {
765                trace->format->punregister_thread(trace, t);
766        }
767        print_memory_stats();
768
769        pthread_exit(NULL);
770}
771
772/**
773 * The start point for our single threaded hasher thread, this will read
774 * and hash a packet from a data source and queue it against the correct
775 * core to process it.
776 */
777static void* hasher_entry(void *data) {
778        libtrace_t *trace = (libtrace_t *)data;
779        libtrace_thread_t * t;
780        int i;
781        libtrace_packet_t * packet;
782        libtrace_message_t message = {0, {.uint64=0}, NULL};
783        int pkt_skipped = 0;
784
785        assert(trace_has_dedicated_hasher(trace));
786        /* Wait until all threads are started and objects are initialised (ring buffers) */
787        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
788        t = &trace->hasher_thread;
789        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
790        if (trace->state == STATE_ERROR) {
791                thread_change_state(trace, t, THREAD_FINISHED, false);
792                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
793                pthread_exit(NULL);
794        }
795        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
796
797        /* We are reading but it is not the parallel API */
798        if (trace->format->pregister_thread) {
799                trace->format->pregister_thread(trace, t, true);
800        }
801
802        /* Read all packets in then hash and queue against the correct thread */
803        while (1) {
804                int thread;
805                if (!pkt_skipped)
806                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
807                assert(packet);
808
809                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
810                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
811                        switch(message.code) {
812                                case MESSAGE_DO_PAUSE:
813                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
814                                        thread_change_state(trace, t, THREAD_PAUSED, false);
815                                        pthread_cond_broadcast(&trace->perpkt_cond);
816                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
817                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
818                                        }
819                                        thread_change_state(trace, t, THREAD_RUNNING, false);
820                                        pthread_cond_broadcast(&trace->perpkt_cond);
821                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
822                                        break;
823                                case MESSAGE_DO_STOP:
824                                        /* Either FINISHED or FINISHING */
825                                        assert(trace->started == false);
826                                        /* Mark the current packet as EOF */
827                                        packet->error = 0;
828                                        goto hasher_eof;
829                                default:
830                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
831                        }
832                        pkt_skipped = 1;
833                        continue;
834                }
835
836                if ((packet->error = trace_read_packet(trace, packet)) <1) {
837                        if (packet->error == READ_MESSAGE) {
838                                pkt_skipped = 1;
839                                continue;
840                        } else {
841                                break; /* We are EOF or error'd either way we stop  */
842                        }
843                }
844
845                /* We are guaranteed to have a hash function i.e. != NULL */
846                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
847                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
848                /* Blocking write to the correct queue - I'm the only writer */
849                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
850                        uint64_t order = trace_packet_get_order(packet);
851                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
852                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
853                                // Write ticks to everyone else
854                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
855                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
856                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
857                                for (i = 0; i < trace->perpkt_thread_count; i++) {
858                                        pkts[i]->error = READ_TICK;
859                                        trace_packet_set_order(pkts[i], order);
860                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
861                                }
862                        }
863                        pkt_skipped = 0;
864                } else {
865                        assert(!"Dropping a packet!!");
866                        pkt_skipped = 1; // Reuse that packet no one read it
867                }
868        }
869hasher_eof:
870        /* Broadcast our last failed read to all threads */
871        for (i = 0; i < trace->perpkt_thread_count; i++) {
872                libtrace_packet_t * bcast;
873                if (i == trace->perpkt_thread_count - 1) {
874                        bcast = packet;
875                } else {
876                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
877                        bcast->error = packet->error;
878                }
879                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
880                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
881                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
882                } else {
883                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
884                }
885                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
886        }
887
888        // We don't need to free the packet
889        thread_change_state(trace, t, THREAD_FINISHED, true);
890
891        libtrace_ocache_unregister_thread(&trace->packet_freelist);
892        if (trace->format->punregister_thread) {
893                trace->format->punregister_thread(trace, t);
894        }
895        print_memory_stats();
896
897        // TODO remove from TTABLE t sometime
898        pthread_exit(NULL);
899}
900
901/* Our simplest case when a thread becomes ready it can obtain an exclusive
902 * lock to read packets from the underlying trace.
903 */
904static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
905                                                    libtrace_thread_t *t,
906                                                    libtrace_packet_t *packets[],
907                                                    size_t nb_packets) {
908        size_t i = 0;
909        //bool tick_hit = false;
910
911        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
912        /* Read nb_packets */
913        for (i = 0; i < nb_packets; ++i) {
914                if (libtrace_message_queue_count(&t->messages) > 0) {
915                        if ( i==0 ) {
916                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
917                                return READ_MESSAGE;
918                        } else {
919                                break;
920                        }
921                }
922                packets[i]->error = trace_read_packet(libtrace, packets[i]);
923
924                if (packets[i]->error <= 0) {
925                        /* We'll catch this next time if we have already got packets */
926                        if ( i==0 ) {
927                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
928                                return packets[i]->error;
929                        } else {
930                                break;
931                        }
932                }
933                /*
934                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
935                        tick_hit = true;
936                }*/
937
938                // Doing this inside the lock ensures the first packet is
939                // always recorded first
940                if (!t->recorded_first && packets[0]->error > 0) {
941                        store_first_packet(libtrace, packets[0], t);
942                }
943        }
944        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
945        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
946        if (tick_hit) {
947                libtrace_message_t tick;
948                tick.additional.uint64 = trace_packet_get_order(packets[i]);
949                tick.code = MESSAGE_TICK;
950                trace_send_message_to_perpkts(libtrace, &tick);
951        } */
952        return i;
953}
954
955/**
956 * For the case that we have a dedicated hasher thread
957 * 1. We read a packet from our buffer
958 * 2. Move that into the packet provided (packet)
959 */
960inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
961                                                   libtrace_thread_t *t,
962                                                   libtrace_packet_t *packets[],
963                                                   size_t nb_packets) {
964        size_t i;
965
966        /* We store the last error message here */
967        if (t->format_data) {
968                return ((libtrace_packet_t *)t->format_data)->error;
969        }
970
971        // Always grab at least one
972        if (packets[0]) // Recycle the old get the new
973                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
974        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
975
976        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
977                return packets[0]->error;
978        }
979
980        for (i = 1; i < nb_packets; i++) {
981                if (packets[i]) // Recycle the old get the new
982                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
983                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
984                        packets[i] = NULL;
985                        break;
986                }
987
988                /* We will return an error or EOF the next time around */
989                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
990                        /* The message case will be checked automatically -
991                           However other cases like EOF and error will only be
992                           sent once*/
993                        if (packets[i]->error != READ_MESSAGE) {
994                                assert(t->format_data == NULL);
995                                t->format_data = packets[i];
996                        }
997                        break;
998                }
999        }
1000
1001        return i;
1002}
1003
1004/**
1005 * For the first packet of each queue we keep a copy and note the system
1006 * time it was received at.
1007 *
1008 * This is used for finding the first packet when playing back a trace
1009 * in trace time. And can be used by real time applications to print
1010 * results out every XXX seconds.
1011 */
1012void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1013{
1014
1015        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1016        struct timeval tv;
1017        libtrace_packet_t * dup;
1018
1019        if (t->recorded_first) {
1020                return;
1021        }
1022
1023        if (IS_LIBTRACE_META_PACKET(packet)) {
1024                return;
1025        }
1026
1027        /* We mark system time against a copy of the packet */
1028        gettimeofday(&tv, NULL);
1029        dup = trace_copy_packet(packet);
1030
1031        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1032        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1033        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1034        libtrace->first_packets.count++;
1035
1036        /* Now update the first */
1037        if (libtrace->first_packets.count == 1) {
1038                /* We the first entry hence also the first known packet */
1039                libtrace->first_packets.first = t->perpkt_num;
1040        } else {
1041                /* Check if we are newer than the previous 'first' packet */
1042                size_t first = libtrace->first_packets.first;
1043                struct timeval cur_ts = trace_get_timeval(dup);
1044                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1045                if (timercmp(&cur_ts, &first_ts, <))
1046                        libtrace->first_packets.first = t->perpkt_num;
1047        }
1048        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1049
1050        memset(&mesg, 0, sizeof(libtrace_message_t));
1051        mesg.code = MESSAGE_FIRST_PACKET;
1052        trace_message_reporter(libtrace, &mesg);
1053        trace_message_perpkts(libtrace, &mesg);
1054        t->recorded_first = true;
1055}
1056
1057DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1058                                     libtrace_thread_t *t,
1059                                     const libtrace_packet_t **packet,
1060                                     const struct timeval **tv)
1061{
1062        void * tmp;
1063        int ret = 0;
1064
1065        if (t) {
1066                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1067                        return -1;
1068        }
1069
1070        /* Throw away these which we don't use */
1071        if (!packet)
1072                packet = (const libtrace_packet_t **) &tmp;
1073        if (!tv)
1074                tv = (const struct timeval **) &tmp;
1075
1076        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1077        if (t) {
1078                /* Get the requested thread */
1079                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1080                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1081        } else if (libtrace->first_packets.count) {
1082                /* Get the first packet across all threads */
1083                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1084                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1085                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1086                        ret = 1;
1087                } else {
1088                        struct timeval curr_tv;
1089                        // If a second has passed since the first entry we will assume this is the very first packet
1090                        gettimeofday(&curr_tv, NULL);
1091                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1092                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1093                                        ret = 1;
1094                                }
1095                        }
1096                }
1097        } else {
1098                *packet = NULL;
1099                *tv = NULL;
1100        }
1101        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1102        return ret;
1103}
1104
1105
1106DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1107{
1108        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1109}
1110
1111inline static struct timeval usec_to_tv(uint64_t usec)
1112{
1113        struct timeval tv;
1114        tv.tv_sec = usec / 1000000;
1115        tv.tv_usec = usec % 1000000;
1116        return tv;
1117}
1118
1119/** Similar to delay_tracetime but send messages to all threads periodically */
1120static void* reporter_entry(void *data) {
1121        libtrace_message_t message = {0, {.uint64=0}, NULL};
1122        libtrace_t *trace = (libtrace_t *)data;
1123        libtrace_thread_t *t = &trace->reporter_thread;
1124
1125        /* Wait until all threads are started */
1126        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1127        if (trace->state == STATE_ERROR) {
1128                thread_change_state(trace, t, THREAD_FINISHED, false);
1129                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1130                pthread_exit(NULL);
1131        }
1132        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1133
1134        if (trace->format->pregister_thread) {
1135                trace->format->pregister_thread(trace, t, false);
1136        }
1137
1138        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1139        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1140
1141        while (!trace_has_finished(trace)) {
1142                if (trace->config.reporter_polling) {
1143                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1144                                message.code = MESSAGE_POST_REPORTER;
1145                } else {
1146                        libtrace_message_queue_get(&t->messages, &message);
1147                }
1148                switch (message.code) {
1149                        // Check for results
1150                        case MESSAGE_POST_REPORTER:
1151                                trace->combiner.read(trace, &trace->combiner);
1152                                break;
1153                        case MESSAGE_DO_PAUSE:
1154                                assert(trace->combiner.pause);
1155                                trace->combiner.pause(trace, &trace->combiner);
1156                                send_message(trace, t, MESSAGE_PAUSING,
1157                                                (libtrace_generic_t) {0}, t);
1158                                trace_thread_pause(trace, t);
1159                                send_message(trace, t, MESSAGE_RESUMING,
1160                                                (libtrace_generic_t) {0}, t);
1161                                break;
1162                default:
1163                        send_message(trace, t, message.code, message.data,
1164                                        message.sender);
1165                }
1166        }
1167
1168        // Flush out whats left now all our threads have finished
1169        trace->combiner.read_final(trace, &trace->combiner);
1170
1171        // GOODBYE
1172        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1173        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1174
1175        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1176        print_memory_stats();
1177        return NULL;
1178}
1179
1180/** Similar to delay_tracetime but send messages to all threads periodically */
1181static void* keepalive_entry(void *data) {
1182        struct timeval prev, next;
1183        libtrace_message_t message = {0, {.uint64=0}, NULL};
1184        libtrace_t *trace = (libtrace_t *)data;
1185        uint64_t next_release;
1186        libtrace_thread_t *t = &trace->keepalive_thread;
1187
1188        /* Wait until all threads are started */
1189        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1190        if (trace->state == STATE_ERROR) {
1191                thread_change_state(trace, t, THREAD_FINISHED, false);
1192                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1193                pthread_exit(NULL);
1194        }
1195        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1196
1197        gettimeofday(&prev, NULL);
1198        memset(&message, 0, sizeof(libtrace_message_t));
1199        message.code = MESSAGE_TICK_INTERVAL;
1200
1201        while (trace->state != STATE_FINISHED) {
1202                fd_set rfds;
1203                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1204                gettimeofday(&next, NULL);
1205                if (next_release > tv_to_usec(&next)) {
1206                        next = usec_to_tv(next_release - tv_to_usec(&next));
1207                        // Wait for timeout or a message
1208                        FD_ZERO(&rfds);
1209                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1210                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1211                                libtrace_message_t msg;
1212                                libtrace_message_queue_get(&t->messages, &msg);
1213                                assert(msg.code == MESSAGE_DO_STOP);
1214                                goto done;
1215                        }
1216                }
1217                prev = usec_to_tv(next_release);
1218                if (trace->state == STATE_RUNNING) {
1219                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1220                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1221                        trace_message_perpkts(trace, &message);
1222                }
1223        }
1224done:
1225
1226        thread_change_state(trace, t, THREAD_FINISHED, true);
1227        return NULL;
1228}
1229
1230/**
1231 * Delays a packets playback so the playback will be in trace time.
1232 * This may break early if a message becomes available.
1233 *
1234 * Requires the first packet for this thread to be received.
1235 * @param libtrace  The trace
1236 * @param packet    The packet to delay
1237 * @param t         The current thread
1238 * @return Either READ_MESSAGE(-2) or 0 is successful
1239 */
1240static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1241        struct timeval curr_tv, pkt_tv;
1242        uint64_t next_release = t->tracetime_offset_usec;
1243        uint64_t curr_usec;
1244
1245        if (!t->tracetime_offset_usec) {
1246                const libtrace_packet_t *first_pkt;
1247                const struct timeval *sys_tv;
1248                int64_t initial_offset;
1249                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1250                if (!first_pkt)
1251                        return 0;
1252                pkt_tv = trace_get_timeval(first_pkt);
1253                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1254                /* In the unlikely case offset is 0, change it to 1 */
1255                if (stable)
1256                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1257                next_release = initial_offset;
1258        }
1259        /* next_release == offset */
1260        pkt_tv = trace_get_timeval(packet);
1261        next_release += tv_to_usec(&pkt_tv);
1262        gettimeofday(&curr_tv, NULL);
1263        curr_usec = tv_to_usec(&curr_tv);
1264        if (next_release > curr_usec) {
1265                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1266                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1267                fd_set rfds;
1268                FD_ZERO(&rfds);
1269                FD_SET(mesg_fd, &rfds);
1270                // We need to wait
1271                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1272                if (ret == 0) {
1273                        return 0;
1274                } else if (ret > 0) {
1275                        return READ_MESSAGE;
1276                } else {
1277                        assert(!"trace_delay_packet: Unexpected return from select");
1278                }
1279        }
1280        return 0;
1281}
1282
1283/* Discards packets that don't match the filter.
1284 * Discarded packets are emptied and then moved to the end of the packet list.
1285 *
1286 * @param trace       The trace format, containing the filter
1287 * @param packets     An array of packets
1288 * @param nb_packets  The number of valid items in packets
1289 *
1290 * @return The number of packets that passed the filter, which are moved to
1291 *          the start of the packets array
1292 */
1293static inline size_t filter_packets(libtrace_t *trace,
1294                                    libtrace_packet_t **packets,
1295                                    size_t nb_packets) {
1296        size_t offset = 0;
1297        size_t i;
1298
1299        for (i = 0; i < nb_packets; ++i) {
1300                // The filter needs the trace attached to receive the link type
1301                packets[i]->trace = trace;
1302                if (trace_apply_filter(trace->filter, packets[i])) {
1303                        libtrace_packet_t *tmp;
1304                        tmp = packets[offset];
1305                        packets[offset++] = packets[i];
1306                        packets[i] = tmp;
1307                } else {
1308                        trace_fin_packet(packets[i]);
1309                }
1310        }
1311
1312        return offset;
1313}
1314
1315/* Read a batch of packets from the trace into a buffer.
1316 * Note that this function will block until a packet is read (or EOF is reached)
1317 *
1318 * @param libtrace    The trace
1319 * @param t           The thread
1320 * @param packets     An array of packets
1321 * @param nb_packets  The number of empty packets in packets
1322 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1323 */
1324static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1325                                      libtrace_thread_t *t,
1326                                      libtrace_packet_t *packets[],
1327                                      size_t nb_packets) {
1328        int i;
1329        assert(nb_packets);
1330        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1331        if (trace_is_err(libtrace))
1332                return -1;
1333        if (!libtrace->started) {
1334                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1335                              "You must call libtrace_start() before trace_read_packet()\n");
1336                return -1;
1337        }
1338
1339        if (libtrace->format->pread_packets) {
1340                int ret;
1341                for (i = 0; i < (int) nb_packets; ++i) {
1342                        assert(i[packets]);
1343                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1344                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1345                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1346                                              "Packet passed to trace_read_packet() is invalid\n");
1347                                return -1;
1348                        }
1349                }
1350                do {
1351                        ret=libtrace->format->pread_packets(libtrace, t,
1352                                                            packets,
1353                                                            nb_packets);
1354                        /* Error, EOF or message? */
1355                        if (ret <= 0) {
1356                                return ret;
1357                        }
1358
1359                        if (libtrace->filter) {
1360                                int remaining;
1361                                remaining = filter_packets(libtrace,
1362                                                           packets, ret);
1363                                t->filtered_packets += ret - remaining;
1364                                ret = remaining;
1365                        }
1366                        for (i = 0; i < ret; ++i) {
1367                                /* We do not mark the packet against the trace,
1368                                 * before hand or after. After breaks DAG meta
1369                                 * packets and before is inefficient */
1370                                //packets[i]->trace = libtrace;
1371                                /* TODO IN FORMAT?? Like traditional libtrace */
1372                                if (libtrace->snaplen>0)
1373                                        trace_set_capture_length(packets[i],
1374                                                        libtrace->snaplen);
1375                        }
1376                } while(ret == 0);
1377                return ret;
1378        }
1379        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1380                      "This format does not support reading packets\n");
1381        return ~0U;
1382}
1383
1384/* Restarts a parallel trace, this is called from trace_pstart.
1385 * The libtrace lock is held upon calling this function.
1386 * Typically with a parallel trace the threads are not
1387 * killed rather.
1388 */
1389static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1390                          libtrace_callback_set_t *per_packet_cbs, 
1391                          libtrace_callback_set_t *reporter_cbs) {
1392        int i, err = 0;
1393        if (libtrace->state != STATE_PAUSED) {
1394                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1395                        "trace(%s) is not currently paused",
1396                              libtrace->uridata);
1397                return -1;
1398        }
1399
1400        assert(libtrace_parallel);
1401        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1402
1403        /* Reset first packets */
1404        pthread_spin_lock(&libtrace->first_packets.lock);
1405        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1406                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1407                if (libtrace->first_packets.packets[i].packet) {
1408                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1409                        libtrace->first_packets.packets[i].packet = NULL;
1410                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1411                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1412                        libtrace->first_packets.count--;
1413                        libtrace->perpkt_threads[i].recorded_first = false;
1414                }
1415        }
1416        assert(libtrace->first_packets.count == 0);
1417        libtrace->first_packets.first = 0;
1418        pthread_spin_unlock(&libtrace->first_packets.lock);
1419
1420        /* Reset delay */
1421        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1422                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1423        }
1424
1425        /* Reset statistics */
1426        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1427                libtrace->perpkt_threads[i].accepted_packets = 0;
1428                libtrace->perpkt_threads[i].filtered_packets = 0;
1429        }
1430        libtrace->accepted_packets = 0;
1431        libtrace->filtered_packets = 0;
1432
1433        /* Update functions if requested */
1434        if(global_blob)
1435                libtrace->global_blob = global_blob;
1436
1437        if (per_packet_cbs) {
1438                if (libtrace->perpkt_cbs)
1439                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1440                libtrace->perpkt_cbs = trace_create_callback_set();
1441                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1442                                sizeof(libtrace_callback_set_t));
1443        }
1444
1445        if (reporter_cbs) {
1446                if (libtrace->reporter_cbs)
1447                        trace_destroy_callback_set(libtrace->reporter_cbs);
1448
1449                libtrace->reporter_cbs = trace_create_callback_set();
1450                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1451                                sizeof(libtrace_callback_set_t));
1452        }
1453
1454        if (trace_is_parallel(libtrace)) {
1455                err = libtrace->format->pstart_input(libtrace);
1456        } else {
1457                if (libtrace->format->start_input) {
1458                        err = libtrace->format->start_input(libtrace);
1459                }
1460        }
1461
1462        if (err == 0) {
1463                libtrace->started = true;
1464                libtrace_change_state(libtrace, STATE_RUNNING, false);
1465        }
1466        return err;
1467}
1468
1469/**
1470 * @return the number of CPU cores on the machine. -1 if unknown.
1471 */
1472SIMPLE_FUNCTION static int get_nb_cores() {
1473        int numCPU;
1474#ifdef _SC_NPROCESSORS_ONLN
1475        /* Most systems do this now */
1476        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1477
1478#else
1479        int mib[] = {CTL_HW, HW_AVAILCPU};
1480        size_t len = sizeof(numCPU);
1481
1482        /* get the number of CPUs from the system */
1483        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1484#endif
1485        return numCPU <= 0 ? 1 : numCPU;
1486}
1487
1488/**
1489 * Verifies the configuration and sets default values for any values not
1490 * specified by the user.
1491 */
1492static void verify_configuration(libtrace_t *libtrace) {
1493
1494        if (libtrace->config.hasher_queue_size <= 0)
1495                libtrace->config.hasher_queue_size = 1000;
1496
1497        if (libtrace->config.perpkt_threads <= 0) {
1498                libtrace->perpkt_thread_count = get_nb_cores();
1499                if (libtrace->perpkt_thread_count <= 0)
1500                        // Lets just use one
1501                        libtrace->perpkt_thread_count = 1;
1502        } else {
1503                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1504        }
1505
1506        if (libtrace->config.reporter_thold <= 0)
1507                libtrace->config.reporter_thold = 100;
1508        if (libtrace->config.burst_size <= 0)
1509                libtrace->config.burst_size = 32;
1510        if (libtrace->config.thread_cache_size <= 0)
1511                libtrace->config.thread_cache_size = 64;
1512        if (libtrace->config.cache_size <= 0)
1513                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1514
1515        if (libtrace->config.cache_size <
1516                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1517                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1518
1519        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1520                libtrace->combiner = combiner_unordered;
1521
1522        /* Figure out if we are using a dedicated hasher thread? */
1523        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1524                libtrace->hasher_thread.type = THREAD_HASHER;
1525        }
1526}
1527
1528/**
1529 * Starts a libtrace_thread, including allocating memory for messaging.
1530 * Threads are expected to wait until the libtrace look is released.
1531 * Hence why we don't init structures until later.
1532 *
1533 * @param trace The trace the thread is associated with
1534 * @param t The thread that is filled when the thread is started
1535 * @param type The type of thread
1536 * @param start_routine The entry location of the thread
1537 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1538 * @param name For debugging purposes set the threads name (Optional)
1539 *
1540 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1541 *         In this situation the thread structure is zeroed.
1542 */
1543static int trace_start_thread(libtrace_t *trace,
1544                       libtrace_thread_t *t,
1545                       enum thread_types type,
1546                       void *(*start_routine) (void *),
1547                       int perpkt_num,
1548                       const char *name) {
1549#ifdef __linux__
1550        cpu_set_t cpus;
1551        int i;
1552#endif
1553        int ret;
1554        assert(t->type == THREAD_EMPTY);
1555        t->trace = trace;
1556        t->ret = NULL;
1557        t->user_data = NULL;
1558        t->type = type;
1559        t->state = THREAD_RUNNING;
1560
1561        assert(name);
1562
1563#ifdef __linux__
1564        CPU_ZERO(&cpus);
1565        for (i = 0; i < get_nb_cores(); i++)
1566                CPU_SET(i, &cpus);
1567
1568        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1569        if( ret == 0 ) {
1570                ret = pthread_setaffinity_np(t->tid, sizeof(cpus), &cpus);
1571        }
1572
1573#else
1574        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1575#endif
1576        if (ret != 0) {
1577                libtrace_zero_thread(t);
1578                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1579                return -1;
1580        }
1581        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1582        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1583                libtrace_ringbuffer_init(&t->rbuffer,
1584                                         trace->config.hasher_queue_size,
1585                                         trace->config.hasher_polling?
1586                                                 LIBTRACE_RINGBUFFER_POLLING:
1587                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1588        }
1589#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1590        if(name)
1591                pthread_setname_np(t->tid, name);
1592#endif
1593        t->perpkt_num = perpkt_num;
1594        return 0;
1595}
1596
1597/** Parses the environment variable LIBTRACE_CONF into the supplied
1598 * configuration structure.
1599 *
1600 * @param[in,out] libtrace The trace from which we determine the URI and set
1601 * the configuration.
1602 *
1603 * We search for 3 environment variables and apply them to the config in the
1604 * following order. Such that the first has the lowest priority.
1605 *
1606 * 1. LIBTRACE_CONF, The global environment configuration
1607 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1608 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1609 *
1610 * E.g.
1611 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1612 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1613 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1614 *
1615 * @note All environment variables names MUST only contian
1616 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1617 * outside of this range should be captilised if possible or replaced with an
1618 * underscore.
1619 */
1620static void parse_env_config (libtrace_t *libtrace) {
1621        char env_name[1024] = "LIBTRACE_CONF_";
1622        size_t len = strlen(env_name);
1623        size_t mark = 0;
1624        size_t i;
1625        char * env;
1626
1627        /* Make our compound string */
1628        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1629        len += strlen(libtrace->format->name);
1630        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1631        len += 1;
1632        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1633
1634        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1635        for (i = 0; env_name[i] != 0; ++i) {
1636                env_name[i] = toupper(env_name[i]);
1637                if(env_name[i] == ':') {
1638                        mark = i;
1639                }
1640                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1641                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1642                        env_name[i] = '_';
1643                }
1644        }
1645
1646        /* First apply global env settings LIBTRACE_CONF */
1647        env = getenv("LIBTRACE_CONF");
1648        if (env)
1649        {
1650                printf("Got env %s", env);
1651                trace_set_configuration(libtrace, env);
1652        }
1653
1654        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1655        if (mark != 0) {
1656                env_name[mark] = 0;
1657                env = getenv(env_name);
1658                if (env) {
1659                        trace_set_configuration(libtrace, env);
1660                }
1661                env_name[mark] = '_';
1662        }
1663
1664        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1665        env = getenv(env_name);
1666        if (env) {
1667                trace_set_configuration(libtrace, env);
1668        }
1669}
1670
1671DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1672        if (libtrace->state == STATE_NEW)
1673                return trace_supports_parallel(libtrace);
1674        return libtrace->pread == trace_pread_packet_wrapper;
1675}
1676
1677DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1678                           libtrace_callback_set_t *per_packet_cbs,
1679                           libtrace_callback_set_t *reporter_cbs) {
1680        int i;
1681        int ret = -1;
1682        char name[24];
1683        sigset_t sig_before, sig_block_all;
1684        assert(libtrace);
1685
1686        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1687        if (trace_is_err(libtrace)) {
1688                goto cleanup_none;
1689        }
1690
1691        if (libtrace->state == STATE_PAUSED) {
1692                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1693                                reporter_cbs);
1694                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1695                return ret;
1696        }
1697
1698        if (libtrace->state != STATE_NEW) {
1699                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1700                              "should be called on a NEW or PAUSED trace but "
1701                              "instead was called from %s",
1702                              get_trace_state_name(libtrace->state));
1703                goto cleanup_none;
1704        }
1705
1706        /* Store the user defined things against the trace */
1707        libtrace->global_blob = global_blob;
1708
1709        /* Save a copy of the callbacks in case the user tries to change them
1710         * on us later */
1711        if (!per_packet_cbs) {
1712                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1713                                "requires a non-NULL set of per packet "
1714                                "callbacks.");
1715                goto cleanup_none;
1716        }
1717
1718        if (per_packet_cbs->message_packet == NULL) {
1719                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1720                                "packet callbacks must include a handler "
1721                                "for a packet. Please set this using "
1722                                "trace_set_packet_cb().");
1723                goto cleanup_none;
1724        }
1725
1726        libtrace->perpkt_cbs = trace_create_callback_set();
1727        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1728       
1729        if (reporter_cbs) {
1730                libtrace->reporter_cbs = trace_create_callback_set();
1731                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1732        }
1733
1734       
1735
1736
1737        /* And zero other fields */
1738        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1739                libtrace->perpkt_thread_states[i] = 0;
1740        }
1741        libtrace->first_packets.first = 0;
1742        libtrace->first_packets.count = 0;
1743        libtrace->first_packets.packets = NULL;
1744        libtrace->perpkt_threads = NULL;
1745        /* Set a global which says we are using a parallel trace. This is
1746         * for backwards compatibility due to changes when destroying packets */
1747        libtrace_parallel = 1;
1748
1749        /* Parses configuration passed through environment variables */
1750        parse_env_config(libtrace);
1751        verify_configuration(libtrace);
1752
1753        ret = -1;
1754        /* Try start the format - we prefer parallel over single threaded, as
1755         * these formats should support messages better */
1756
1757        if (trace_supports_parallel(libtrace) &&
1758            !trace_has_dedicated_hasher(libtrace)) {
1759                ret = libtrace->format->pstart_input(libtrace);
1760                libtrace->pread = trace_pread_packet_wrapper;
1761        }
1762        if (ret != 0) {
1763                if (libtrace->format->start_input) {
1764                        ret = libtrace->format->start_input(libtrace);
1765                }
1766                if (libtrace->perpkt_thread_count > 1) {
1767                        libtrace->pread = trace_pread_packet_first_in_first_served;
1768                        /* Don't wait for a burst of packets if the format is
1769                         * live as this could block ring based formats and
1770                         * introduces delay. */
1771                        if (libtrace->format->info.live) {
1772                                libtrace->config.burst_size = 1;
1773                        }
1774                }
1775                else {
1776                        /* Use standard read_packet */
1777                        libtrace->pread = NULL;
1778                }
1779        }
1780
1781        if (ret != 0) {
1782                goto cleanup_none;
1783        }
1784
1785        /* --- Start all the threads we need --- */
1786        /* Disable signals because it is inherited by the threads we start */
1787        sigemptyset(&sig_block_all);
1788        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1789
1790        /* If we need a hasher thread start it
1791         * Special Case: If single threaded we don't need a hasher
1792         */
1793        if (trace_has_dedicated_hasher(libtrace)) {
1794                libtrace->hasher_thread.type = THREAD_EMPTY;
1795                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1796                                   THREAD_HASHER, hasher_entry, -1,
1797                                   "hasher-thread");
1798                if (ret != 0)
1799                        goto cleanup_started;
1800                libtrace->pread = trace_pread_packet_hasher_thread;
1801        } else {
1802                libtrace->hasher_thread.type = THREAD_EMPTY;
1803        }
1804
1805        /* Start up our perpkt threads */
1806        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1807                                          libtrace->perpkt_thread_count);
1808        if (!libtrace->perpkt_threads) {
1809                trace_set_err(libtrace, errno, "trace_pstart "
1810                              "failed to allocate memory.");
1811                goto cleanup_threads;
1812        }
1813        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1814                snprintf(name, sizeof(name), "perpkt-%d", i);
1815                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1816                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1817                                   THREAD_PERPKT, perpkt_threads_entry, i,
1818                                   name);
1819                if (ret != 0)
1820                        goto cleanup_threads;
1821        }
1822
1823        /* Start the reporter thread */
1824        if (reporter_cbs) {
1825                if (libtrace->combiner.initialise)
1826                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1827                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1828                                   THREAD_REPORTER, reporter_entry, -1,
1829                                   "reporter_thread");
1830                if (ret != 0)
1831                        goto cleanup_threads;
1832        }
1833
1834        /* Start the keepalive thread */
1835        if (libtrace->config.tick_interval > 0) {
1836                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1837                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1838                                   "keepalive_thread");
1839                if (ret != 0)
1840                        goto cleanup_threads;
1841        }
1842
1843        /* Init other data structures */
1844        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1845        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1846        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1847                                                 sizeof(*libtrace->first_packets.packets));
1848        if (libtrace->first_packets.packets == NULL) {
1849                trace_set_err(libtrace, errno, "trace_pstart "
1850                              "failed to allocate memory.");
1851                goto cleanup_threads;
1852        }
1853
1854        if (libtrace_ocache_init(&libtrace->packet_freelist,
1855                             (void* (*)()) trace_create_packet,
1856                             (void (*)(void *))trace_destroy_packet,
1857                             libtrace->config.thread_cache_size,
1858                             libtrace->config.cache_size * 4,
1859                             libtrace->config.fixed_count) != 0) {
1860                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1861                              "failed to allocate ocache.");
1862                goto cleanup_threads;
1863        }
1864
1865        /* Threads don't start */
1866        libtrace->started = true;
1867        libtrace_change_state(libtrace, STATE_RUNNING, false);
1868
1869        ret = 0;
1870        goto success;
1871cleanup_threads:
1872        if (libtrace->first_packets.packets) {
1873                free(libtrace->first_packets.packets);
1874                libtrace->first_packets.packets = NULL;
1875        }
1876        libtrace_change_state(libtrace, STATE_ERROR, false);
1877        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1878        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1879                pthread_join(libtrace->hasher_thread.tid, NULL);
1880                libtrace_zero_thread(&libtrace->hasher_thread);
1881        }
1882
1883        if (libtrace->perpkt_threads) {
1884                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1885                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1886                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1887                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1888                        } else break;
1889                }
1890                free(libtrace->perpkt_threads);
1891                libtrace->perpkt_threads = NULL;
1892        }
1893
1894        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1895                pthread_join(libtrace->reporter_thread.tid, NULL);
1896                libtrace_zero_thread(&libtrace->reporter_thread);
1897        }
1898
1899        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1900                pthread_join(libtrace->keepalive_thread.tid, NULL);
1901                libtrace_zero_thread(&libtrace->keepalive_thread);
1902        }
1903        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1904        libtrace_change_state(libtrace, STATE_NEW, false);
1905        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1906        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1907cleanup_started:
1908        if (libtrace->pread == trace_pread_packet_wrapper) {
1909                if (libtrace->format->ppause_input)
1910                        libtrace->format->ppause_input(libtrace);
1911        } else {
1912                if (libtrace->format->pause_input)
1913                        libtrace->format->pause_input(libtrace);
1914        }
1915        ret = -1;
1916success:
1917        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1918cleanup_none:
1919        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1920        return ret;
1921}
1922
1923DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1924                fn_cb_starting handler) {
1925        cbset->message_starting = handler;
1926        return 0;
1927}
1928
1929DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1930                fn_cb_dataless handler) {
1931        cbset->message_pausing = handler;
1932        return 0;
1933}
1934
1935DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1936                fn_cb_dataless handler) {
1937        cbset->message_resuming = handler;
1938        return 0;
1939}
1940
1941DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1942                fn_cb_dataless handler) {
1943        cbset->message_stopping = handler;
1944        return 0;
1945}
1946
1947DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1948                fn_cb_packet handler) {
1949        cbset->message_packet = handler;
1950        return 0;
1951}
1952
1953DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1954                fn_cb_first_packet handler) {
1955        cbset->message_first_packet = handler;
1956        return 0;
1957}
1958
1959DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1960                fn_cb_tick handler) {
1961        cbset->message_tick_count = handler;
1962        return 0;
1963}
1964
1965DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1966                fn_cb_tick handler) {
1967        cbset->message_tick_interval = handler;
1968        return 0;
1969}
1970
1971DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1972                fn_cb_result handler) {
1973        cbset->message_result = handler;
1974        return 0;
1975}
1976
1977DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1978                fn_cb_usermessage handler) {
1979        cbset->message_user = handler;
1980        return 0;
1981}
1982
1983/*
1984 * Pauses a trace, this should only be called by the main thread
1985 * 1. Set started = false
1986 * 2. All perpkt threads are paused waiting on a condition var
1987 * 3. Then call ppause on the underlying format if found
1988 * 4. The traces state is paused
1989 *
1990 * Once done you should be able to modify the trace setup and call pstart again
1991 * TODO add support to change the number of threads.
1992 */
1993DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1994{
1995        libtrace_thread_t *t;
1996        int i;
1997        assert(libtrace);
1998
1999        t = get_thread_table(libtrace);
2000        // Check state from within the lock if we are going to change it
2001        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2002
2003        /* If we are already paused, just treat this as a NOOP */
2004        if (libtrace->state == STATE_PAUSED) {
2005                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2006                return 0;
2007        }
2008        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2009                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2010                return -1;
2011        }
2012
2013        libtrace_change_state(libtrace, STATE_PAUSING, false);
2014        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2015
2016        // Special case handle the hasher thread case
2017        if (trace_has_dedicated_hasher(libtrace)) {
2018                if (libtrace->config.debug_state)
2019                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2020                libtrace_message_t message = {0, {.uint64=0}, NULL};
2021                message.code = MESSAGE_DO_PAUSE;
2022                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2023                // Wait for it to pause
2024                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2025                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2026                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2027                }
2028                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2029                if (libtrace->config.debug_state)
2030                        fprintf(stderr, " DONE\n");
2031        }
2032
2033        if (libtrace->config.debug_state)
2034                fprintf(stderr, "Asking perpkt threads to pause ...");
2035        // Stop threads, skip this one if it's a perpkt
2036        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2037                if (&libtrace->perpkt_threads[i] != t) {
2038                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2039                        message.code = MESSAGE_DO_PAUSE;
2040                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2041                        if(trace_has_dedicated_hasher(libtrace)) {
2042                                // The hasher has stopped and other threads have messages waiting therefore
2043                                // If the queues are empty the other threads would have no data
2044                                // So send some message packets to simply ask the threads to check
2045                                // We are the only writer since hasher has paused
2046                                libtrace_packet_t *pkt;
2047                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2048                                pkt->error = READ_MESSAGE;
2049                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2050                        }
2051                } else {
2052                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2053                }
2054        }
2055
2056        if (t) {
2057                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2058                // We rely on the user to *not* return before starting the trace again
2059                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2060        }
2061
2062        // Wait for all threads to pause
2063        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2064        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2065                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2066        }
2067        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2068
2069        if (libtrace->config.debug_state)
2070                fprintf(stderr, " DONE\n");
2071
2072        // Deal with the reporter
2073        if (trace_has_reporter(libtrace)) {
2074                if (libtrace->config.debug_state)
2075                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2076                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2077                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2078                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2079               
2080                } else {
2081                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2082                        message.code = MESSAGE_DO_PAUSE;
2083                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2084                        // Wait for it to pause
2085                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2086                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2087                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2088                        }
2089                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2090                }
2091                if (libtrace->config.debug_state)
2092                        fprintf(stderr, " DONE\n");
2093        }
2094
2095        /* Cache values before we pause */
2096        if (libtrace->stats == NULL)
2097                libtrace->stats = trace_create_statistics();
2098        // Save the statistics against the trace
2099        trace_get_statistics(libtrace, NULL);
2100        if (trace_is_parallel(libtrace)) {
2101                libtrace->started = false;
2102                if (libtrace->format->ppause_input)
2103                        libtrace->format->ppause_input(libtrace);
2104                // TODO What happens if we don't have pause input??
2105        } else {
2106                int err;
2107                err = trace_pause(libtrace);
2108                // We should handle this a bit better
2109                if (err)
2110                        return err;
2111        }
2112
2113        // Only set as paused after the pause has been called on the trace
2114        libtrace_change_state(libtrace, STATE_PAUSED, true);
2115        return 0;
2116}
2117
2118/**
2119 * Stop trace finish prematurely as though it meet an EOF
2120 * This should only be called by the main thread
2121 * 1. Calls ppause
2122 * 2. Sends a message asking for threads to finish
2123 * 3. Releases threads which will pause
2124 */
2125DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2126{
2127        int i, err;
2128        libtrace_message_t message = {0, {.uint64=0}, NULL};
2129        assert(libtrace);
2130
2131        // Ensure all threads have paused and the underlying trace format has
2132        // been closed and all packets associated are cleaned up
2133        // Pause will do any state checks for us
2134        err = trace_ppause(libtrace);
2135        if (err)
2136                return err;
2137
2138        // Now send a message asking the threads to stop
2139        // This will be retrieved before trying to read another packet
2140
2141        message.code = MESSAGE_DO_STOP;
2142        trace_message_perpkts(libtrace, &message);
2143        if (trace_has_dedicated_hasher(libtrace))
2144                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2145
2146        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2147                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2148        }
2149
2150        /* Now release the threads and let them stop - when the threads finish
2151         * the state will be set to finished */
2152        libtrace_change_state(libtrace, STATE_FINISHING, true);
2153        return 0;
2154}
2155
2156DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2157        int ret = -1;
2158        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2159                return -1;
2160        }
2161
2162        // Save the requirements
2163        trace->hasher_type = type;
2164        if (hasher) {
2165                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2166                        if (trace->hasher_data) {
2167                                free(trace->hasher_data);
2168                        }
2169                }
2170                trace->hasher = hasher;
2171                trace->hasher_data = data;
2172                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2173        } else {
2174                trace->hasher = NULL;
2175                trace->hasher_data = NULL;
2176                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2177        }
2178
2179        // Try push this to hardware - NOTE hardware could do custom if
2180        // there is a more efficient way to apply it, in this case
2181        // it will simply grab the function out of libtrace_t
2182        if (trace_supports_parallel(trace) && trace->format->config_input)
2183                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2184
2185        if (ret == -1) {
2186                /* We have to deal with this ourself */
2187                if (!hasher) {
2188                        switch (type)
2189                        {
2190                                case HASHER_CUSTOM:
2191                                case HASHER_BALANCE:
2192                                        return 0;
2193                                case HASHER_BIDIRECTIONAL:
2194                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2195                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2196                                        toeplitz_init_config(trace->hasher_data, 1);
2197                                        return 0;
2198                                case HASHER_UNIDIRECTIONAL:
2199                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2200                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2201                                        toeplitz_init_config(trace->hasher_data, 0);
2202                                        return 0;
2203                        }
2204                        return -1;
2205                }
2206        } else {
2207                /* If the hasher is hardware we zero out the hasher and hasher
2208                 * data fields - only if we need a hasher do we do this */
2209                trace->hasher = NULL;
2210                trace->hasher_data = NULL;
2211        }
2212
2213        return 0;
2214}
2215
2216// Waits for all threads to finish
2217DLLEXPORT void trace_join(libtrace_t *libtrace) {
2218        int i;
2219
2220        /* Firstly wait for the perpkt threads to finish, since these are
2221         * user controlled */
2222        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2223                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2224                // So we must do our best effort to empty the queue - so
2225                // the producer (or any other threads) don't block.
2226                libtrace_packet_t * packet;
2227                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2228                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2229                        if (packet) // This could be NULL iff the perpkt finishes early
2230                                trace_destroy_packet(packet);
2231        }
2232
2233        /* Now the hasher */
2234        if (trace_has_dedicated_hasher(libtrace)) {
2235                pthread_join(libtrace->hasher_thread.tid, NULL);
2236                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2237        }
2238
2239        // Now that everything is finished nothing can be touching our
2240        // buffers so clean them up
2241        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2242                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2243                // if they lost timeslice before-during a write
2244                libtrace_packet_t * packet;
2245                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2246                        trace_destroy_packet(packet);
2247                if (trace_has_dedicated_hasher(libtrace)) {
2248                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2249                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2250                }
2251                // Cannot destroy vector yet, this happens with trace_destroy
2252        }
2253
2254        if (trace_has_reporter(libtrace)) {
2255                pthread_join(libtrace->reporter_thread.tid, NULL);
2256                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2257        }
2258
2259        // Wait for the tick (keepalive) thread if it has been started
2260        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2261                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2262                msg.code = MESSAGE_DO_STOP;
2263                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2264                pthread_join(libtrace->keepalive_thread.tid, NULL);
2265        }
2266
2267        libtrace_change_state(libtrace, STATE_JOINED, true);
2268        print_memory_stats();
2269}
2270
2271DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2272                                                libtrace_thread_t *t)
2273{
2274        int ret;
2275        if (t == NULL)
2276                t = get_thread_descriptor(libtrace);
2277        if (t == NULL)
2278                return -1;
2279        ret = libtrace_message_queue_count(&t->messages);
2280        return ret < 0 ? 0 : ret;
2281}
2282
2283DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2284                                          libtrace_thread_t *t,
2285                                          libtrace_message_t * message)
2286{
2287        int ret;
2288        if (t == NULL)
2289                t = get_thread_descriptor(libtrace);
2290        if (t == NULL)
2291                return -1;
2292        ret = libtrace_message_queue_get(&t->messages, message);
2293        return ret < 0 ? 0 : ret;
2294}
2295
2296DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2297                                              libtrace_thread_t *t,
2298                                              libtrace_message_t * message)
2299{
2300        if (t == NULL)
2301                t = get_thread_descriptor(libtrace);
2302        if (t == NULL)
2303                return -1;
2304        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2305                return 0;
2306        else
2307                return -1;
2308}
2309
2310DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2311{
2312        int ret;
2313        if (!message->sender)
2314                message->sender = get_thread_descriptor(libtrace);
2315
2316        ret = libtrace_message_queue_put(&t->messages, message);
2317        return ret < 0 ? 0 : ret;
2318}
2319
2320DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2321{
2322        if (!trace_has_reporter(libtrace) ||
2323            !(libtrace->reporter_thread.state == THREAD_RUNNING
2324              || libtrace->reporter_thread.state == THREAD_PAUSED))
2325                return -1;
2326
2327        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2328}
2329
2330DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2331{
2332        libtrace_message_t message = {0, {.uint64=0}, NULL};
2333        message.code = MESSAGE_POST_REPORTER;
2334        return trace_message_reporter(libtrace, (void *) &message);
2335}
2336
2337DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2338{
2339        int i;
2340        int missed = 0;
2341        if (message->sender == NULL)
2342                message->sender = get_thread_descriptor(libtrace);
2343        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2344                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2345                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2346                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2347                } else {
2348                        missed += 1;
2349                }
2350        }
2351        return -missed;
2352}
2353
2354/**
2355 * Publishes a result to the reduce queue
2356 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2357 */
2358DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2359        libtrace_result_t res;
2360        res.type = type;
2361        res.key = key;
2362        res.value = value;
2363        assert(libtrace->combiner.publish);
2364        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2365        return;
2366}
2367
2368DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2369        if (combiner) {
2370                trace->combiner = *combiner;
2371                trace->combiner.configuration = config;
2372        } else {
2373                // No combiner, so don't try use it
2374                memset(&trace->combiner, 0, sizeof(trace->combiner));
2375        }
2376}
2377
2378DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2379        return packet->order;
2380}
2381
2382DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2383        return packet->hash;
2384}
2385
2386DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2387        packet->order = order;
2388}
2389
2390DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2391        packet->hash = hash;
2392}
2393
2394DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2395        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2396}
2397
2398/**
2399 * @return True if the trace is not running such that it can be configured
2400 */
2401static inline bool trace_is_configurable(libtrace_t *trace) {
2402        return trace->state == STATE_NEW ||
2403                        trace->state == STATE_PAUSED;
2404}
2405
2406DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2407        // Only supported on new traces not paused traces
2408        if (trace->state != STATE_NEW) return -1;
2409
2410        /* TODO consider allowing an offset from the total number of cores i.e.
2411         * -1 reserve 1 core */
2412        if (nb >= 0) {
2413                trace->config.perpkt_threads = nb;
2414                return 0;
2415        } else {
2416                return -1;
2417        }
2418}
2419
2420DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2421        if (!trace_is_configurable(trace)) return -1;
2422
2423        trace->config.tick_interval = millisec;
2424        return 0;
2425}
2426
2427DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2428        if (!trace_is_configurable(trace)) return -1;
2429
2430        trace->config.tick_count = count;
2431        return 0;
2432}
2433
2434DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2435        if (!trace_is_configurable(trace)) return -1;
2436
2437        trace->tracetime = tracetime;
2438        return 0;
2439}
2440
2441DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2442        if (!trace_is_configurable(trace)) return -1;
2443
2444        trace->config.cache_size = size;
2445        return 0;
2446}
2447
2448DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2449        if (!trace_is_configurable(trace)) return -1;
2450
2451        trace->config.thread_cache_size = size;
2452        return 0;
2453}
2454
2455DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2456        if (!trace_is_configurable(trace)) return -1;
2457
2458        trace->config.fixed_count = fixed;
2459        return 0;
2460}
2461
2462DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2463        if (!trace_is_configurable(trace)) return -1;
2464
2465        trace->config.burst_size = size;
2466        return 0;
2467}
2468
2469DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2470        if (!trace_is_configurable(trace)) return -1;
2471
2472        trace->config.hasher_queue_size = size;
2473        return 0;
2474}
2475
2476DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2477        if (!trace_is_configurable(trace)) return -1;
2478
2479        trace->config.hasher_polling = polling;
2480        return 0;
2481}
2482
2483DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2484        if (!trace_is_configurable(trace)) return -1;
2485
2486        trace->config.reporter_polling = polling;
2487        return 0;
2488}
2489
2490DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2491        if (!trace_is_configurable(trace)) return -1;
2492
2493        trace->config.reporter_thold = thold;
2494        return 0;
2495}
2496
2497DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2498        if (!trace_is_configurable(trace)) return -1;
2499
2500        trace->config.debug_state = debug_state;
2501        return 0;
2502}
2503
2504static bool config_bool_parse(char *value, size_t nvalue) {
2505        if (strncmp(value, "true", nvalue) == 0)
2506                return true;
2507        else if (strncmp(value, "false", nvalue) == 0)
2508                return false;
2509        else
2510                return strtoll(value, NULL, 10) != 0;
2511}
2512
2513/* Note update documentation on trace_set_configuration */
2514static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2515        assert(key);
2516        assert(value);
2517        assert(uc);
2518        if (strncmp(key, "cache_size", nkey) == 0
2519            || strncmp(key, "cs", nkey) == 0) {
2520                uc->cache_size = strtoll(value, NULL, 10);
2521        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2522                   || strncmp(key, "tcs", nkey) == 0) {
2523                uc->thread_cache_size = strtoll(value, NULL, 10);
2524        } else if (strncmp(key, "fixed_count", nkey) == 0
2525                   || strncmp(key, "fc", nkey) == 0) {
2526                uc->fixed_count = config_bool_parse(value, nvalue);
2527        } else if (strncmp(key, "burst_size", nkey) == 0
2528                   || strncmp(key, "bs", nkey) == 0) {
2529                uc->burst_size = strtoll(value, NULL, 10);
2530        } else if (strncmp(key, "tick_interval", nkey) == 0
2531                   || strncmp(key, "ti", nkey) == 0) {
2532                uc->tick_interval = strtoll(value, NULL, 10);
2533        } else if (strncmp(key, "tick_count", nkey) == 0
2534                   || strncmp(key, "tc", nkey) == 0) {
2535                uc->tick_count = strtoll(value, NULL, 10);
2536        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2537                   || strncmp(key, "pt", nkey) == 0) {
2538                uc->perpkt_threads = strtoll(value, NULL, 10);
2539        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2540                   || strncmp(key, "hqs", nkey) == 0) {
2541                uc->hasher_queue_size = strtoll(value, NULL, 10);
2542        } else if (strncmp(key, "hasher_polling", nkey) == 0
2543                   || strncmp(key, "hp", nkey) == 0) {
2544                uc->hasher_polling = config_bool_parse(value, nvalue);
2545        } else if (strncmp(key, "reporter_polling", nkey) == 0
2546                   || strncmp(key, "rp", nkey) == 0) {
2547                uc->reporter_polling = config_bool_parse(value, nvalue);
2548        } else if (strncmp(key, "reporter_thold", nkey) == 0
2549                   || strncmp(key, "rt", nkey) == 0) {
2550                uc->reporter_thold = strtoll(value, NULL, 10);
2551        } else if (strncmp(key, "debug_state", nkey) == 0
2552                   || strncmp(key, "ds", nkey) == 0) {
2553                uc->debug_state = config_bool_parse(value, nvalue);
2554        } else {
2555                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2556        }
2557}
2558
2559DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2560        char *pch;
2561        char key[100];
2562        char value[100];
2563        char *dup;
2564        assert(str);
2565        assert(trace);
2566
2567        if (!trace_is_configurable(trace)) return -1;
2568
2569        dup = strdup(str);
2570        pch = strtok (dup," ,.-");
2571        while (pch != NULL)
2572        {
2573                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2574                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2575                } else {
2576                        fprintf(stderr, "Error parsing option %s\n", pch);
2577                }
2578                pch = strtok (NULL," ,.-");
2579        }
2580        free(dup);
2581
2582        return 0;
2583}
2584
2585DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2586        char line[1024];
2587        if (!trace_is_configurable(trace)) return -1;
2588
2589        while (fgets(line, sizeof(line), file) != NULL)
2590        {
2591                trace_set_configuration(trace, line);
2592        }
2593
2594        if(ferror(file))
2595                return -1;
2596        else
2597                return 0;
2598}
2599
2600DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2601        assert(packet);
2602        /* Always release any resources this might be holding */
2603        trace_fin_packet(packet);
2604        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2605}
2606
2607DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2608        pthread_mutex_lock(&(packet->ref_lock));
2609        if (packet->refcount < 0) {
2610                packet->refcount = 1;
2611        } else {
2612                packet->refcount ++;
2613        }
2614        pthread_mutex_unlock(&(packet->ref_lock));
2615}
2616
2617DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2618        pthread_mutex_lock(&(packet->ref_lock));
2619        packet->refcount --;
2620
2621        if (packet->refcount <= 0) {
2622                trace_free_packet(packet->trace, packet);
2623        }
2624        pthread_mutex_unlock(&(packet->ref_lock));
2625}
2626
2627
2628DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2629        if (libtrace->format)
2630                return &libtrace->format->info;
2631        else
2632                return NULL;
2633}
Note: See TracBrowser for help on using the repository browser.