source: lib/trace_parallel.c @ b6ff245

develop
Last change on this file since b6ff245 was b6ff245, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

cleanup

  • Property mode set to 100644
File size: 91.4 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        /*assert(libtrace->state != STATE_NEW);*/
274        if (!(libtrace->state != STATE_NEW)) {
275                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "Cannot check reporter for the current state in trace_has_reporter()");
276                return false;
277        }
278        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
279}
280
281/**
282 * When running the number of perpkt threads in use.
283 * TODO what if the trace is not running yet, or has finished??
284 *
285 * @brief libtrace_perpkt_thread_nb
286 * @param t The trace
287 * @return
288 */
289DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
290        return t->perpkt_thread_count;
291}
292
293DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
294        return thread->perpkt_num;
295}
296
297/**
298 * Changes the overall traces state and signals the condition.
299 *
300 * @param trace A pointer to the trace
301 * @param new_state The new state of the trace
302 * @param need_lock Set to true if libtrace_lock is not held, otherwise
303 *        false in the case the lock is currently held by this thread.
304 */
305static inline void libtrace_change_state(libtrace_t *trace,
306        const enum trace_state new_state, const bool need_lock)
307{
308        UNUSED enum trace_state prev_state;
309        if (need_lock)
310                pthread_mutex_lock(&trace->libtrace_lock);
311        prev_state = trace->state;
312        trace->state = new_state;
313
314        if (trace->config.debug_state)
315                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
316                        trace->uridata, get_trace_state_name(prev_state),
317                        get_trace_state_name(trace->state));
318
319        pthread_cond_broadcast(&trace->perpkt_cond);
320        if (need_lock)
321                pthread_mutex_unlock(&trace->libtrace_lock);
322}
323
324/**
325 * Changes a thread's state and broadcasts the condition variable. This
326 * should always be done when the lock is held.
327 *
328 * Additionally for perpkt threads the state counts are updated.
329 *
330 * @param trace A pointer to the trace
331 * @param t A pointer to the thread to modify
332 * @param new_state The new state of the thread
333 * @param need_lock Set to true if libtrace_lock is not held, otherwise
334 *        false in the case the lock is currently held by this thread.
335 */
336static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
337        const enum thread_states new_state, const bool need_lock)
338{
339        enum thread_states prev_state;
340        if (need_lock)
341                pthread_mutex_lock(&trace->libtrace_lock);
342        prev_state = t->state;
343        t->state = new_state;
344        if (t->type == THREAD_PERPKT) {
345                --trace->perpkt_thread_states[prev_state];
346                ++trace->perpkt_thread_states[new_state];
347        }
348
349        if (trace->config.debug_state)
350                fprintf(stderr, "Thread %d state changed from %d to %d\n",
351                        (int) t->tid, prev_state, t->state);
352
353        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
354                /* Make sure we save our final stats in case someone wants
355                 * them at the end of their program.
356                 */
357
358                trace_get_statistics(trace, NULL);
359                libtrace_change_state(trace, STATE_FINISHED, false);
360        }
361
362        pthread_cond_broadcast(&trace->perpkt_cond);
363        if (need_lock)
364                pthread_mutex_unlock(&trace->libtrace_lock);
365}
366
367/**
368 * This is valid once a trace is initialised
369 *
370 * @return True if the format supports parallel threads.
371 */
372static inline bool trace_supports_parallel(libtrace_t *trace)
373{
374        /*assert(trace);*/
375        if (!trace) {
376                fprintf(stderr, "NULL trace passed into trace_supports_parallel()\n");
377                return false;
378        }
379        /*assert(trace->format);*/
380        if (!trace->format) {
381                trace_set_err(trace, TRACE_ERR_BAD_FORMAT,
382                        "NULL capture format associated with trace in trace_supports_parallel()");
383                return false;
384        }
385        if (trace->format->pstart_input)
386                return true;
387        else
388                return false;
389}
390
391void libtrace_zero_thread(libtrace_thread_t * t) {
392        t->accepted_packets = 0;
393        t->filtered_packets = 0;
394        t->recorded_first = false;
395        t->tracetime_offset_usec = 0;
396        t->user_data = 0;
397        t->format_data = 0;
398        libtrace_zero_ringbuffer(&t->rbuffer);
399        t->trace = NULL;
400        t->ret = NULL;
401        t->type = THREAD_EMPTY;
402        t->perpkt_num = -1;
403}
404
405// Ints are aligned int is atomic so safe to read and write at same time
406// However write must be locked, read doesn't (We never try read before written to table)
407libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
408        int i = 0;
409        pthread_t tid = pthread_self();
410
411        for (;i<libtrace->perpkt_thread_count ;++i) {
412                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
413                        return &libtrace->perpkt_threads[i];
414        }
415        return NULL;
416}
417
418static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
419        libtrace_thread_t *ret;
420        if (!(ret = get_thread_table(libtrace))) {
421                pthread_t tid = pthread_self();
422                // Check if we are reporter or something else
423                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
424                                pthread_equal(tid, libtrace->reporter_thread.tid))
425                        ret = &libtrace->reporter_thread;
426                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
427                         pthread_equal(tid, libtrace->hasher_thread.tid))
428                        ret = &libtrace->hasher_thread;
429                else
430                        ret = NULL;
431        }
432        return ret;
433}
434
435DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
436        // Duplicate the packet in standard malloc'd memory and free the
437        // original, This is a 1:1 exchange so the ocache count remains unchanged.
438        if (pkt->buf_control != TRACE_CTRL_PACKET) {
439                libtrace_packet_t *dup;
440                dup = trace_copy_packet(pkt);
441                /* Release the external buffer */
442                trace_fin_packet(pkt);
443                /* Copy the duplicated packet over the existing */
444                memcpy(pkt, dup, sizeof(libtrace_packet_t));
445                /* Free the packet structure */
446                free(dup);
447        }
448}
449
450/**
451 * Makes a libtrace_result_t safe, used when pausing a trace.
452 * This will call libtrace_make_packet_safe if the result is
453 * a packet.
454 */
455DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
456        if (res->type == RESULT_PACKET) {
457                libtrace_make_packet_safe(res->value.pkt);
458        }
459}
460
461/**
462 * Holds threads in a paused state, until released by broadcasting
463 * the condition mutex.
464 */
465static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
466        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
467        thread_change_state(trace, t, THREAD_PAUSED, false);
468        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
469                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
470        }
471        thread_change_state(trace, t, THREAD_RUNNING, false);
472        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
473}
474
475/**
476 * Sends a packet to the user, expects either a valid packet or a TICK packet.
477 *
478 * @param trace The trace
479 * @param t The current thread
480 * @param packet A pointer to the packet storage, which may be set to null upon
481 *               return, or a packet to be finished.
482 * @param tracetime If true packets are delayed to match with tracetime
483 * @return 0 is successful, otherwise if playing back in tracetime
484 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
485 *
486 * @note READ_MESSAGE will only be returned if tracetime is true.
487 */
488static inline int dispatch_packet(libtrace_t *trace,
489                                  libtrace_thread_t *t,
490                                  libtrace_packet_t **packet,
491                                  bool tracetime) {
492
493        if ((*packet)->error > 0) {
494                if (tracetime) {
495                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
496                                return READ_MESSAGE;
497                }
498                if (!IS_LIBTRACE_META_PACKET((*packet))) {
499                        t->accepted_packets++;
500                }
501                if (trace->perpkt_cbs->message_packet)
502                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
503                trace_fin_packet(*packet);
504        } else {
505                /*assert((*packet)->error == READ_TICK);*/
506                if (!((*packet)->error == READ_TICK)) {
507                        trace_set_err(trace, TRACE_ERR_BAD_STATE,
508                                "Packet is not in READ_TICK state in dispath_packet()");
509                        return -1;
510                }
511                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
512                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
513        }
514        return 0;
515}
516
517/**
518 * Sends a batch of packets to the user, expects either a valid packet or a
519 * TICK packet.
520 *
521 * @param trace The trace
522 * @param t The current thread
523 * @param packets [in,out] An array of packets, these may be null upon return
524 * @param nb_packets The total number of packets in the list
525 * @param empty [in,out] A pointer to an integer storing the first empty slot,
526 * upon return this is updated
527 * @param offset [in,out] The offset into the array, upon return this is updated
528 * @param tracetime If true packets are delayed to match with tracetime
529 * @return 0 is successful, otherwise if playing back in tracetime
530 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
531 *
532 * @note READ_MESSAGE will only be returned if tracetime is true.
533 */
534static inline int dispatch_packets(libtrace_t *trace,
535                                  libtrace_thread_t *t,
536                                  libtrace_packet_t *packets[],
537                                  int nb_packets, int *empty, int *offset,
538                                  bool tracetime) {
539        for (;*offset < nb_packets; ++*offset) {
540                int ret;
541                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
542                if (ret == 0) {
543                        /* Move full slots to front as we go */
544                        if (packets[*offset]) {
545                                if (*empty != *offset) {
546                                        packets[*empty] = packets[*offset];
547                                        packets[*offset] = NULL;
548                                }
549                                ++*empty;
550                        }
551                } else {
552                        /* Break early */
553                        /*assert(ret == READ_MESSAGE);*/
554                        if (!(ret == READ_MESSAGE)) {
555                                trace_set_err(trace, TRACE_ERR_UNKNOWN_OPTION,
556                                        "Expected READ_MESSAGE in dispatch_packets()");
557                                return -1;
558                        }
559                        return READ_MESSAGE;
560                }
561        }
562
563        return 0;
564}
565
566/**
567 * Pauses a per packet thread, messages will not be processed when the thread
568 * is paused.
569 *
570 * This process involves reading packets if a hasher thread is used. As such
571 * this function can fail to pause due to errors when reading in which case
572 * the thread should be stopped instead.
573 *
574 *
575 * @brief trace_perpkt_thread_pause
576 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
577 */
578static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
579                                     libtrace_packet_t *packets[],
580                                     int nb_packets, int *empty, int *offset) {
581        libtrace_packet_t * packet = NULL;
582
583        /* Let the user thread know we are going to pause */
584        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
585
586        /* Send through any remaining packets (or messages) without delay */
587
588        /* First send those packets already read, as fast as possible
589         * This should never fail or check for messages etc. */
590        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
591                                    offset, false), == 0);
592
593        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
594        /* If a hasher thread is running, empty input queues so we don't lose data */
595        if (trace_has_dedicated_hasher(trace)) {
596                // The hasher has stopped by this point, so the queue shouldn't be filling
597                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
598                        int ret = trace->pread(trace, t, &packet, 1);
599                        if (ret == 1) {
600                                if (packet->error > 0) {
601                                        store_first_packet(trace, packet, t);
602                                }
603                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
604                                if (packet == NULL)
605                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
606                        } else if (ret != READ_MESSAGE) {
607                                /* Ignore messages we pick these up next loop */
608                                /*assert (ret == READ_EOF || ret == READ_ERROR);*/
609                                if (!(ret == READ_EOF || ret == READ_ERROR)) {
610                                        trace_set_err(trace, TRACE_ERR_PAUSE_PTHREAD,
611                                                "Error pausing processing thread in trace_perpkt_thread_pause()");
612                                        return -1;
613                                }
614                                /* Verify no packets are remaining */
615                                /* TODO refactor this sanity check out!! */
616                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
617                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
618                                        // No packets after this should have any data in them
619                                        /*assert(packet->error <= 0);*/
620                                        if (!(packet->error <= 0)) {
621                                                trace_set_err(trace, TRACE_ERR_BAD_PACKET,
622                                                        "Expected no data from packets however found some in trace_perpkt_thread_pause()");
623                                                return -1;
624                                        }
625                                }
626                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
627                                return -1;
628                        }
629                }
630        }
631        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
632
633        /* Now we do the actual pause, this returns when we resumed */
634        trace_thread_pause(trace, t);
635        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
636        return 1;
637}
638
639/**
640 * The is the entry point for our packet processing threads.
641 */
642static void* perpkt_threads_entry(void *data) {
643        libtrace_t *trace = (libtrace_t *)data;
644        libtrace_thread_t *t;
645        libtrace_message_t message = {0, {.uint64=0}, NULL};
646        libtrace_packet_t *packets[trace->config.burst_size];
647        size_t i;
648        //int ret;
649        /* The current reading position into the packets */
650        int offset = 0;
651        /* The number of packets last read */
652        int nb_packets = 0;
653        /* The offset to the first NULL packet upto offset */
654        int empty = 0;
655        int j;
656
657        /* Wait until trace_pstart has been completed */
658        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
659        t = get_thread_table(trace);
660        /*assert(t);*/
661        if (!t) {
662                trace_set_err(trace, TRACE_ERR_THREAD, "Unable to get thread table in perpkt_threads_entry()");
663                return NULL;
664        }
665        if (trace->state == STATE_ERROR) {
666                thread_change_state(trace, t, THREAD_FINISHED, false);
667                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
668                pthread_exit(NULL);
669        }
670        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
671
672        if (trace->format->pregister_thread) {
673                if (trace->format->pregister_thread(trace, t, 
674                                trace_is_parallel(trace)) < 0) {
675                        thread_change_state(trace, t, THREAD_FINISHED, false);
676                        pthread_exit(NULL);
677                }
678        }
679
680        /* Fill our buffer with empty packets */
681        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
682        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
683                              trace->config.burst_size,
684                              trace->config.burst_size);
685
686        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
687
688        /* Let the per_packet function know we have started */
689        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
690        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
691
692        for (;;) {
693
694                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
695                        int ret;
696                        switch (message.code) {
697                                case MESSAGE_DO_PAUSE: // This is internal
698                                        ret = trace_perpkt_thread_pause(trace, t,
699                                              packets, nb_packets, &empty, &offset);
700                                        if (ret == READ_EOF) {
701                                                goto eof;
702                                        } else if (ret == READ_ERROR) {
703                                                goto error;
704                                        }
705                                        /*assert(ret == 1);*/
706                                        if (!(ret == 1)) {
707                                                fprintf(stderr, "Unknown error pausing thread in perpkt_threads_entry()\n");
708                                                return NULL;
709                                        }
710                                        continue;
711                                case MESSAGE_DO_STOP: // This is internal
712                                        goto eof;
713                        }
714                        send_message(trace, t, message.code, message.data, 
715                                        message.sender);
716                        /* Continue and the empty messages out before packets */
717                        continue;
718                }
719
720
721                /* Do we need to read a new set of packets MOST LIKELY we do */
722                if (offset == nb_packets) {
723                        /* Refill the packet buffer */
724                        if (empty != nb_packets) {
725                                // Refill the empty packets
726                                libtrace_ocache_alloc(&trace->packet_freelist,
727                                                      (void **) &packets[empty],
728                                                      nb_packets - empty,
729                                                      nb_packets - empty);
730                        }
731                        if (!trace->pread) {
732                                /*assert(packets[0]);*/
733                                if (!packets[0]) {
734                                        fprintf(stderr, "Packet missing in perpkt_threads_entry()\n");
735                                        return NULL;
736                                }
737                                nb_packets = trace_read_packet(trace, packets[0]);
738                                packets[0]->error = nb_packets;
739                                if (nb_packets > 0)
740                                        nb_packets = 1;
741                        } else {
742                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
743                        }
744                        offset = 0;
745                        empty = 0;
746                }
747
748                /* Handle error/message cases */
749                if (nb_packets > 0) {
750                        /* Store the first non-meta packet */
751                        for (j = 0; j < nb_packets; j++) {
752                                if (t->recorded_first)
753                                        break;
754                                if (packets[j]->error > 0) {
755                                        store_first_packet(trace, packets[j], t);
756                                }
757                        }
758                        dispatch_packets(trace, t, packets, nb_packets, &empty,
759                                         &offset, trace->tracetime);
760                } else {
761                        switch (nb_packets) {
762                        case READ_EOF:
763                                goto eof;
764                        case READ_ERROR:
765                                goto error;
766                        case READ_MESSAGE:
767                                nb_packets = 0;
768                                continue;
769                        default:
770                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
771                                goto error;
772                        }
773                }
774
775        }
776
777error:
778        message.code = MESSAGE_DO_STOP;
779        message.sender = t;
780        message.data.uint64 = 0;
781        trace_message_perpkts(trace, &message);
782eof:
783        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
784
785        // Let the per_packet function know we have stopped
786        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
787        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
788
789        // Free any remaining packets
790        for (i = 0; i < trace->config.burst_size; i++) {
791                if (packets[i]) {
792                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
793                        packets[i] = NULL;
794                }
795        }
796
797        thread_change_state(trace, t, THREAD_FINISHED, true);
798
799        /* Make sure the reporter sees we have finished */
800        if (trace_has_reporter(trace))
801                trace_post_reporter(trace);
802
803        // Release all ocache memory before unregistering with the format
804        // because this might(it does in DPDK) unlink the formats mempool
805        // causing destroy/finish packet to fail.
806        libtrace_ocache_unregister_thread(&trace->packet_freelist);
807        if (trace->format->punregister_thread) {
808                trace->format->punregister_thread(trace, t);
809        }
810        print_memory_stats();
811
812        pthread_exit(NULL);
813}
814
815/**
816 * The start point for our single threaded hasher thread, this will read
817 * and hash a packet from a data source and queue it against the correct
818 * core to process it.
819 */
820static void* hasher_entry(void *data) {
821        libtrace_t *trace = (libtrace_t *)data;
822        libtrace_thread_t * t;
823        int i;
824        libtrace_packet_t * packet;
825        libtrace_message_t message = {0, {.uint64=0}, NULL};
826        int pkt_skipped = 0;
827
828        /*assert(trace_has_dedicated_hasher(trace));*/
829        if (!trace_has_dedicated_hasher(trace)) {
830                fprintf(stderr, "Trace does not have hasher associated with it in hasher_entry()\n");
831                return NULL;
832        }
833        /* Wait until all threads are started and objects are initialised (ring buffers) */
834        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
835        t = &trace->hasher_thread;
836        /*assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));*/
837        if (!(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid))) {
838                fprintf(stderr, "Incorrect thread type or non matching thread IDs in hasher_entry()\n");
839                return NULL;
840        }
841
842        if (trace->state == STATE_ERROR) {
843                thread_change_state(trace, t, THREAD_FINISHED, false);
844                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
845                pthread_exit(NULL);
846        }
847        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
848
849        /* We are reading but it is not the parallel API */
850        if (trace->format->pregister_thread) {
851                trace->format->pregister_thread(trace, t, true);
852        }
853
854        /* Read all packets in then hash and queue against the correct thread */
855        while (1) {
856                int thread;
857                if (!pkt_skipped)
858                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
859                /*assert(packet);*/
860                if (!packet) {
861                        fprintf(stderr, "Cannot hash and queue a NULL packet in hasher_entry()\n");
862                        return NULL;
863                }
864
865                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
866                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
867                        switch(message.code) {
868                                case MESSAGE_DO_PAUSE:
869                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
870                                        thread_change_state(trace, t, THREAD_PAUSED, false);
871                                        pthread_cond_broadcast(&trace->perpkt_cond);
872                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
873                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
874                                        }
875                                        thread_change_state(trace, t, THREAD_RUNNING, false);
876                                        pthread_cond_broadcast(&trace->perpkt_cond);
877                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
878                                        break;
879                                case MESSAGE_DO_STOP:
880                                        /* Either FINISHED or FINISHING */
881                                        /*assert(trace->started == false);*/
882                                        if (!(trace->started == false)) {
883                                                fprintf(stderr, "Expected trace to have not started in hasher_entry()\n");
884                                                return NULL;
885                                        }
886                                        /* Mark the current packet as EOF */
887                                        packet->error = 0;
888                                        goto hasher_eof;
889                                default:
890                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
891                        }
892                        pkt_skipped = 1;
893                        continue;
894                }
895
896                if ((packet->error = trace_read_packet(trace, packet)) <1) {
897                        if (packet->error == READ_MESSAGE) {
898                                pkt_skipped = 1;
899                                continue;
900                        } else {
901                                break; /* We are EOF or error'd either way we stop  */
902                        }
903                }
904
905                /* We are guaranteed to have a hash function i.e. != NULL */
906                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
907                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
908                /* Blocking write to the correct queue - I'm the only writer */
909                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
910                        uint64_t order = trace_packet_get_order(packet);
911                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
912                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
913                                // Write ticks to everyone else
914                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
915                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
916                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
917                                for (i = 0; i < trace->perpkt_thread_count; i++) {
918                                        pkts[i]->error = READ_TICK;
919                                        trace_packet_set_order(pkts[i], order);
920                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
921                                }
922                        }
923                        pkt_skipped = 0;
924                } else {
925                        /*assert(!"Dropping a packet!!");*/
926                        fprintf(stderr, "Expected THREAD_FINISHED state in hasher_entry()\n");
927                        return NULL;
928                        pkt_skipped = 1; // Reuse that packet no one read it
929                }
930        }
931hasher_eof:
932        /* Broadcast our last failed read to all threads */
933        for (i = 0; i < trace->perpkt_thread_count; i++) {
934                libtrace_packet_t * bcast;
935                if (i == trace->perpkt_thread_count - 1) {
936                        bcast = packet;
937                } else {
938                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
939                        bcast->error = packet->error;
940                }
941                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
942                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
943                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
944                } else {
945                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
946                }
947                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
948        }
949
950        // We don't need to free the packet
951        thread_change_state(trace, t, THREAD_FINISHED, true);
952
953        libtrace_ocache_unregister_thread(&trace->packet_freelist);
954        if (trace->format->punregister_thread) {
955                trace->format->punregister_thread(trace, t);
956        }
957        print_memory_stats();
958
959        // TODO remove from TTABLE t sometime
960        pthread_exit(NULL);
961}
962
963/* Our simplest case when a thread becomes ready it can obtain an exclusive
964 * lock to read packets from the underlying trace.
965 */
966static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
967                                                    libtrace_thread_t *t,
968                                                    libtrace_packet_t *packets[],
969                                                    size_t nb_packets) {
970        size_t i = 0;
971        //bool tick_hit = false;
972
973        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
974        /* Read nb_packets */
975        for (i = 0; i < nb_packets; ++i) {
976                if (libtrace_message_queue_count(&t->messages) > 0) {
977                        if ( i==0 ) {
978                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
979                                return READ_MESSAGE;
980                        } else {
981                                break;
982                        }
983                }
984                packets[i]->error = trace_read_packet(libtrace, packets[i]);
985
986                if (packets[i]->error <= 0) {
987                        /* We'll catch this next time if we have already got packets */
988                        if ( i==0 ) {
989                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
990                                return packets[i]->error;
991                        } else {
992                                break;
993                        }
994                }
995                /*
996                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
997                        tick_hit = true;
998                }*/
999
1000                // Doing this inside the lock ensures the first packet is
1001                // always recorded first
1002                if (!t->recorded_first && packets[0]->error > 0) {
1003                        store_first_packet(libtrace, packets[0], t);
1004                }
1005        }
1006        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
1007        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
1008        if (tick_hit) {
1009                libtrace_message_t tick;
1010                tick.additional.uint64 = trace_packet_get_order(packets[i]);
1011                tick.code = MESSAGE_TICK;
1012                trace_send_message_to_perpkts(libtrace, &tick);
1013        } */
1014        return i;
1015}
1016
1017/**
1018 * For the case that we have a dedicated hasher thread
1019 * 1. We read a packet from our buffer
1020 * 2. Move that into the packet provided (packet)
1021 */
1022inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
1023                                                   libtrace_thread_t *t,
1024                                                   libtrace_packet_t *packets[],
1025                                                   size_t nb_packets) {
1026        size_t i;
1027
1028        /* We store the last error message here */
1029        if (t->format_data) {
1030                return ((libtrace_packet_t *)t->format_data)->error;
1031        }
1032
1033        // Always grab at least one
1034        if (packets[0]) // Recycle the old get the new
1035                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
1036        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
1037
1038        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
1039                return packets[0]->error;
1040        }
1041
1042        for (i = 1; i < nb_packets; i++) {
1043                if (packets[i]) // Recycle the old get the new
1044                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
1045                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
1046                        packets[i] = NULL;
1047                        break;
1048                }
1049
1050                /* We will return an error or EOF the next time around */
1051                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
1052                        /* The message case will be checked automatically -
1053                           However other cases like EOF and error will only be
1054                           sent once*/
1055                        if (packets[i]->error != READ_MESSAGE) {
1056                                /*assert(t->format_data == NULL);*/
1057                                if (!(t->format_data == NULL)) {
1058                                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
1059                                                "Expected format data to be NULL in trace_pread_packet_hasher_thread()");
1060                                        return -1;
1061                                }
1062                                t->format_data = packets[i];
1063                        }
1064                        break;
1065                }
1066        }
1067
1068        return i;
1069}
1070
1071/**
1072 * For the first packet of each queue we keep a copy and note the system
1073 * time it was received at.
1074 *
1075 * This is used for finding the first packet when playing back a trace
1076 * in trace time. And can be used by real time applications to print
1077 * results out every XXX seconds.
1078 */
1079void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1080{
1081
1082        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1083        struct timeval tv;
1084        libtrace_packet_t * dup;
1085
1086        if (t->recorded_first) {
1087                return;
1088        }
1089
1090        if (IS_LIBTRACE_META_PACKET(packet)) {
1091                return;
1092        }
1093
1094        /* We mark system time against a copy of the packet */
1095        gettimeofday(&tv, NULL);
1096        dup = trace_copy_packet(packet);
1097
1098        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1099        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1100        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1101        libtrace->first_packets.count++;
1102
1103        /* Now update the first */
1104        if (libtrace->first_packets.count == 1) {
1105                /* We the first entry hence also the first known packet */
1106                libtrace->first_packets.first = t->perpkt_num;
1107        } else {
1108                /* Check if we are newer than the previous 'first' packet */
1109                size_t first = libtrace->first_packets.first;
1110                struct timeval cur_ts = trace_get_timeval(dup);
1111                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1112                if (timercmp(&cur_ts, &first_ts, <))
1113                        libtrace->first_packets.first = t->perpkt_num;
1114        }
1115        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1116
1117        memset(&mesg, 0, sizeof(libtrace_message_t));
1118        mesg.code = MESSAGE_FIRST_PACKET;
1119        trace_message_reporter(libtrace, &mesg);
1120        trace_message_perpkts(libtrace, &mesg);
1121        t->recorded_first = true;
1122}
1123
1124DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1125                                     libtrace_thread_t *t,
1126                                     const libtrace_packet_t **packet,
1127                                     const struct timeval **tv)
1128{
1129        void * tmp;
1130        int ret = 0;
1131
1132        if (t) {
1133                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1134                        return -1;
1135        }
1136
1137        /* Throw away these which we don't use */
1138        if (!packet)
1139                packet = (const libtrace_packet_t **) &tmp;
1140        if (!tv)
1141                tv = (const struct timeval **) &tmp;
1142
1143        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1144        if (t) {
1145                /* Get the requested thread */
1146                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1147                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1148        } else if (libtrace->first_packets.count) {
1149                /* Get the first packet across all threads */
1150                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1151                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1152                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1153                        ret = 1;
1154                } else {
1155                        struct timeval curr_tv;
1156                        // If a second has passed since the first entry we will assume this is the very first packet
1157                        gettimeofday(&curr_tv, NULL);
1158                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1159                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1160                                        ret = 1;
1161                                }
1162                        }
1163                }
1164        } else {
1165                *packet = NULL;
1166                *tv = NULL;
1167        }
1168        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1169        return ret;
1170}
1171
1172
1173DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1174{
1175        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1176}
1177
1178inline static struct timeval usec_to_tv(uint64_t usec)
1179{
1180        struct timeval tv;
1181        tv.tv_sec = usec / 1000000;
1182        tv.tv_usec = usec % 1000000;
1183        return tv;
1184}
1185
1186/** Similar to delay_tracetime but send messages to all threads periodically */
1187static void* reporter_entry(void *data) {
1188        libtrace_message_t message = {0, {.uint64=0}, NULL};
1189        libtrace_t *trace = (libtrace_t *)data;
1190        libtrace_thread_t *t = &trace->reporter_thread;
1191
1192        /* Wait until all threads are started */
1193        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1194        if (trace->state == STATE_ERROR) {
1195                thread_change_state(trace, t, THREAD_FINISHED, false);
1196                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1197                pthread_exit(NULL);
1198        }
1199        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1200
1201        if (trace->format->pregister_thread) {
1202                trace->format->pregister_thread(trace, t, false);
1203        }
1204
1205        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1206        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1207
1208        while (!trace_has_finished(trace)) {
1209                if (trace->config.reporter_polling) {
1210                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1211                                message.code = MESSAGE_POST_REPORTER;
1212                } else {
1213                        libtrace_message_queue_get(&t->messages, &message);
1214                }
1215                switch (message.code) {
1216                        // Check for results
1217                        case MESSAGE_POST_REPORTER:
1218                                trace->combiner.read(trace, &trace->combiner);
1219                                break;
1220                        case MESSAGE_DO_PAUSE:
1221                                /*assert(trace->combiner.pause);*/
1222                                if(!(trace->combiner.pause)) {
1223                                        trace_set_err(trace, TRACE_ERR_COMBINER,
1224                                                "Expected combiner to be paused in reporter_entry()");
1225                                        return NULL;
1226                                }
1227                                trace->combiner.pause(trace, &trace->combiner);
1228                                send_message(trace, t, MESSAGE_PAUSING,
1229                                                (libtrace_generic_t) {0}, t);
1230                                trace_thread_pause(trace, t);
1231                                send_message(trace, t, MESSAGE_RESUMING,
1232                                                (libtrace_generic_t) {0}, t);
1233                                break;
1234                default:
1235                        send_message(trace, t, message.code, message.data,
1236                                        message.sender);
1237                }
1238        }
1239
1240        // Flush out whats left now all our threads have finished
1241        trace->combiner.read_final(trace, &trace->combiner);
1242
1243        // GOODBYE
1244        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1245        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1246
1247        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1248        print_memory_stats();
1249        return NULL;
1250}
1251
1252/** Similar to delay_tracetime but send messages to all threads periodically */
1253static void* keepalive_entry(void *data) {
1254        struct timeval prev, next;
1255        libtrace_message_t message = {0, {.uint64=0}, NULL};
1256        libtrace_t *trace = (libtrace_t *)data;
1257        uint64_t next_release;
1258        libtrace_thread_t *t = &trace->keepalive_thread;
1259
1260        /* Wait until all threads are started */
1261        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1262        if (trace->state == STATE_ERROR) {
1263                thread_change_state(trace, t, THREAD_FINISHED, false);
1264                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1265                pthread_exit(NULL);
1266        }
1267        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1268
1269        gettimeofday(&prev, NULL);
1270        memset(&message, 0, sizeof(libtrace_message_t));
1271        message.code = MESSAGE_TICK_INTERVAL;
1272
1273        while (trace->state != STATE_FINISHED) {
1274                fd_set rfds;
1275                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1276                gettimeofday(&next, NULL);
1277                if (next_release > tv_to_usec(&next)) {
1278                        next = usec_to_tv(next_release - tv_to_usec(&next));
1279                        // Wait for timeout or a message
1280                        FD_ZERO(&rfds);
1281                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1282                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1283                                libtrace_message_t msg;
1284                                libtrace_message_queue_get(&t->messages, &msg);
1285                                /*assert(msg.code == MESSAGE_DO_STOP);*/
1286                                if (!(msg.code == MESSAGE_DO_STOP)) {
1287                                        fprintf(stderr, "Unexpected message code in keepalive_entry()\n");
1288                                        return NULL;
1289                                }
1290                                goto done;
1291                        }
1292                }
1293                prev = usec_to_tv(next_release);
1294                if (trace->state == STATE_RUNNING) {
1295                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1296                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1297                        trace_message_perpkts(trace, &message);
1298                }
1299        }
1300done:
1301
1302        thread_change_state(trace, t, THREAD_FINISHED, true);
1303        return NULL;
1304}
1305
1306/**
1307 * Delays a packets playback so the playback will be in trace time.
1308 * This may break early if a message becomes available.
1309 *
1310 * Requires the first packet for this thread to be received.
1311 * @param libtrace  The trace
1312 * @param packet    The packet to delay
1313 * @param t         The current thread
1314 * @return Either READ_MESSAGE(-2) or 0 is successful
1315 */
1316static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1317        struct timeval curr_tv, pkt_tv;
1318        uint64_t next_release = t->tracetime_offset_usec;
1319        uint64_t curr_usec;
1320
1321        if (!t->tracetime_offset_usec) {
1322                const libtrace_packet_t *first_pkt;
1323                const struct timeval *sys_tv;
1324                int64_t initial_offset;
1325                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1326                if (!first_pkt)
1327                        return 0;
1328                pkt_tv = trace_get_timeval(first_pkt);
1329                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1330                /* In the unlikely case offset is 0, change it to 1 */
1331                if (stable)
1332                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1333                next_release = initial_offset;
1334        }
1335        /* next_release == offset */
1336        pkt_tv = trace_get_timeval(packet);
1337        next_release += tv_to_usec(&pkt_tv);
1338        gettimeofday(&curr_tv, NULL);
1339        curr_usec = tv_to_usec(&curr_tv);
1340        if (next_release > curr_usec) {
1341                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1342                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1343                fd_set rfds;
1344                FD_ZERO(&rfds);
1345                FD_SET(mesg_fd, &rfds);
1346                // We need to wait
1347                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1348                if (ret == 0) {
1349                        return 0;
1350                } else if (ret > 0) {
1351                        return READ_MESSAGE;
1352                } else {
1353                        /*assert(!"trace_delay_packet: Unexpected return from select");*/
1354                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unexpected return from select in delay_tracetime()");
1355                        return -1;
1356                }
1357        }
1358        return 0;
1359}
1360
1361/* Discards packets that don't match the filter.
1362 * Discarded packets are emptied and then moved to the end of the packet list.
1363 *
1364 * @param trace       The trace format, containing the filter
1365 * @param packets     An array of packets
1366 * @param nb_packets  The number of valid items in packets
1367 *
1368 * @return The number of packets that passed the filter, which are moved to
1369 *          the start of the packets array
1370 */
1371static inline size_t filter_packets(libtrace_t *trace,
1372                                    libtrace_packet_t **packets,
1373                                    size_t nb_packets) {
1374        size_t offset = 0;
1375        size_t i;
1376
1377        for (i = 0; i < nb_packets; ++i) {
1378                // The filter needs the trace attached to receive the link type
1379                packets[i]->trace = trace;
1380                if (trace_apply_filter(trace->filter, packets[i])) {
1381                        libtrace_packet_t *tmp;
1382                        tmp = packets[offset];
1383                        packets[offset++] = packets[i];
1384                        packets[i] = tmp;
1385                } else {
1386                        trace_fin_packet(packets[i]);
1387                }
1388        }
1389
1390        return offset;
1391}
1392
1393/* Read a batch of packets from the trace into a buffer.
1394 * Note that this function will block until a packet is read (or EOF is reached)
1395 *
1396 * @param libtrace    The trace
1397 * @param t           The thread
1398 * @param packets     An array of packets
1399 * @param nb_packets  The number of empty packets in packets
1400 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1401 */
1402static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1403                                      libtrace_thread_t *t,
1404                                      libtrace_packet_t *packets[],
1405                                      size_t nb_packets) {
1406        int i;
1407        /*assert(libtrace && "libtrace is NULL in trace_read_packet()");*/
1408        if (!libtrace) {
1409                fprintf(stderr, "NULL trace passed into trace_read_packet()\n");
1410                return TRACE_ERR_NULL_TRACE;
1411        }
1412        /*assert(nb_packets);*/
1413        if (!nb_packets) {
1414                trace_set_err(libtrace, TRACE_ERR_NULL,
1415                        "NULL nb_packets passed into trace_pread_packet_wrapper()");
1416                return -1;
1417        }
1418        if (trace_is_err(libtrace))
1419                return -1;
1420        if (!libtrace->started) {
1421                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1422                              "You must call libtrace_start() before trace_read_packet()\n");
1423                return -1;
1424        }
1425
1426        if (libtrace->format->pread_packets) {
1427                int ret;
1428                for (i = 0; i < (int) nb_packets; ++i) {
1429                        /*assert(i[packets]); <--- is this ment to be packets[i]?? */
1430                        if (!i[packets]) {
1431                                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "NULL packets in trace_read_packet()");
1432                                return -1;
1433                        }
1434                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1435                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1436                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1437                                              "Packet passed to trace_read_packet() is invalid\n");
1438                                return -1;
1439                        }
1440                        packets[i]->which_trace_start = libtrace->startcount;
1441                }
1442                do {
1443                        ret=libtrace->format->pread_packets(libtrace, t,
1444                                                            packets,
1445                                                            nb_packets);
1446                        /* Error, EOF or message? */
1447                        if (ret <= 0) {
1448                                return ret;
1449                        }
1450
1451                        if (libtrace->filter) {
1452                                int remaining;
1453                                remaining = filter_packets(libtrace,
1454                                                           packets, ret);
1455                                t->filtered_packets += ret - remaining;
1456                                ret = remaining;
1457                        }
1458                        for (i = 0; i < ret; ++i) {
1459                                /* We do not mark the packet against the trace,
1460                                 * before hand or after. After breaks DAG meta
1461                                 * packets and before is inefficient */
1462                                //packets[i]->trace = libtrace;
1463                                /* TODO IN FORMAT?? Like traditional libtrace */
1464                                if (libtrace->snaplen>0)
1465                                        trace_set_capture_length(packets[i],
1466                                                        libtrace->snaplen);
1467                        }
1468                } while(ret == 0);
1469                return ret;
1470        }
1471        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1472                      "This format does not support reading packets\n");
1473        return ~0U;
1474}
1475
1476/* Restarts a parallel trace, this is called from trace_pstart.
1477 * The libtrace lock is held upon calling this function.
1478 * Typically with a parallel trace the threads are not
1479 * killed rather.
1480 */
1481static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1482                          libtrace_callback_set_t *per_packet_cbs, 
1483                          libtrace_callback_set_t *reporter_cbs) {
1484        int i, err = 0;
1485        if (libtrace->state != STATE_PAUSED) {
1486                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1487                        "trace(%s) is not currently paused",
1488                              libtrace->uridata);
1489                return -1;
1490        }
1491
1492        /*assert(libtrace_parallel);*/
1493        if (!libtrace_parallel) {
1494                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected parallel trace in trace_prestart()");
1495                return -1;
1496        }
1497        /*assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);*/
1498        if (!(!libtrace->perpkt_thread_states[THREAD_RUNNING])) {
1499                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected no running threads in trace_prestart()");
1500                return -1;
1501        }
1502
1503        /* Reset first packets */
1504        pthread_spin_lock(&libtrace->first_packets.lock);
1505        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1506                /*assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);*/
1507                if (!(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet)) {
1508                        trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected different first packet in trace_pstart()");
1509                        return -1;
1510                }
1511                if (libtrace->first_packets.packets[i].packet) {
1512                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1513                        libtrace->first_packets.packets[i].packet = NULL;
1514                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1515                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1516                        libtrace->first_packets.count--;
1517                        libtrace->perpkt_threads[i].recorded_first = false;
1518                }
1519        }
1520        /*assert(libtrace->first_packets.count == 0);*/
1521        if (!(libtrace->first_packets.count == 0)) {
1522                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected a first packets count of 0 in trace_pstart()");
1523                return -1;
1524        }
1525        libtrace->first_packets.first = 0;
1526        pthread_spin_unlock(&libtrace->first_packets.lock);
1527
1528        /* Reset delay */
1529        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1530                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1531        }
1532
1533        /* Reset statistics */
1534        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1535                libtrace->perpkt_threads[i].accepted_packets = 0;
1536                libtrace->perpkt_threads[i].filtered_packets = 0;
1537        }
1538        libtrace->accepted_packets = 0;
1539        libtrace->filtered_packets = 0;
1540
1541        /* Update functions if requested */
1542        if(global_blob)
1543                libtrace->global_blob = global_blob;
1544
1545        if (per_packet_cbs) {
1546                if (libtrace->perpkt_cbs)
1547                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1548                libtrace->perpkt_cbs = trace_create_callback_set();
1549                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1550                                sizeof(libtrace_callback_set_t));
1551        }
1552
1553        if (reporter_cbs) {
1554                if (libtrace->reporter_cbs)
1555                        trace_destroy_callback_set(libtrace->reporter_cbs);
1556
1557                libtrace->reporter_cbs = trace_create_callback_set();
1558                memcpy(libtrace->reporter_cbs, reporter_cbs,
1559                                sizeof(libtrace_callback_set_t));
1560        }
1561
1562        if (trace_is_parallel(libtrace)) {
1563                err = libtrace->format->pstart_input(libtrace);
1564        } else {
1565                if (libtrace->format->start_input) {
1566                        err = libtrace->format->start_input(libtrace);
1567                }
1568        }
1569
1570        if (err == 0) {
1571                libtrace->started = true;
1572                libtrace->startcount ++;
1573                libtrace_change_state(libtrace, STATE_RUNNING, false);
1574        }
1575        return err;
1576}
1577
1578/**
1579 * @return the number of CPU cores on the machine. -1 if unknown.
1580 */
1581SIMPLE_FUNCTION static int get_nb_cores() {
1582        int numCPU;
1583#ifdef _SC_NPROCESSORS_ONLN
1584        /* Most systems do this now */
1585        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1586
1587#else
1588        int mib[] = {CTL_HW, HW_AVAILCPU};
1589        size_t len = sizeof(numCPU);
1590
1591        /* get the number of CPUs from the system */
1592        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1593#endif
1594        return numCPU <= 0 ? 1 : numCPU;
1595}
1596
1597/**
1598 * Verifies the configuration and sets default values for any values not
1599 * specified by the user.
1600 */
1601static void verify_configuration(libtrace_t *libtrace) {
1602
1603        if (libtrace->config.hasher_queue_size <= 0)
1604                libtrace->config.hasher_queue_size = 1000;
1605
1606        if (libtrace->config.perpkt_threads <= 0) {
1607                libtrace->perpkt_thread_count = get_nb_cores();
1608                if (libtrace->perpkt_thread_count <= 0)
1609                        // Lets just use one
1610                        libtrace->perpkt_thread_count = 1;
1611        } else {
1612                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1613        }
1614
1615        if (libtrace->config.reporter_thold <= 0)
1616                libtrace->config.reporter_thold = 100;
1617        if (libtrace->config.burst_size <= 0)
1618                libtrace->config.burst_size = 32;
1619        if (libtrace->config.thread_cache_size <= 0)
1620                libtrace->config.thread_cache_size = 64;
1621        if (libtrace->config.cache_size <= 0)
1622                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1623
1624        if (libtrace->config.cache_size <
1625                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1626                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1627
1628        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1629                libtrace->combiner = combiner_unordered;
1630
1631        /* Figure out if we are using a dedicated hasher thread? */
1632        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1633                libtrace->hasher_thread.type = THREAD_HASHER;
1634        }
1635}
1636
1637/**
1638 * Starts a libtrace_thread, including allocating memory for messaging.
1639 * Threads are expected to wait until the libtrace look is released.
1640 * Hence why we don't init structures until later.
1641 *
1642 * @param trace The trace the thread is associated with
1643 * @param t The thread that is filled when the thread is started
1644 * @param type The type of thread
1645 * @param start_routine The entry location of the thread
1646 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1647 * @param name For debugging purposes set the threads name (Optional)
1648 *
1649 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1650 *         In this situation the thread structure is zeroed.
1651 */
1652static int trace_start_thread(libtrace_t *trace,
1653                       libtrace_thread_t *t,
1654                       enum thread_types type,
1655                       void *(*start_routine) (void *),
1656                       int perpkt_num,
1657                       const char *name) {
1658#ifdef __linux__
1659        cpu_set_t cpus;
1660        int i;
1661#endif
1662        int ret;
1663        /*assert(t->type == THREAD_EMPTY);*/
1664        if (!(t->type == THREAD_EMPTY)) {
1665                trace_set_err(trace, TRACE_ERR_THREAD,
1666                        "Expected thread type of THREAD_EMPTY in trace_start_thread()");
1667                return -1;
1668        }
1669        t->trace = trace;
1670        t->ret = NULL;
1671        t->user_data = NULL;
1672        t->type = type;
1673        t->state = THREAD_RUNNING;
1674
1675        /*assert(name);*/
1676        if (!name) {
1677                trace_set_err(trace, TRACE_ERR_THREAD, "NULL thread name in trace_start_thread()");
1678                return -1;
1679        }
1680
1681#ifdef __linux__
1682        CPU_ZERO(&cpus);
1683        for (i = 0; i < get_nb_cores(); i++)
1684                CPU_SET(i, &cpus);
1685
1686        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1687        if( ret == 0 ) {
1688                ret = pthread_setaffinity_np(t->tid, sizeof(cpus), &cpus);
1689        }
1690
1691#else
1692        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1693#endif
1694        if (ret != 0) {
1695                libtrace_zero_thread(t);
1696                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1697                return -1;
1698        }
1699        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1700        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1701                libtrace_ringbuffer_init(&t->rbuffer,
1702                                         trace->config.hasher_queue_size,
1703                                         trace->config.hasher_polling?
1704                                                 LIBTRACE_RINGBUFFER_POLLING:
1705                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1706        }
1707#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1708        if(name)
1709                pthread_setname_np(t->tid, name);
1710#endif
1711        t->perpkt_num = perpkt_num;
1712        return 0;
1713}
1714
1715/** Parses the environment variable LIBTRACE_CONF into the supplied
1716 * configuration structure.
1717 *
1718 * @param[in,out] libtrace The trace from which we determine the URI and set
1719 * the configuration.
1720 *
1721 * We search for 3 environment variables and apply them to the config in the
1722 * following order. Such that the first has the lowest priority.
1723 *
1724 * 1. LIBTRACE_CONF, The global environment configuration
1725 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1726 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1727 *
1728 * E.g.
1729 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1730 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1731 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1732 *
1733 * @note All environment variables names MUST only contian
1734 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1735 * outside of this range should be captilised if possible or replaced with an
1736 * underscore.
1737 */
1738static void parse_env_config (libtrace_t *libtrace) {
1739        char env_name[1024] = "LIBTRACE_CONF_";
1740        size_t len = strlen(env_name);
1741        size_t mark = 0;
1742        size_t i;
1743        char * env;
1744
1745        /* Make our compound string */
1746        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1747        len += strlen(libtrace->format->name);
1748        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1749        len += 1;
1750        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1751
1752        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1753        for (i = 0; env_name[i] != 0; ++i) {
1754                env_name[i] = toupper(env_name[i]);
1755                if(env_name[i] == ':') {
1756                        mark = i;
1757                }
1758                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1759                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1760                        env_name[i] = '_';
1761                }
1762        }
1763
1764        /* First apply global env settings LIBTRACE_CONF */
1765        env = getenv("LIBTRACE_CONF");
1766        if (env)
1767        {
1768                printf("Got env %s", env);
1769                trace_set_configuration(libtrace, env);
1770        }
1771
1772        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1773        if (mark != 0) {
1774                env_name[mark] = 0;
1775                env = getenv(env_name);
1776                if (env) {
1777                        trace_set_configuration(libtrace, env);
1778                }
1779                env_name[mark] = '_';
1780        }
1781
1782        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1783        env = getenv(env_name);
1784        if (env) {
1785                trace_set_configuration(libtrace, env);
1786        }
1787}
1788
1789DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1790        if (libtrace->state == STATE_NEW)
1791                return trace_supports_parallel(libtrace);
1792        return libtrace->pread == trace_pread_packet_wrapper;
1793}
1794
1795DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1796                           libtrace_callback_set_t *per_packet_cbs,
1797                           libtrace_callback_set_t *reporter_cbs) {
1798        int i;
1799        int ret = -1;
1800        char name[24];
1801        sigset_t sig_before, sig_block_all;
1802        /*assert(libtrace);*/
1803        if (!libtrace) {
1804                fprintf(stderr, "NULL trace passed to trace_pstart()\n");
1805                return -1;
1806        }
1807
1808        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1809        if (trace_is_err(libtrace)) {
1810                goto cleanup_none;
1811        }
1812
1813        if (libtrace->state == STATE_PAUSED) {
1814                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1815                                reporter_cbs);
1816                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1817                return ret;
1818        }
1819
1820        if (libtrace->state != STATE_NEW) {
1821                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1822                              "should be called on a NEW or PAUSED trace but "
1823                              "instead was called from %s",
1824                              get_trace_state_name(libtrace->state));
1825                goto cleanup_none;
1826        }
1827
1828        /* Store the user defined things against the trace */
1829        libtrace->global_blob = global_blob;
1830
1831        /* Save a copy of the callbacks in case the user tries to change them
1832         * on us later */
1833        if (!per_packet_cbs) {
1834                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1835                                "requires a non-NULL set of per packet "
1836                                "callbacks.");
1837                goto cleanup_none;
1838        }
1839
1840        if (per_packet_cbs->message_packet == NULL) {
1841                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1842                                "packet callbacks must include a handler "
1843                                "for a packet. Please set this using "
1844                                "trace_set_packet_cb().");
1845                goto cleanup_none;
1846        }
1847
1848        libtrace->perpkt_cbs = trace_create_callback_set();
1849        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1850       
1851        if (reporter_cbs) {
1852                libtrace->reporter_cbs = trace_create_callback_set();
1853                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1854        }
1855
1856       
1857
1858
1859        /* And zero other fields */
1860        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1861                libtrace->perpkt_thread_states[i] = 0;
1862        }
1863        libtrace->first_packets.first = 0;
1864        libtrace->first_packets.count = 0;
1865        libtrace->first_packets.packets = NULL;
1866        libtrace->perpkt_threads = NULL;
1867        /* Set a global which says we are using a parallel trace. This is
1868         * for backwards compatibility due to changes when destroying packets */
1869        libtrace_parallel = 1;
1870
1871        /* Parses configuration passed through environment variables */
1872        parse_env_config(libtrace);
1873        verify_configuration(libtrace);
1874
1875        ret = -1;
1876        /* Try start the format - we prefer parallel over single threaded, as
1877         * these formats should support messages better */
1878
1879        if (trace_supports_parallel(libtrace) &&
1880            !trace_has_dedicated_hasher(libtrace)) {
1881                ret = libtrace->format->pstart_input(libtrace);
1882                libtrace->pread = trace_pread_packet_wrapper;
1883        }
1884        if (ret != 0) {
1885                if (libtrace->format->start_input) {
1886                        ret = libtrace->format->start_input(libtrace);
1887                }
1888                if (libtrace->perpkt_thread_count > 1) {
1889                        libtrace->pread = trace_pread_packet_first_in_first_served;
1890                        /* Don't wait for a burst of packets if the format is
1891                         * live as this could block ring based formats and
1892                         * introduces delay. */
1893                        if (libtrace->format->info.live) {
1894                                libtrace->config.burst_size = 1;
1895                        }
1896                }
1897                else {
1898                        /* Use standard read_packet */
1899                        libtrace->pread = NULL;
1900                }
1901        }
1902
1903        if (ret < 0) {
1904                goto cleanup_none;
1905        }
1906
1907        /* --- Start all the threads we need --- */
1908        /* Disable signals because it is inherited by the threads we start */
1909        sigemptyset(&sig_block_all);
1910        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1911
1912        /* If we need a hasher thread start it
1913         * Special Case: If single threaded we don't need a hasher
1914         */
1915        if (trace_has_dedicated_hasher(libtrace)) {
1916                libtrace->hasher_thread.type = THREAD_EMPTY;
1917                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1918                                   THREAD_HASHER, hasher_entry, -1,
1919                                   "hasher-thread");
1920                if (ret != 0)
1921                        goto cleanup_started;
1922                libtrace->pread = trace_pread_packet_hasher_thread;
1923        } else {
1924                libtrace->hasher_thread.type = THREAD_EMPTY;
1925        }
1926
1927        /* Start up our perpkt threads */
1928        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1929                                          libtrace->perpkt_thread_count);
1930        if (!libtrace->perpkt_threads) {
1931                trace_set_err(libtrace, errno, "trace_pstart "
1932                              "failed to allocate memory.");
1933                goto cleanup_threads;
1934        }
1935        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1936                snprintf(name, sizeof(name), "perpkt-%d", i);
1937                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1938                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1939                                   THREAD_PERPKT, perpkt_threads_entry, i,
1940                                   name);
1941                if (ret != 0)
1942                        goto cleanup_threads;
1943        }
1944
1945        /* Start the reporter thread */
1946        if (reporter_cbs) {
1947                if (libtrace->combiner.initialise)
1948                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1949                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1950                                   THREAD_REPORTER, reporter_entry, -1,
1951                                   "reporter_thread");
1952                if (ret != 0)
1953                        goto cleanup_threads;
1954        }
1955
1956        /* Start the keepalive thread */
1957        if (libtrace->config.tick_interval > 0) {
1958                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1959                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1960                                   "keepalive_thread");
1961                if (ret != 0)
1962                        goto cleanup_threads;
1963        }
1964
1965        /* Init other data structures */
1966        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1967        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1968        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1969                                                 sizeof(*libtrace->first_packets.packets));
1970        if (libtrace->first_packets.packets == NULL) {
1971                trace_set_err(libtrace, errno, "trace_pstart "
1972                              "failed to allocate memory.");
1973                goto cleanup_threads;
1974        }
1975
1976        if (libtrace_ocache_init(&libtrace->packet_freelist,
1977                             (void* (*)()) trace_create_packet,
1978                             (void (*)(void *))trace_destroy_packet,
1979                             libtrace->config.thread_cache_size,
1980                             libtrace->config.cache_size * 4,
1981                             libtrace->config.fixed_count) != 0) {
1982                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1983                              "failed to allocate ocache.");
1984                goto cleanup_threads;
1985        }
1986
1987        /* Threads don't start */
1988        libtrace->started = true;
1989        libtrace->startcount ++;
1990        libtrace_change_state(libtrace, STATE_RUNNING, false);
1991
1992        ret = 0;
1993        goto success;
1994cleanup_threads:
1995        if (libtrace->first_packets.packets) {
1996                free(libtrace->first_packets.packets);
1997                libtrace->first_packets.packets = NULL;
1998        }
1999        libtrace_change_state(libtrace, STATE_ERROR, false);
2000        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2001        if (libtrace->hasher_thread.type == THREAD_HASHER) {
2002                pthread_join(libtrace->hasher_thread.tid, NULL);
2003                libtrace_zero_thread(&libtrace->hasher_thread);
2004        }
2005
2006        if (libtrace->perpkt_threads) {
2007                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2008                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
2009                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
2010                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
2011                        } else break;
2012                }
2013                free(libtrace->perpkt_threads);
2014                libtrace->perpkt_threads = NULL;
2015        }
2016
2017        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
2018                pthread_join(libtrace->reporter_thread.tid, NULL);
2019                libtrace_zero_thread(&libtrace->reporter_thread);
2020        }
2021
2022        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2023                pthread_join(libtrace->keepalive_thread.tid, NULL);
2024                libtrace_zero_thread(&libtrace->keepalive_thread);
2025        }
2026        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2027        libtrace_change_state(libtrace, STATE_NEW, false);
2028        /*assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);*/
2029        if (!(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0)) {
2030                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected 0 running threads in trace_pstart()");
2031                return -1;
2032        }
2033        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
2034cleanup_started:
2035        if (libtrace->pread == trace_pread_packet_wrapper) {
2036                if (libtrace->format->ppause_input)
2037                        libtrace->format->ppause_input(libtrace);
2038        } else {
2039                if (libtrace->format->pause_input)
2040                        libtrace->format->pause_input(libtrace);
2041        }
2042        ret = -1;
2043success:
2044        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
2045cleanup_none:
2046        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2047        return ret;
2048}
2049
2050DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
2051                fn_cb_starting handler) {
2052        cbset->message_starting = handler;
2053        return 0;
2054}
2055
2056DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
2057                fn_cb_dataless handler) {
2058        cbset->message_pausing = handler;
2059        return 0;
2060}
2061
2062DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
2063                fn_cb_dataless handler) {
2064        cbset->message_resuming = handler;
2065        return 0;
2066}
2067
2068DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
2069                fn_cb_dataless handler) {
2070        cbset->message_stopping = handler;
2071        return 0;
2072}
2073
2074DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
2075                fn_cb_packet handler) {
2076        cbset->message_packet = handler;
2077        return 0;
2078}
2079
2080DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
2081                fn_cb_first_packet handler) {
2082        cbset->message_first_packet = handler;
2083        return 0;
2084}
2085
2086DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
2087                fn_cb_tick handler) {
2088        cbset->message_tick_count = handler;
2089        return 0;
2090}
2091
2092DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
2093                fn_cb_tick handler) {
2094        cbset->message_tick_interval = handler;
2095        return 0;
2096}
2097
2098DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
2099                fn_cb_result handler) {
2100        cbset->message_result = handler;
2101        return 0;
2102}
2103
2104DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
2105                fn_cb_usermessage handler) {
2106        cbset->message_user = handler;
2107        return 0;
2108}
2109
2110/*
2111 * Pauses a trace, this should only be called by the main thread
2112 * 1. Set started = false
2113 * 2. All perpkt threads are paused waiting on a condition var
2114 * 3. Then call ppause on the underlying format if found
2115 * 4. The traces state is paused
2116 *
2117 * Once done you should be able to modify the trace setup and call pstart again
2118 * TODO add support to change the number of threads.
2119 */
2120DLLEXPORT int trace_ppause(libtrace_t *libtrace)
2121{
2122        libtrace_thread_t *t;
2123        int i;
2124        /*assert(libtrace);*/
2125        if (!libtrace) {
2126                fprintf(stderr, "NULL trace passed into trace_ppause()\n");
2127                return TRACE_ERR_NULL_TRACE;
2128        }
2129
2130        t = get_thread_table(libtrace);
2131        // Check state from within the lock if we are going to change it
2132        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2133
2134        /* If we are already paused, just treat this as a NOOP */
2135        if (libtrace->state == STATE_PAUSED) {
2136                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2137                return 0;
2138        }
2139        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2140                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2141                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2142                return -1;
2143        }
2144
2145        libtrace_change_state(libtrace, STATE_PAUSING, false);
2146        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2147
2148        // Special case handle the hasher thread case
2149        if (trace_has_dedicated_hasher(libtrace)) {
2150                if (libtrace->config.debug_state)
2151                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2152                libtrace_message_t message = {0, {.uint64=0}, NULL};
2153                message.code = MESSAGE_DO_PAUSE;
2154                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2155                // Wait for it to pause
2156                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2157                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2158                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2159                }
2160                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2161                if (libtrace->config.debug_state)
2162                        fprintf(stderr, " DONE\n");
2163        }
2164
2165        if (libtrace->config.debug_state)
2166                fprintf(stderr, "Asking perpkt threads to pause ...");
2167        // Stop threads, skip this one if it's a perpkt
2168        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2169                if (&libtrace->perpkt_threads[i] != t) {
2170                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2171                        message.code = MESSAGE_DO_PAUSE;
2172                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2173                        if(trace_has_dedicated_hasher(libtrace)) {
2174                                // The hasher has stopped and other threads have messages waiting therefore
2175                                // If the queues are empty the other threads would have no data
2176                                // So send some message packets to simply ask the threads to check
2177                                // We are the only writer since hasher has paused
2178                                libtrace_packet_t *pkt;
2179                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2180                                pkt->error = READ_MESSAGE;
2181                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2182                        }
2183                } else {
2184                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2185                }
2186        }
2187
2188        if (t) {
2189                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2190                // We rely on the user to *not* return before starting the trace again
2191                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2192        }
2193
2194        // Wait for all threads to pause
2195        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2196        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2197                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2198        }
2199        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2200
2201        if (libtrace->config.debug_state)
2202                fprintf(stderr, " DONE\n");
2203
2204        // Deal with the reporter
2205        if (trace_has_reporter(libtrace)) {
2206                if (libtrace->config.debug_state)
2207                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2208                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2209                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2210                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2211               
2212                } else {
2213                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2214                        message.code = MESSAGE_DO_PAUSE;
2215                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2216                        // Wait for it to pause
2217                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2218                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2219                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2220                        }
2221                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2222                }
2223                if (libtrace->config.debug_state)
2224                        fprintf(stderr, " DONE\n");
2225        }
2226
2227        /* Cache values before we pause */
2228        if (libtrace->stats == NULL)
2229                libtrace->stats = trace_create_statistics();
2230        // Save the statistics against the trace
2231        trace_get_statistics(libtrace, NULL);
2232        if (trace_is_parallel(libtrace)) {
2233                libtrace->started = false;
2234                if (libtrace->format->ppause_input)
2235                        libtrace->format->ppause_input(libtrace);
2236                // TODO What happens if we don't have pause input??
2237        } else {
2238                int err;
2239                err = trace_pause(libtrace);
2240                // We should handle this a bit better
2241                if (err)
2242                        return err;
2243        }
2244
2245        // Only set as paused after the pause has been called on the trace
2246        libtrace_change_state(libtrace, STATE_PAUSED, true);
2247        return 0;
2248}
2249
2250/**
2251 * Stop trace finish prematurely as though it meet an EOF
2252 * This should only be called by the main thread
2253 * 1. Calls ppause
2254 * 2. Sends a message asking for threads to finish
2255 * 3. Releases threads which will pause
2256 */
2257DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2258{
2259        int i, err;
2260        libtrace_message_t message = {0, {.uint64=0}, NULL};
2261        /*assert(libtrace);*/
2262        if (!libtrace) {
2263                fprintf(stderr, "NULL trace passed into trace_pstop()\n");
2264                return TRACE_ERR_NULL_TRACE;
2265        }
2266
2267        // Ensure all threads have paused and the underlying trace format has
2268        // been closed and all packets associated are cleaned up
2269        // Pause will do any state checks for us
2270        err = trace_ppause(libtrace);
2271        if (err)
2272                return err;
2273
2274        // Now send a message asking the threads to stop
2275        // This will be retrieved before trying to read another packet
2276
2277        message.code = MESSAGE_DO_STOP;
2278        trace_message_perpkts(libtrace, &message);
2279        if (trace_has_dedicated_hasher(libtrace))
2280                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2281
2282        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2283                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2284        }
2285
2286        /* Now release the threads and let them stop - when the threads finish
2287         * the state will be set to finished */
2288        libtrace_change_state(libtrace, STATE_FINISHING, true);
2289        return 0;
2290}
2291
2292DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2293        int ret = -1;
2294        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2295                return -1;
2296        }
2297
2298        // Save the requirements
2299        trace->hasher_type = type;
2300        if (hasher) {
2301                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2302                        if (trace->hasher_data) {
2303                                free(trace->hasher_data);
2304                        }
2305                }
2306                trace->hasher = hasher;
2307                trace->hasher_data = data;
2308                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2309        } else {
2310                trace->hasher = NULL;
2311                trace->hasher_data = NULL;
2312                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2313        }
2314
2315        // Try push this to hardware - NOTE hardware could do custom if
2316        // there is a more efficient way to apply it, in this case
2317        // it will simply grab the function out of libtrace_t
2318        if (trace_supports_parallel(trace) && trace->format->config_input)
2319                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2320
2321        if (ret == -1) {
2322                /* We have to deal with this ourself */
2323                if (!hasher) {
2324                        switch (type)
2325                        {
2326                                case HASHER_CUSTOM:
2327                                case HASHER_BALANCE:
2328                                        return 0;
2329                                case HASHER_BIDIRECTIONAL:
2330                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2331                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2332                                        toeplitz_init_config(trace->hasher_data, 1);
2333                                        return 0;
2334                                case HASHER_UNIDIRECTIONAL:
2335                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2336                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2337                                        toeplitz_init_config(trace->hasher_data, 0);
2338                                        return 0;
2339                        }
2340                        return -1;
2341                }
2342        } else {
2343                /* If the hasher is hardware we zero out the hasher and hasher
2344                 * data fields - only if we need a hasher do we do this */
2345                trace->hasher = NULL;
2346                trace->hasher_data = NULL;
2347        }
2348
2349        return 0;
2350}
2351
2352// Waits for all threads to finish
2353DLLEXPORT void trace_join(libtrace_t *libtrace) {
2354        int i;
2355
2356        /* Firstly wait for the perpkt threads to finish, since these are
2357         * user controlled */
2358        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2359                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2360                // So we must do our best effort to empty the queue - so
2361                // the producer (or any other threads) don't block.
2362                libtrace_packet_t * packet;
2363                /*assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);*/
2364                if (!(libtrace->perpkt_threads[i].state == THREAD_FINISHED)) {
2365                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2366                                "Expected processing threads state THREAD_FINISHED in trace_join()");
2367                        return;
2368                }
2369                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2370                        if (packet) // This could be NULL iff the perpkt finishes early
2371                                trace_destroy_packet(packet);
2372        }
2373
2374        /* Now the hasher */
2375        if (trace_has_dedicated_hasher(libtrace)) {
2376                pthread_join(libtrace->hasher_thread.tid, NULL);
2377                /*assert(libtrace->hasher_thread.state == THREAD_FINISHED);*/
2378                if (!(libtrace->hasher_thread.state == THREAD_FINISHED)) {
2379                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2380                                "Expected hasher thread state THREAD_FINISHED in trace_join()");
2381                        return;
2382                }
2383        }
2384
2385        // Now that everything is finished nothing can be touching our
2386        // buffers so clean them up
2387        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2388                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2389                // if they lost timeslice before-during a write
2390                libtrace_packet_t * packet;
2391                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2392                        trace_destroy_packet(packet);
2393                if (trace_has_dedicated_hasher(libtrace)) {
2394                        /*assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));*/
2395                        if (!(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer))) {
2396                                trace_set_err(libtrace, TRACE_ERR_THREAD,
2397                                        "Expected processing threads ring buffers to be empty in trace_join()");
2398                                return;
2399                        }
2400                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2401                }
2402                // Cannot destroy vector yet, this happens with trace_destroy
2403        }
2404
2405        if (trace_has_reporter(libtrace)) {
2406                pthread_join(libtrace->reporter_thread.tid, NULL);
2407                /*assert(libtrace->reporter_thread.state == THREAD_FINISHED);*/
2408                if (!(libtrace->reporter_thread.state == THREAD_FINISHED)) {
2409                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2410                                "Expected reporting thread state to be THREAD_FINISHED in trace_join()");
2411                        return;
2412                }
2413        }
2414
2415        // Wait for the tick (keepalive) thread if it has been started
2416        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2417                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2418                msg.code = MESSAGE_DO_STOP;
2419                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2420                pthread_join(libtrace->keepalive_thread.tid, NULL);
2421        }
2422
2423        libtrace_change_state(libtrace, STATE_JOINED, true);
2424        print_memory_stats();
2425}
2426
2427DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2428                                                libtrace_thread_t *t)
2429{
2430        int ret;
2431        if (t == NULL)
2432                t = get_thread_descriptor(libtrace);
2433        if (t == NULL)
2434                return -1;
2435        ret = libtrace_message_queue_count(&t->messages);
2436        return ret < 0 ? 0 : ret;
2437}
2438
2439DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2440                                          libtrace_thread_t *t,
2441                                          libtrace_message_t * message)
2442{
2443        int ret;
2444        if (t == NULL)
2445                t = get_thread_descriptor(libtrace);
2446        if (t == NULL)
2447                return -1;
2448        ret = libtrace_message_queue_get(&t->messages, message);
2449        return ret < 0 ? 0 : ret;
2450}
2451
2452DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2453                                              libtrace_thread_t *t,
2454                                              libtrace_message_t * message)
2455{
2456        if (t == NULL)
2457                t = get_thread_descriptor(libtrace);
2458        if (t == NULL)
2459                return -1;
2460        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2461                return 0;
2462        else
2463                return -1;
2464}
2465
2466DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2467{
2468        int ret;
2469        if (!message->sender)
2470                message->sender = get_thread_descriptor(libtrace);
2471
2472        ret = libtrace_message_queue_put(&t->messages, message);
2473        return ret < 0 ? 0 : ret;
2474}
2475
2476DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2477{
2478        if (!trace_has_reporter(libtrace) ||
2479            !(libtrace->reporter_thread.state == THREAD_RUNNING
2480              || libtrace->reporter_thread.state == THREAD_PAUSED))
2481                return -1;
2482
2483        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2484}
2485
2486DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2487{
2488        libtrace_message_t message = {0, {.uint64=0}, NULL};
2489        message.code = MESSAGE_POST_REPORTER;
2490        return trace_message_reporter(libtrace, (void *) &message);
2491}
2492
2493DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2494{
2495        int i;
2496        int missed = 0;
2497        if (message->sender == NULL)
2498                message->sender = get_thread_descriptor(libtrace);
2499        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2500                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2501                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2502                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2503                } else {
2504                        missed += 1;
2505                }
2506        }
2507        return -missed;
2508}
2509
2510/**
2511 * Publishes a result to the reduce queue
2512 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2513 */
2514DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2515        libtrace_result_t res;
2516        res.type = type;
2517        res.key = key;
2518        res.value = value;
2519        /*assert(libtrace->combiner.publish);*/
2520        if (!libtrace->combiner.publish) {
2521                fprintf(stderr, "Unable to publish result in trace_publish_result()\n");
2522                return;
2523        }
2524        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2525        return;
2526}
2527
2528DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2529        if (combiner) {
2530                trace->combiner = *combiner;
2531                trace->combiner.configuration = config;
2532        } else {
2533                // No combiner, so don't try use it
2534                memset(&trace->combiner, 0, sizeof(trace->combiner));
2535        }
2536}
2537
2538DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2539        return packet->order;
2540}
2541
2542DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2543        return packet->hash;
2544}
2545
2546DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2547        packet->order = order;
2548}
2549
2550DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2551        packet->hash = hash;
2552}
2553
2554DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2555        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2556}
2557
2558/**
2559 * @return True if the trace is not running such that it can be configured
2560 */
2561static inline bool trace_is_configurable(libtrace_t *trace) {
2562        return trace->state == STATE_NEW ||
2563                        trace->state == STATE_PAUSED;
2564}
2565
2566DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2567        // Only supported on new traces not paused traces
2568        if (trace->state != STATE_NEW) return -1;
2569
2570        /* TODO consider allowing an offset from the total number of cores i.e.
2571         * -1 reserve 1 core */
2572        if (nb >= 0) {
2573                trace->config.perpkt_threads = nb;
2574                return 0;
2575        } else {
2576                return -1;
2577        }
2578}
2579
2580DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2581        if (!trace_is_configurable(trace)) return -1;
2582
2583        trace->config.tick_interval = millisec;
2584        return 0;
2585}
2586
2587DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2588        if (!trace_is_configurable(trace)) return -1;
2589
2590        trace->config.tick_count = count;
2591        return 0;
2592}
2593
2594DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2595        if (!trace_is_configurable(trace)) return -1;
2596
2597        trace->tracetime = tracetime;
2598        return 0;
2599}
2600
2601DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2602        if (!trace_is_configurable(trace)) return -1;
2603
2604        trace->config.cache_size = size;
2605        return 0;
2606}
2607
2608DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2609        if (!trace_is_configurable(trace)) return -1;
2610
2611        trace->config.thread_cache_size = size;
2612        return 0;
2613}
2614
2615DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2616        if (!trace_is_configurable(trace)) return -1;
2617
2618        trace->config.fixed_count = fixed;
2619        return 0;
2620}
2621
2622DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2623        if (!trace_is_configurable(trace)) return -1;
2624
2625        trace->config.burst_size = size;
2626        return 0;
2627}
2628
2629DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2630        if (!trace_is_configurable(trace)) return -1;
2631
2632        trace->config.hasher_queue_size = size;
2633        return 0;
2634}
2635
2636DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2637        if (!trace_is_configurable(trace)) return -1;
2638
2639        trace->config.hasher_polling = polling;
2640        return 0;
2641}
2642
2643DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2644        if (!trace_is_configurable(trace)) return -1;
2645
2646        trace->config.reporter_polling = polling;
2647        return 0;
2648}
2649
2650DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2651        if (!trace_is_configurable(trace)) return -1;
2652
2653        trace->config.reporter_thold = thold;
2654        return 0;
2655}
2656
2657DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2658        if (!trace_is_configurable(trace)) return -1;
2659
2660        trace->config.debug_state = debug_state;
2661        return 0;
2662}
2663
2664static bool config_bool_parse(char *value, size_t nvalue) {
2665        if (strncmp(value, "true", nvalue) == 0)
2666                return true;
2667        else if (strncmp(value, "false", nvalue) == 0)
2668                return false;
2669        else
2670                return strtoll(value, NULL, 10) != 0;
2671}
2672
2673/* Note update documentation on trace_set_configuration */
2674static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2675        /*assert(key);*/
2676        if (!key) {
2677                fprintf(stderr, "NULL key passed to config_string()\n");
2678                return;
2679        }
2680        /*assert(value);*/
2681        if (!value) {
2682                fprintf(stderr, "NULL value passed to config_string()\n");
2683                return;
2684        }
2685        /*assert(uc);*/
2686        if (!uc) {
2687                fprintf(stderr, "NULL uc (user_configuration) passed to config_string()\n");
2688                return;
2689        }
2690        if (strncmp(key, "cache_size", nkey) == 0
2691            || strncmp(key, "cs", nkey) == 0) {
2692                uc->cache_size = strtoll(value, NULL, 10);
2693        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2694                   || strncmp(key, "tcs", nkey) == 0) {
2695                uc->thread_cache_size = strtoll(value, NULL, 10);
2696        } else if (strncmp(key, "fixed_count", nkey) == 0
2697                   || strncmp(key, "fc", nkey) == 0) {
2698                uc->fixed_count = config_bool_parse(value, nvalue);
2699        } else if (strncmp(key, "burst_size", nkey) == 0
2700                   || strncmp(key, "bs", nkey) == 0) {
2701                uc->burst_size = strtoll(value, NULL, 10);
2702        } else if (strncmp(key, "tick_interval", nkey) == 0
2703                   || strncmp(key, "ti", nkey) == 0) {
2704                uc->tick_interval = strtoll(value, NULL, 10);
2705        } else if (strncmp(key, "tick_count", nkey) == 0
2706                   || strncmp(key, "tc", nkey) == 0) {
2707                uc->tick_count = strtoll(value, NULL, 10);
2708        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2709                   || strncmp(key, "pt", nkey) == 0) {
2710                uc->perpkt_threads = strtoll(value, NULL, 10);
2711        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2712                   || strncmp(key, "hqs", nkey) == 0) {
2713                uc->hasher_queue_size = strtoll(value, NULL, 10);
2714        } else if (strncmp(key, "hasher_polling", nkey) == 0
2715                   || strncmp(key, "hp", nkey) == 0) {
2716                uc->hasher_polling = config_bool_parse(value, nvalue);
2717        } else if (strncmp(key, "reporter_polling", nkey) == 0
2718                   || strncmp(key, "rp", nkey) == 0) {
2719                uc->reporter_polling = config_bool_parse(value, nvalue);
2720        } else if (strncmp(key, "reporter_thold", nkey) == 0
2721                   || strncmp(key, "rt", nkey) == 0) {
2722                uc->reporter_thold = strtoll(value, NULL, 10);
2723        } else if (strncmp(key, "debug_state", nkey) == 0
2724                   || strncmp(key, "ds", nkey) == 0) {
2725                uc->debug_state = config_bool_parse(value, nvalue);
2726        } else {
2727                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2728        }
2729}
2730
2731DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2732        char *pch;
2733        char key[100];
2734        char value[100];
2735        char *dup;
2736        /*assert(trace);*/
2737        if (!trace) {
2738                fprintf(stderr, "NULL trace passed into trace_set_configuration()\n");
2739                return TRACE_ERR_NULL_TRACE;
2740        }
2741        /*assert(str);*/
2742        if (!str) {
2743                trace_set_err(trace, TRACE_ERR_CONFIG, "NULL configuration string passed to trace_set_configuration()");
2744                return -1;
2745        }
2746
2747        if (!trace_is_configurable(trace)) return -1;
2748
2749        dup = strdup(str);
2750        pch = strtok (dup," ,.-");
2751        while (pch != NULL)
2752        {
2753                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2754                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2755                } else {
2756                        fprintf(stderr, "Error parsing option %s\n", pch);
2757                }
2758                pch = strtok (NULL," ,.-");
2759        }
2760        free(dup);
2761
2762        return 0;
2763}
2764
2765DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2766        char line[1024];
2767        if (!trace_is_configurable(trace)) return -1;
2768
2769        while (fgets(line, sizeof(line), file) != NULL)
2770        {
2771                trace_set_configuration(trace, line);
2772        }
2773
2774        if(ferror(file))
2775                return -1;
2776        else
2777                return 0;
2778}
2779
2780DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2781        /*assert(packet);*/
2782        if (!packet) {
2783                trace_set_err(libtrace, TRACE_ERR_NULL_PACKET,
2784                        "NULL packet passed to trace_free_packet()");
2785                return;
2786        }
2787        /* Always release any resources this might be holding */
2788        trace_fin_packet(packet);
2789        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2790}
2791
2792DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2793        pthread_mutex_lock(&(packet->ref_lock));
2794        if (packet->refcount < 0) {
2795                packet->refcount = 1;
2796        } else {
2797                packet->refcount ++;
2798        }
2799        pthread_mutex_unlock(&(packet->ref_lock));
2800}
2801
2802DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2803        pthread_mutex_lock(&(packet->ref_lock));
2804        packet->refcount --;
2805
2806        if (packet->refcount <= 0) {
2807                trace_free_packet(packet->trace, packet);
2808        }
2809        pthread_mutex_unlock(&(packet->ref_lock));
2810}
2811
2812
2813DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2814        if (libtrace->format)
2815                return &libtrace->format->info;
2816        else
2817                return NULL;
2818}
Note: See TracBrowser for help on using the repository browser.