source: lib/trace_parallel.c @ 62b2d97

develop
Last change on this file since 62b2d97 was 62b2d97, checked in by Jacob Van Walraven <jcv9@…>, 22 months ago

Add callback for meta packets

  • Property mode set to 100644
File size: 89.7 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        case MESSAGE_META_PACKET:
245                return;
246        }
247        if (fn)
248                (*fn)(trace, thread, trace->global_blob, thread->user_data);
249}
250
251DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
252        free(cbset);
253}
254
255DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
256        libtrace_callback_set_t *cbset;
257
258        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
259        memset(cbset, 0, sizeof(libtrace_callback_set_t));
260        return cbset;
261}
262
263/*
264 * This can be used once the hasher thread has been started and internally after
265 * verify_configuration.
266 */
267DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
268{
269        return libtrace->hasher_thread.type == THREAD_HASHER;
270}
271
272DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
273{
274        if (!(libtrace->state != STATE_NEW)) {
275                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "Cannot check reporter for the current state in trace_has_reporter()");
276                return false;
277        }
278        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
279}
280
281/**
282 * When running the number of perpkt threads in use.
283 * TODO what if the trace is not running yet, or has finished??
284 *
285 * @brief libtrace_perpkt_thread_nb
286 * @param t The trace
287 * @return
288 */
289DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
290        return t->perpkt_thread_count;
291}
292
293DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
294        return thread->perpkt_num;
295}
296
297/**
298 * Changes the overall traces state and signals the condition.
299 *
300 * @param trace A pointer to the trace
301 * @param new_state The new state of the trace
302 * @param need_lock Set to true if libtrace_lock is not held, otherwise
303 *        false in the case the lock is currently held by this thread.
304 */
305static inline void libtrace_change_state(libtrace_t *trace,
306        const enum trace_state new_state, const bool need_lock)
307{
308        UNUSED enum trace_state prev_state;
309        if (need_lock)
310                pthread_mutex_lock(&trace->libtrace_lock);
311        prev_state = trace->state;
312        trace->state = new_state;
313
314        if (trace->config.debug_state)
315                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
316                        trace->uridata, get_trace_state_name(prev_state),
317                        get_trace_state_name(trace->state));
318
319        pthread_cond_broadcast(&trace->perpkt_cond);
320        if (need_lock)
321                pthread_mutex_unlock(&trace->libtrace_lock);
322}
323
324/**
325 * Changes a thread's state and broadcasts the condition variable. This
326 * should always be done when the lock is held.
327 *
328 * Additionally for perpkt threads the state counts are updated.
329 *
330 * @param trace A pointer to the trace
331 * @param t A pointer to the thread to modify
332 * @param new_state The new state of the thread
333 * @param need_lock Set to true if libtrace_lock is not held, otherwise
334 *        false in the case the lock is currently held by this thread.
335 */
336static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
337        const enum thread_states new_state, const bool need_lock)
338{
339        enum thread_states prev_state;
340        if (need_lock)
341                pthread_mutex_lock(&trace->libtrace_lock);
342        prev_state = t->state;
343        t->state = new_state;
344        if (t->type == THREAD_PERPKT) {
345                --trace->perpkt_thread_states[prev_state];
346                ++trace->perpkt_thread_states[new_state];
347        }
348
349        if (trace->config.debug_state)
350                fprintf(stderr, "Thread %d state changed from %d to %d\n",
351                        (int) t->tid, prev_state, t->state);
352
353        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
354                /* Make sure we save our final stats in case someone wants
355                 * them at the end of their program.
356                 */
357
358                trace_get_statistics(trace, NULL);
359                libtrace_change_state(trace, STATE_FINISHED, false);
360        }
361
362        pthread_cond_broadcast(&trace->perpkt_cond);
363        if (need_lock)
364                pthread_mutex_unlock(&trace->libtrace_lock);
365}
366
367/**
368 * This is valid once a trace is initialised
369 *
370 * @return True if the format supports parallel threads.
371 */
372static inline bool trace_supports_parallel(libtrace_t *trace)
373{
374        if (!trace) {
375                fprintf(stderr, "NULL trace passed into trace_supports_parallel()\n");
376                return false;
377        }
378        if (!trace->format) {
379                trace_set_err(trace, TRACE_ERR_BAD_FORMAT,
380                        "NULL capture format associated with trace in trace_supports_parallel()");
381                return false;
382        }
383        if (trace->format->pstart_input)
384                return true;
385        else
386                return false;
387}
388
389void libtrace_zero_thread(libtrace_thread_t * t) {
390        t->accepted_packets = 0;
391        t->filtered_packets = 0;
392        t->recorded_first = false;
393        t->tracetime_offset_usec = 0;
394        t->user_data = 0;
395        t->format_data = 0;
396        libtrace_zero_ringbuffer(&t->rbuffer);
397        t->trace = NULL;
398        t->ret = NULL;
399        t->type = THREAD_EMPTY;
400        t->perpkt_num = -1;
401}
402
403// Ints are aligned int is atomic so safe to read and write at same time
404// However write must be locked, read doesn't (We never try read before written to table)
405libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
406        int i = 0;
407        pthread_t tid = pthread_self();
408
409        for (;i<libtrace->perpkt_thread_count ;++i) {
410                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
411                        return &libtrace->perpkt_threads[i];
412        }
413        return NULL;
414}
415
416static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
417        libtrace_thread_t *ret;
418        if (!(ret = get_thread_table(libtrace))) {
419                pthread_t tid = pthread_self();
420                // Check if we are reporter or something else
421                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
422                                pthread_equal(tid, libtrace->reporter_thread.tid))
423                        ret = &libtrace->reporter_thread;
424                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
425                         pthread_equal(tid, libtrace->hasher_thread.tid))
426                        ret = &libtrace->hasher_thread;
427                else
428                        ret = NULL;
429        }
430        return ret;
431}
432
433DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
434        // Duplicate the packet in standard malloc'd memory and free the
435        // original, This is a 1:1 exchange so the ocache count remains unchanged.
436        if (pkt->buf_control != TRACE_CTRL_PACKET) {
437                libtrace_packet_t *dup;
438                dup = trace_copy_packet(pkt);
439                /* Release the external buffer */
440                trace_fin_packet(pkt);
441                /* Copy the duplicated packet over the existing */
442                memcpy(pkt, dup, sizeof(libtrace_packet_t));
443                /* Free the packet structure */
444                free(dup);
445        }
446}
447
448/**
449 * Makes a libtrace_result_t safe, used when pausing a trace.
450 * This will call libtrace_make_packet_safe if the result is
451 * a packet.
452 */
453DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
454        if (res->type == RESULT_PACKET) {
455                libtrace_make_packet_safe(res->value.pkt);
456        }
457}
458
459/**
460 * Holds threads in a paused state, until released by broadcasting
461 * the condition mutex.
462 */
463static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
464        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
465        thread_change_state(trace, t, THREAD_PAUSED, false);
466        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
467                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
468        }
469        thread_change_state(trace, t, THREAD_RUNNING, false);
470        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
471}
472
473/**
474 * Sends a packet to the user, expects either a valid packet or a TICK packet.
475 *
476 * @param trace The trace
477 * @param t The current thread
478 * @param packet A pointer to the packet storage, which may be set to null upon
479 *               return, or a packet to be finished.
480 * @param tracetime If true packets are delayed to match with tracetime
481 * @return 0 is successful, otherwise if playing back in tracetime
482 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
483 *
484 * @note READ_MESSAGE will only be returned if tracetime is true.
485 */
486static inline int dispatch_packet(libtrace_t *trace,
487                                  libtrace_thread_t *t,
488                                  libtrace_packet_t **packet,
489                                  bool tracetime) {
490
491        if ((*packet)->error > 0) {
492                if (tracetime) {
493                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
494                                return READ_MESSAGE;
495                }
496                if (!IS_LIBTRACE_META_PACKET((*packet))) {
497                        t->accepted_packets++;
498                }
499
500                /* If packet is meta call the meta callback */
501                if (IS_LIBTRACE_META_PACKET((*packet))) {
502                        if (trace->perpkt_cbs->message_meta_packet) {
503                                *packet = (*trace->perpkt_cbs->message_meta_packet)(trace, t, trace->global_blob,
504                                        t->user_data, *packet);
505                        }
506                } else {
507                        if (trace->perpkt_cbs->message_packet) {
508                                *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data,
509                                        *packet);
510                        }
511                }
512                trace_fin_packet(*packet);
513        } else {
514                if ((*packet)->error != READ_TICK) {
515                        trace_set_err(trace, TRACE_ERR_BAD_STATE,
516                                "dispatch_packet() called with invalid 'packet'");
517                        return -1;
518                }
519                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
520                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
521        }
522        return 0;
523}
524
525/**
526 * Sends a batch of packets to the user, expects either a valid packet or a
527 * TICK packet.
528 *
529 * @param trace The trace
530 * @param t The current thread
531 * @param packets [in,out] An array of packets, these may be null upon return
532 * @param nb_packets The total number of packets in the list
533 * @param empty [in,out] A pointer to an integer storing the first empty slot,
534 * upon return this is updated
535 * @param offset [in,out] The offset into the array, upon return this is updated
536 * @param tracetime If true packets are delayed to match with tracetime
537 * @return 0 is successful, otherwise if playing back in tracetime
538 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
539 *
540 * @note READ_MESSAGE will only be returned if tracetime is true.
541 */
542static inline int dispatch_packets(libtrace_t *trace,
543                                  libtrace_thread_t *t,
544                                  libtrace_packet_t *packets[],
545                                  int nb_packets, int *empty, int *offset,
546                                  bool tracetime) {
547        for (;*offset < nb_packets; ++*offset) {
548                int ret;
549                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
550                if (ret == 0) {
551                        /* Move full slots to front as we go */
552                        if (packets[*offset]) {
553                                if (*empty != *offset) {
554                                        packets[*empty] = packets[*offset];
555                                        packets[*offset] = NULL;
556                                }
557                                ++*empty;
558                        }
559                } else {
560                        /* Break early */
561                        if (ret != READ_MESSAGE) {
562                                trace_set_err(trace, TRACE_ERR_UNKNOWN_OPTION,
563                                        "dispatch_packets() called with at least one invalid packet");
564                                return -1;
565                        }
566                        return READ_MESSAGE;
567                }
568        }
569
570        return 0;
571}
572
573/**
574 * Pauses a per packet thread, messages will not be processed when the thread
575 * is paused.
576 *
577 * This process involves reading packets if a hasher thread is used. As such
578 * this function can fail to pause due to errors when reading in which case
579 * the thread should be stopped instead.
580 *
581 *
582 * @brief trace_perpkt_thread_pause
583 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
584 */
585static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
586                                     libtrace_packet_t *packets[],
587                                     int nb_packets, int *empty, int *offset) {
588        libtrace_packet_t * packet = NULL;
589
590        /* Let the user thread know we are going to pause */
591        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
592
593        /* Send through any remaining packets (or messages) without delay */
594
595        /* First send those packets already read, as fast as possible
596         * This should never fail or check for messages etc. */
597        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
598                                    offset, false), == 0);
599
600        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
601        /* If a hasher thread is running, empty input queues so we don't lose data */
602        if (trace_has_dedicated_hasher(trace)) {
603                // The hasher has stopped by this point, so the queue shouldn't be filling
604                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
605                        int ret = trace->pread(trace, t, &packet, 1);
606                        if (ret == 1) {
607                                if (packet->error > 0) {
608                                        store_first_packet(trace, packet, t);
609                                }
610                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
611                                if (packet == NULL)
612                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
613                        } else if (ret != READ_MESSAGE) {
614                                /* Ignore messages we pick these up next loop */
615                                if (!(ret == READ_EOF || ret == READ_ERROR)) {
616                                        trace_set_err(trace, TRACE_ERR_PAUSE_PTHREAD,
617                                                "Error pausing processing thread in trace_perpkt_thread_pause()");
618                                        return -1;
619                                }
620                                /* Verify no packets are remaining */
621                                /* TODO refactor this sanity check out!! */
622                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
623                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
624                                        // No packets after this should have any data in them
625                                        if (packet->error > 0) {
626                                                trace_set_err(trace, TRACE_ERR_BAD_PACKET, "Bogus data in "
627                                                        "libtrace ring buffer after pausing perpkt thread");
628                                                return -1;
629                                        }
630                                }
631                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
632                                return -1;
633                        }
634                }
635        }
636        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
637
638        /* Now we do the actual pause, this returns when we resumed */
639        trace_thread_pause(trace, t);
640        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
641        return 1;
642}
643
644/**
645 * The is the entry point for our packet processing threads.
646 */
647static void* perpkt_threads_entry(void *data) {
648        libtrace_t *trace = (libtrace_t *)data;
649        libtrace_thread_t *t;
650        libtrace_message_t message = {0, {.uint64=0}, NULL};
651        libtrace_packet_t *packets[trace->config.burst_size];
652        size_t i;
653        //int ret;
654        /* The current reading position into the packets */
655        int offset = 0;
656        /* The number of packets last read */
657        int nb_packets = 0;
658        /* The offset to the first NULL packet upto offset */
659        int empty = 0;
660        int j;
661
662        /* Wait until trace_pstart has been completed */
663        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
664        t = get_thread_table(trace);
665        if (!t) {
666                trace_set_err(trace, TRACE_ERR_THREAD, "Unable to get thread table in perpkt_threads_entry()");
667                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
668                pthread_exit(NULL);
669        }
670        if (trace->state == STATE_ERROR) {
671                thread_change_state(trace, t, THREAD_FINISHED, false);
672                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
673                pthread_exit(NULL);
674        }
675        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
676
677        if (trace->format->pregister_thread) {
678                if (trace->format->pregister_thread(trace, t, 
679                                trace_is_parallel(trace)) < 0) {
680                        thread_change_state(trace, t, THREAD_FINISHED, false);
681                        pthread_exit(NULL);
682                }
683        }
684
685        /* Fill our buffer with empty packets */
686        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
687        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
688                              trace->config.burst_size,
689                              trace->config.burst_size);
690
691        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
692
693        /* Let the per_packet function know we have started */
694        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
695        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
696
697        for (;;) {
698
699                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
700                        int ret;
701                        switch (message.code) {
702                                case MESSAGE_DO_PAUSE: // This is internal
703                                        ret = trace_perpkt_thread_pause(trace, t,
704                                              packets, nb_packets, &empty, &offset);
705                                        if (ret == READ_EOF) {
706                                                goto eof;
707                                        } else if (ret == READ_ERROR) {
708                                                goto error;
709                                        }
710                                        if (ret != 1) {
711                                                fprintf(stderr, "Unknown error pausing thread in perpkt_threads_entry()\n");
712                                                pthread_exit(NULL);
713                                        }
714                                        continue;
715                                case MESSAGE_DO_STOP: // This is internal
716                                        goto eof;
717                        }
718                        send_message(trace, t, message.code, message.data, 
719                                        message.sender);
720                        /* Continue and the empty messages out before packets */
721                        continue;
722                }
723
724
725                /* Do we need to read a new set of packets MOST LIKELY we do */
726                if (offset == nb_packets) {
727                        /* Refill the packet buffer */
728                        if (empty != nb_packets) {
729                                // Refill the empty packets
730                                libtrace_ocache_alloc(&trace->packet_freelist,
731                                                      (void **) &packets[empty],
732                                                      nb_packets - empty,
733                                                      nb_packets - empty);
734                        }
735                        if (!trace->pread) {
736                                if (!packets[0]) {
737                                        fprintf(stderr, "Unable to read into NULL packet structure\n");
738                                        pthread_exit(NULL);
739                                }
740                                nb_packets = trace_read_packet(trace, packets[0]);
741                                packets[0]->error = nb_packets;
742                                if (nb_packets > 0)
743                                        nb_packets = 1;
744                        } else {
745                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
746                        }
747                        offset = 0;
748                        empty = 0;
749                }
750
751                /* Handle error/message cases */
752                if (nb_packets > 0) {
753                        /* Store the first non-meta packet */
754                        for (j = 0; j < nb_packets; j++) {
755                                if (t->recorded_first)
756                                        break;
757                                if (packets[j]->error > 0) {
758                                        store_first_packet(trace, packets[j], t);
759                                }
760                        }
761                        dispatch_packets(trace, t, packets, nb_packets, &empty,
762                                         &offset, trace->tracetime);
763                } else {
764                        switch (nb_packets) {
765                        case READ_EOF:
766                                goto eof;
767                        case READ_ERROR:
768                                goto error;
769                        case READ_MESSAGE:
770                                nb_packets = 0;
771                                continue;
772                        default:
773                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
774                                goto error;
775                        }
776                }
777
778        }
779
780error:
781        message.code = MESSAGE_DO_STOP;
782        message.sender = t;
783        message.data.uint64 = 0;
784        trace_message_perpkts(trace, &message);
785eof:
786        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
787
788        // Let the per_packet function know we have stopped
789        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
790        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
791
792        // Free any remaining packets
793        for (i = 0; i < trace->config.burst_size; i++) {
794                if (packets[i]) {
795                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
796                        packets[i] = NULL;
797                }
798        }
799
800        thread_change_state(trace, t, THREAD_FINISHED, true);
801
802        /* Make sure the reporter sees we have finished */
803        if (trace_has_reporter(trace))
804                trace_post_reporter(trace);
805
806        // Release all ocache memory before unregistering with the format
807        // because this might(it does in DPDK) unlink the formats mempool
808        // causing destroy/finish packet to fail.
809        libtrace_ocache_unregister_thread(&trace->packet_freelist);
810        if (trace->format->punregister_thread) {
811                trace->format->punregister_thread(trace, t);
812        }
813        print_memory_stats();
814
815        pthread_exit(NULL);
816}
817
818/**
819 * The start point for our single threaded hasher thread, this will read
820 * and hash a packet from a data source and queue it against the correct
821 * core to process it.
822 */
823static void* hasher_entry(void *data) {
824        libtrace_t *trace = (libtrace_t *)data;
825        libtrace_thread_t * t;
826        int i;
827        libtrace_packet_t * packet;
828        libtrace_message_t message = {0, {.uint64=0}, NULL};
829        int pkt_skipped = 0;
830
831        if (!trace_has_dedicated_hasher(trace)) {
832                fprintf(stderr, "Trace does not have hasher associated with it in hasher_entry()\n");
833                pthread_exit(NULL);
834        }
835        /* Wait until all threads are started and objects are initialised (ring buffers) */
836        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
837        t = &trace->hasher_thread;
838        if (!(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid))) {
839                fprintf(stderr, "Incorrect thread type or non matching thread IDs in hasher_entry()\n");
840                pthread_exit(NULL);
841        }
842
843        if (trace->state == STATE_ERROR) {
844                thread_change_state(trace, t, THREAD_FINISHED, false);
845                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
846                pthread_exit(NULL);
847        }
848        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
849
850        /* We are reading but it is not the parallel API */
851        if (trace->format->pregister_thread) {
852                trace->format->pregister_thread(trace, t, true);
853        }
854
855        /* Read all packets in then hash and queue against the correct thread */
856        while (1) {
857                int thread;
858                if (!pkt_skipped)
859                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
860                if (!packet) {
861                        fprintf(stderr, "Hasher thread was unable to get a fresh packet from the "
862                                "object cache\n");
863                        pthread_exit(NULL);
864                }
865
866                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
867                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
868                        switch(message.code) {
869                                case MESSAGE_DO_PAUSE:
870                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
871                                        thread_change_state(trace, t, THREAD_PAUSED, false);
872                                        pthread_cond_broadcast(&trace->perpkt_cond);
873                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
874                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
875                                        }
876                                        thread_change_state(trace, t, THREAD_RUNNING, false);
877                                        pthread_cond_broadcast(&trace->perpkt_cond);
878                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
879                                        break;
880                                case MESSAGE_DO_STOP:
881                                        /* Either FINISHED or FINISHING */
882                                        if (!(trace->started == false)) {
883                                                fprintf(stderr, "STOP message received by hasher, but "
884                                                        "trace is still active\n");
885                                                pthread_exit(NULL);
886                                        }
887                                        /* Mark the current packet as EOF */
888                                        packet->error = 0;
889                                        goto hasher_eof;
890                                default:
891                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
892                        }
893                        pkt_skipped = 1;
894                        continue;
895                }
896
897                if ((packet->error = trace_read_packet(trace, packet)) <1) {
898                        if (packet->error == READ_MESSAGE) {
899                                pkt_skipped = 1;
900                                continue;
901                        } else {
902                                break; /* We are EOF or error'd either way we stop  */
903                        }
904                }
905
906                /* We are guaranteed to have a hash function i.e. != NULL */
907                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
908                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
909                /* Blocking write to the correct queue - I'm the only writer */
910                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
911                        uint64_t order = trace_packet_get_order(packet);
912                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
913                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
914                                // Write ticks to everyone else
915                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
916                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
917                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
918                                for (i = 0; i < trace->perpkt_thread_count; i++) {
919                                        pkts[i]->error = READ_TICK;
920                                        trace_packet_set_order(pkts[i], order);
921                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
922                                }
923                        }
924                        pkt_skipped = 0;
925                }
926        }
927hasher_eof:
928        /* Broadcast our last failed read to all threads */
929        for (i = 0; i < trace->perpkt_thread_count; i++) {
930                libtrace_packet_t * bcast;
931                if (i == trace->perpkt_thread_count - 1) {
932                        bcast = packet;
933                } else {
934                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
935                        bcast->error = packet->error;
936                }
937                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
938                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
939                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
940                } else {
941                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
942                }
943                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
944        }
945
946        // We don't need to free the packet
947        thread_change_state(trace, t, THREAD_FINISHED, true);
948
949        libtrace_ocache_unregister_thread(&trace->packet_freelist);
950        if (trace->format->punregister_thread) {
951                trace->format->punregister_thread(trace, t);
952        }
953        print_memory_stats();
954
955        // TODO remove from TTABLE t sometime
956        pthread_exit(NULL);
957}
958
959/* Our simplest case when a thread becomes ready it can obtain an exclusive
960 * lock to read packets from the underlying trace.
961 */
962static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
963                                                    libtrace_thread_t *t,
964                                                    libtrace_packet_t *packets[],
965                                                    size_t nb_packets) {
966        size_t i = 0;
967        //bool tick_hit = false;
968
969        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
970        /* Read nb_packets */
971        for (i = 0; i < nb_packets; ++i) {
972                if (libtrace_message_queue_count(&t->messages) > 0) {
973                        if ( i==0 ) {
974                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
975                                return READ_MESSAGE;
976                        } else {
977                                break;
978                        }
979                }
980                packets[i]->error = trace_read_packet(libtrace, packets[i]);
981
982                if (packets[i]->error <= 0) {
983                        /* We'll catch this next time if we have already got packets */
984                        if ( i==0 ) {
985                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
986                                return packets[i]->error;
987                        } else {
988                                break;
989                        }
990                }
991                /*
992                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
993                        tick_hit = true;
994                }*/
995
996                // Doing this inside the lock ensures the first packet is
997                // always recorded first
998                if (!t->recorded_first && packets[0]->error > 0) {
999                        store_first_packet(libtrace, packets[0], t);
1000                }
1001        }
1002        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
1003        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
1004        if (tick_hit) {
1005                libtrace_message_t tick;
1006                tick.additional.uint64 = trace_packet_get_order(packets[i]);
1007                tick.code = MESSAGE_TICK;
1008                trace_send_message_to_perpkts(libtrace, &tick);
1009        } */
1010        return i;
1011}
1012
1013/**
1014 * For the case that we have a dedicated hasher thread
1015 * 1. We read a packet from our buffer
1016 * 2. Move that into the packet provided (packet)
1017 */
1018inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
1019                                                   libtrace_thread_t *t,
1020                                                   libtrace_packet_t *packets[],
1021                                                   size_t nb_packets) {
1022        size_t i;
1023
1024        /* We store the last error message here */
1025        if (t->format_data) {
1026                return ((libtrace_packet_t *)t->format_data)->error;
1027        }
1028
1029        // Always grab at least one
1030        if (packets[0]) // Recycle the old get the new
1031                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
1032        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
1033
1034        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
1035                return packets[0]->error;
1036        }
1037
1038        for (i = 1; i < nb_packets; i++) {
1039                if (packets[i]) // Recycle the old get the new
1040                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
1041                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
1042                        packets[i] = NULL;
1043                        break;
1044                }
1045
1046                /* We will return an error or EOF the next time around */
1047                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
1048                        /* The message case will be checked automatically -
1049                           However other cases like EOF and error will only be
1050                           sent once*/
1051                        if (packets[i]->error != READ_MESSAGE) {
1052                                t->format_data = packets[i];
1053                        }
1054                        break;
1055                }
1056        }
1057
1058        return i;
1059}
1060
1061/**
1062 * For the first packet of each queue we keep a copy and note the system
1063 * time it was received at.
1064 *
1065 * This is used for finding the first packet when playing back a trace
1066 * in trace time. And can be used by real time applications to print
1067 * results out every XXX seconds.
1068 */
1069void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1070{
1071
1072        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1073        struct timeval tv;
1074        libtrace_packet_t * dup;
1075
1076        if (t->recorded_first) {
1077                return;
1078        }
1079
1080        if (IS_LIBTRACE_META_PACKET(packet)) {
1081                return;
1082        }
1083
1084        /* We mark system time against a copy of the packet */
1085        gettimeofday(&tv, NULL);
1086        dup = trace_copy_packet(packet);
1087
1088        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1089        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1090        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1091        libtrace->first_packets.count++;
1092
1093        /* Now update the first */
1094        if (libtrace->first_packets.count == 1) {
1095                /* We the first entry hence also the first known packet */
1096                libtrace->first_packets.first = t->perpkt_num;
1097        } else {
1098                /* Check if we are newer than the previous 'first' packet */
1099                size_t first = libtrace->first_packets.first;
1100                struct timeval cur_ts = trace_get_timeval(dup);
1101                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1102                if (timercmp(&cur_ts, &first_ts, <))
1103                        libtrace->first_packets.first = t->perpkt_num;
1104        }
1105        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1106
1107        memset(&mesg, 0, sizeof(libtrace_message_t));
1108        mesg.code = MESSAGE_FIRST_PACKET;
1109        trace_message_reporter(libtrace, &mesg);
1110        trace_message_perpkts(libtrace, &mesg);
1111        t->recorded_first = true;
1112}
1113
1114DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1115                                     libtrace_thread_t *t,
1116                                     const libtrace_packet_t **packet,
1117                                     const struct timeval **tv)
1118{
1119        void * tmp;
1120        int ret = 0;
1121
1122        if (t) {
1123                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1124                        return -1;
1125        }
1126
1127        /* Throw away these which we don't use */
1128        if (!packet)
1129                packet = (const libtrace_packet_t **) &tmp;
1130        if (!tv)
1131                tv = (const struct timeval **) &tmp;
1132
1133        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1134        if (t) {
1135                /* Get the requested thread */
1136                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1137                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1138        } else if (libtrace->first_packets.count) {
1139                /* Get the first packet across all threads */
1140                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1141                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1142                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1143                        ret = 1;
1144                } else {
1145                        struct timeval curr_tv;
1146                        // If a second has passed since the first entry we will assume this is the very first packet
1147                        gettimeofday(&curr_tv, NULL);
1148                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1149                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1150                                        ret = 1;
1151                                }
1152                        }
1153                }
1154        } else {
1155                *packet = NULL;
1156                *tv = NULL;
1157        }
1158        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1159        return ret;
1160}
1161
1162
1163DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1164{
1165        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1166}
1167
1168inline static struct timeval usec_to_tv(uint64_t usec)
1169{
1170        struct timeval tv;
1171        tv.tv_sec = usec / 1000000;
1172        tv.tv_usec = usec % 1000000;
1173        return tv;
1174}
1175
1176/** Similar to delay_tracetime but send messages to all threads periodically */
1177static void* reporter_entry(void *data) {
1178        libtrace_message_t message = {0, {.uint64=0}, NULL};
1179        libtrace_t *trace = (libtrace_t *)data;
1180        libtrace_thread_t *t = &trace->reporter_thread;
1181
1182        /* Wait until all threads are started */
1183        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1184        if (trace->state == STATE_ERROR) {
1185                thread_change_state(trace, t, THREAD_FINISHED, false);
1186                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1187                pthread_exit(NULL);
1188        }
1189        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1190
1191        if (trace->format->pregister_thread) {
1192                trace->format->pregister_thread(trace, t, false);
1193        }
1194
1195        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1196        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1197
1198        while (!trace_has_finished(trace)) {
1199                if (trace->config.reporter_polling) {
1200                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1201                                message.code = MESSAGE_POST_REPORTER;
1202                } else {
1203                        libtrace_message_queue_get(&t->messages, &message);
1204                }
1205                switch (message.code) {
1206                        // Check for results
1207                        case MESSAGE_POST_REPORTER:
1208                                trace->combiner.read(trace, &trace->combiner);
1209                                break;
1210                        case MESSAGE_DO_PAUSE:
1211                                if(trace->combiner.pause) {
1212                                        trace->combiner.pause(trace, &trace->combiner);
1213                                }
1214                                send_message(trace, t, MESSAGE_PAUSING,
1215                                                (libtrace_generic_t) {0}, t);
1216                                trace_thread_pause(trace, t);
1217                                send_message(trace, t, MESSAGE_RESUMING,
1218                                                (libtrace_generic_t) {0}, t);
1219                                break;
1220                default:
1221                        send_message(trace, t, message.code, message.data,
1222                                        message.sender);
1223                }
1224        }
1225
1226        // Flush out whats left now all our threads have finished
1227        trace->combiner.read_final(trace, &trace->combiner);
1228
1229        // GOODBYE
1230        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1231        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1232
1233        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1234        print_memory_stats();
1235        pthread_exit(NULL);
1236}
1237
1238/** Similar to delay_tracetime but send messages to all threads periodically */
1239static void* keepalive_entry(void *data) {
1240        struct timeval prev, next;
1241        libtrace_message_t message = {0, {.uint64=0}, NULL};
1242        libtrace_t *trace = (libtrace_t *)data;
1243        uint64_t next_release;
1244        libtrace_thread_t *t = &trace->keepalive_thread;
1245
1246        /* Wait until all threads are started */
1247        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1248        if (trace->state == STATE_ERROR) {
1249                thread_change_state(trace, t, THREAD_FINISHED, false);
1250                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1251                pthread_exit(NULL);
1252        }
1253        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1254
1255        gettimeofday(&prev, NULL);
1256        memset(&message, 0, sizeof(libtrace_message_t));
1257        message.code = MESSAGE_TICK_INTERVAL;
1258
1259        while (trace->state != STATE_FINISHED) {
1260                fd_set rfds;
1261                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1262                gettimeofday(&next, NULL);
1263                if (next_release > tv_to_usec(&next)) {
1264                        next = usec_to_tv(next_release - tv_to_usec(&next));
1265                        // Wait for timeout or a message
1266                        FD_ZERO(&rfds);
1267                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1268                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1269                                libtrace_message_t msg;
1270                                libtrace_message_queue_get(&t->messages, &msg);
1271                                if (msg.code != MESSAGE_DO_STOP) {
1272                                        fprintf(stderr, "Unexpected message code in keepalive_entry()\n");
1273                                        pthread_exit(NULL);
1274                                }
1275                                goto done;
1276                        }
1277                }
1278                prev = usec_to_tv(next_release);
1279                if (trace->state == STATE_RUNNING) {
1280                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1281                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1282                        trace_message_perpkts(trace, &message);
1283                }
1284        }
1285done:
1286
1287        thread_change_state(trace, t, THREAD_FINISHED, true);
1288        pthread_exit(NULL);
1289}
1290
1291/**
1292 * Delays a packets playback so the playback will be in trace time.
1293 * This may break early if a message becomes available.
1294 *
1295 * Requires the first packet for this thread to be received.
1296 * @param libtrace  The trace
1297 * @param packet    The packet to delay
1298 * @param t         The current thread
1299 * @return Either READ_MESSAGE(-2) or 0 is successful
1300 */
1301static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1302        struct timeval curr_tv, pkt_tv;
1303        uint64_t next_release = t->tracetime_offset_usec;
1304        uint64_t curr_usec;
1305
1306        if (!t->tracetime_offset_usec) {
1307                const libtrace_packet_t *first_pkt;
1308                const struct timeval *sys_tv;
1309                int64_t initial_offset;
1310                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1311                if (!first_pkt)
1312                        return 0;
1313                pkt_tv = trace_get_timeval(first_pkt);
1314                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1315                /* In the unlikely case offset is 0, change it to 1 */
1316                if (stable)
1317                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1318                next_release = initial_offset;
1319        }
1320        /* next_release == offset */
1321        pkt_tv = trace_get_timeval(packet);
1322        next_release += tv_to_usec(&pkt_tv);
1323        gettimeofday(&curr_tv, NULL);
1324        curr_usec = tv_to_usec(&curr_tv);
1325        if (next_release > curr_usec) {
1326                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1327                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1328                fd_set rfds;
1329                FD_ZERO(&rfds);
1330                FD_SET(mesg_fd, &rfds);
1331                // We need to wait
1332                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1333                if (ret == 0) {
1334                        return 0;
1335                } else if (ret > 0) {
1336                        return READ_MESSAGE;
1337                } else {
1338                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unexpected return from select in delay_tracetime()");
1339                        return -1;
1340                }
1341        }
1342        return 0;
1343}
1344
1345/* Discards packets that don't match the filter.
1346 * Discarded packets are emptied and then moved to the end of the packet list.
1347 *
1348 * @param trace       The trace format, containing the filter
1349 * @param packets     An array of packets
1350 * @param nb_packets  The number of valid items in packets
1351 *
1352 * @return The number of packets that passed the filter, which are moved to
1353 *          the start of the packets array
1354 */
1355static inline size_t filter_packets(libtrace_t *trace,
1356                                    libtrace_packet_t **packets,
1357                                    size_t nb_packets) {
1358        size_t offset = 0;
1359        size_t i;
1360
1361        for (i = 0; i < nb_packets; ++i) {
1362                // The filter needs the trace attached to receive the link type
1363                packets[i]->trace = trace;
1364                if (trace_apply_filter(trace->filter, packets[i])) {
1365                        libtrace_packet_t *tmp;
1366                        tmp = packets[offset];
1367                        packets[offset++] = packets[i];
1368                        packets[i] = tmp;
1369                } else {
1370                        trace_fin_packet(packets[i]);
1371                }
1372        }
1373
1374        return offset;
1375}
1376
1377/* Read a batch of packets from the trace into a buffer.
1378 * Note that this function will block until a packet is read (or EOF is reached)
1379 *
1380 * @param libtrace    The trace
1381 * @param t           The thread
1382 * @param packets     An array of packets
1383 * @param nb_packets  The number of empty packets in packets
1384 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1385 */
1386static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1387                                      libtrace_thread_t *t,
1388                                      libtrace_packet_t *packets[],
1389                                      size_t nb_packets) {
1390        int i;
1391        if (!libtrace) {
1392                fprintf(stderr, "NULL trace passed into trace_read_packet()\n");
1393                return TRACE_ERR_NULL_TRACE;
1394        }
1395        if (nb_packets <= 0) {
1396                trace_set_err(libtrace, TRACE_ERR_NULL,
1397                        "nb_packets must be greater than zero in trace_pread_packet_wrapper()");
1398                return -1;
1399        }
1400        if (trace_is_err(libtrace))
1401                return -1;
1402        if (!libtrace->started) {
1403                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1404                              "You must call libtrace_start() before trace_read_packet()\n");
1405                return -1;
1406        }
1407
1408        if (libtrace->format->pread_packets) {
1409                int ret;
1410                for (i = 0; i < (int) nb_packets; ++i) {
1411                        if (!i[packets]) {
1412                                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "NULL packets in "
1413                                        "trace_pread_packet_wrapper()");
1414                                return -1;
1415                        }
1416                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1417                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1418                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1419                                              "Packet passed to trace_read_packet() is invalid\n");
1420                                return -1;
1421                        }
1422                        packets[i]->which_trace_start = libtrace->startcount;
1423                }
1424                do {
1425                        ret=libtrace->format->pread_packets(libtrace, t,
1426                                                            packets,
1427                                                            nb_packets);
1428                        /* Error, EOF or message? */
1429                        if (ret <= 0) {
1430                                return ret;
1431                        }
1432
1433                        if (libtrace->filter) {
1434                                int remaining;
1435                                remaining = filter_packets(libtrace,
1436                                                           packets, ret);
1437                                t->filtered_packets += ret - remaining;
1438                                ret = remaining;
1439                        }
1440                        for (i = 0; i < ret; ++i) {
1441                                /* We do not mark the packet against the trace,
1442                                 * before hand or after. After breaks DAG meta
1443                                 * packets and before is inefficient */
1444                                //packets[i]->trace = libtrace;
1445                                /* TODO IN FORMAT?? Like traditional libtrace */
1446                                if (libtrace->snaplen>0)
1447                                        trace_set_capture_length(packets[i],
1448                                                        libtrace->snaplen);
1449                        }
1450                } while(ret == 0);
1451                return ret;
1452        }
1453        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1454                      "This format does not support reading packets\n");
1455        return ~0U;
1456}
1457
1458/* Restarts a parallel trace, this is called from trace_pstart.
1459 * The libtrace lock is held upon calling this function.
1460 * Typically with a parallel trace the threads are not
1461 * killed rather.
1462 */
1463static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1464                          libtrace_callback_set_t *per_packet_cbs, 
1465                          libtrace_callback_set_t *reporter_cbs) {
1466        int i, err = 0;
1467        if (libtrace->state != STATE_PAUSED) {
1468                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1469                        "trace(%s) is not currently paused",
1470                              libtrace->uridata);
1471                return -1;
1472        }
1473
1474        if (!libtrace_parallel) {
1475                trace_set_err(libtrace, TRACE_ERR_THREAD, "Trace_prestart() has been called on a "
1476                        "non-parallel libtrace input?");
1477                return -1;
1478        }
1479        if (libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1480                trace_set_err(libtrace, TRACE_ERR_THREAD, "Cannot restart a parallel libtrace input "
1481                        "while it is still running");
1482                return -1;
1483        }
1484
1485        /* Reset first packets */
1486        pthread_spin_lock(&libtrace->first_packets.lock);
1487        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1488                if (libtrace->first_packets.packets[i].packet) {
1489                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1490                        libtrace->first_packets.packets[i].packet = NULL;
1491                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1492                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1493                        libtrace->first_packets.count--;
1494                        libtrace->perpkt_threads[i].recorded_first = false;
1495                }
1496        }
1497        if (libtrace->first_packets.count != 0) {
1498                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected a first packets count of 0 in trace_pstart()");
1499                return -1;
1500        }
1501        libtrace->first_packets.first = 0;
1502        pthread_spin_unlock(&libtrace->first_packets.lock);
1503
1504        /* Reset delay */
1505        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1506                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1507        }
1508
1509        /* Reset statistics */
1510        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1511                libtrace->perpkt_threads[i].accepted_packets = 0;
1512                libtrace->perpkt_threads[i].filtered_packets = 0;
1513        }
1514        libtrace->accepted_packets = 0;
1515        libtrace->filtered_packets = 0;
1516
1517        /* Update functions if requested */
1518        if(global_blob)
1519                libtrace->global_blob = global_blob;
1520
1521        if (per_packet_cbs) {
1522                if (libtrace->perpkt_cbs)
1523                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1524                libtrace->perpkt_cbs = trace_create_callback_set();
1525                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1526                                sizeof(libtrace_callback_set_t));
1527        }
1528
1529        if (reporter_cbs) {
1530                if (libtrace->reporter_cbs)
1531                        trace_destroy_callback_set(libtrace->reporter_cbs);
1532
1533                libtrace->reporter_cbs = trace_create_callback_set();
1534                memcpy(libtrace->reporter_cbs, reporter_cbs,
1535                                sizeof(libtrace_callback_set_t));
1536        }
1537
1538        if (trace_is_parallel(libtrace)) {
1539                err = libtrace->format->pstart_input(libtrace);
1540        } else {
1541                if (libtrace->format->start_input) {
1542                        err = libtrace->format->start_input(libtrace);
1543                }
1544        }
1545
1546        if (err == 0) {
1547                libtrace->started = true;
1548                libtrace->startcount ++;
1549                libtrace_change_state(libtrace, STATE_RUNNING, false);
1550        }
1551        return err;
1552}
1553
1554/**
1555 * @return the number of CPU cores on the machine. -1 if unknown.
1556 */
1557SIMPLE_FUNCTION static int get_nb_cores() {
1558        int numCPU;
1559#ifdef _SC_NPROCESSORS_ONLN
1560        /* Most systems do this now */
1561        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1562
1563#else
1564        int mib[] = {CTL_HW, HW_AVAILCPU};
1565        size_t len = sizeof(numCPU);
1566
1567        /* get the number of CPUs from the system */
1568        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1569#endif
1570        return numCPU <= 0 ? 1 : numCPU;
1571}
1572
1573/**
1574 * Verifies the configuration and sets default values for any values not
1575 * specified by the user.
1576 */
1577static void verify_configuration(libtrace_t *libtrace) {
1578
1579        if (libtrace->config.hasher_queue_size <= 0)
1580                libtrace->config.hasher_queue_size = 1000;
1581
1582        if (libtrace->config.perpkt_threads <= 0) {
1583                libtrace->perpkt_thread_count = get_nb_cores();
1584                if (libtrace->perpkt_thread_count <= 0)
1585                        // Lets just use one
1586                        libtrace->perpkt_thread_count = 1;
1587        } else {
1588                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1589        }
1590
1591        if (libtrace->config.reporter_thold <= 0)
1592                libtrace->config.reporter_thold = 100;
1593        if (libtrace->config.burst_size <= 0)
1594                libtrace->config.burst_size = 32;
1595        if (libtrace->config.thread_cache_size <= 0)
1596                libtrace->config.thread_cache_size = 64;
1597        if (libtrace->config.cache_size <= 0)
1598                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1599
1600        if (libtrace->config.cache_size <
1601                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1602                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1603
1604        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1605                libtrace->combiner = combiner_unordered;
1606
1607        /* Figure out if we are using a dedicated hasher thread? */
1608        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1609                libtrace->hasher_thread.type = THREAD_HASHER;
1610        }
1611}
1612
1613/**
1614 * Starts a libtrace_thread, including allocating memory for messaging.
1615 * Threads are expected to wait until the libtrace look is released.
1616 * Hence why we don't init structures until later.
1617 *
1618 * @param trace The trace the thread is associated with
1619 * @param t The thread that is filled when the thread is started
1620 * @param type The type of thread
1621 * @param start_routine The entry location of the thread
1622 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1623 * @param name For debugging purposes set the threads name (Optional)
1624 *
1625 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1626 *         In this situation the thread structure is zeroed.
1627 */
1628static int trace_start_thread(libtrace_t *trace,
1629                       libtrace_thread_t *t,
1630                       enum thread_types type,
1631                       void *(*start_routine) (void *),
1632                       int perpkt_num,
1633                       const char *name) {
1634#ifdef __linux__
1635        cpu_set_t cpus;
1636        int i;
1637#endif
1638        int ret;
1639        if (t->type != THREAD_EMPTY) {
1640                trace_set_err(trace, TRACE_ERR_THREAD,
1641                        "Expected thread type of THREAD_EMPTY in trace_start_thread()");
1642                return -1;
1643        }
1644        t->trace = trace;
1645        t->ret = NULL;
1646        t->user_data = NULL;
1647        t->type = type;
1648        t->state = THREAD_RUNNING;
1649
1650        if (!name) {
1651                trace_set_err(trace, TRACE_ERR_THREAD, "NULL thread name in trace_start_thread()");
1652                return -1;
1653        }
1654
1655#ifdef __linux__
1656        CPU_ZERO(&cpus);
1657        for (i = 0; i < get_nb_cores(); i++)
1658                CPU_SET(i, &cpus);
1659
1660        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1661        if( ret == 0 ) {
1662                ret = pthread_setaffinity_np(t->tid, sizeof(cpus), &cpus);
1663        }
1664
1665#else
1666        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1667#endif
1668        if (ret != 0) {
1669                libtrace_zero_thread(t);
1670                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1671                return -1;
1672        }
1673        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1674        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1675                libtrace_ringbuffer_init(&t->rbuffer,
1676                                         trace->config.hasher_queue_size,
1677                                         trace->config.hasher_polling?
1678                                                 LIBTRACE_RINGBUFFER_POLLING:
1679                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1680        }
1681#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1682        if(name)
1683                pthread_setname_np(t->tid, name);
1684#endif
1685        t->perpkt_num = perpkt_num;
1686        return 0;
1687}
1688
1689/** Parses the environment variable LIBTRACE_CONF into the supplied
1690 * configuration structure.
1691 *
1692 * @param[in,out] libtrace The trace from which we determine the URI and set
1693 * the configuration.
1694 *
1695 * We search for 3 environment variables and apply them to the config in the
1696 * following order. Such that the first has the lowest priority.
1697 *
1698 * 1. LIBTRACE_CONF, The global environment configuration
1699 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1700 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1701 *
1702 * E.g.
1703 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1704 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1705 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1706 *
1707 * @note All environment variables names MUST only contian
1708 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1709 * outside of this range should be captilised if possible or replaced with an
1710 * underscore.
1711 */
1712static void parse_env_config (libtrace_t *libtrace) {
1713        char env_name[1024] = "LIBTRACE_CONF_";
1714        size_t len = strlen(env_name);
1715        size_t mark = 0;
1716        size_t i;
1717        char * env;
1718
1719        /* Make our compound string */
1720        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1721        len += strlen(libtrace->format->name);
1722        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1723        len += 1;
1724        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1725
1726        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1727        for (i = 0; env_name[i] != 0; ++i) {
1728                env_name[i] = toupper(env_name[i]);
1729                if(env_name[i] == ':') {
1730                        mark = i;
1731                }
1732                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1733                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1734                        env_name[i] = '_';
1735                }
1736        }
1737
1738        /* First apply global env settings LIBTRACE_CONF */
1739        env = getenv("LIBTRACE_CONF");
1740        if (env)
1741        {
1742                printf("Got env %s", env);
1743                trace_set_configuration(libtrace, env);
1744        }
1745
1746        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1747        if (mark != 0) {
1748                env_name[mark] = 0;
1749                env = getenv(env_name);
1750                if (env) {
1751                        trace_set_configuration(libtrace, env);
1752                }
1753                env_name[mark] = '_';
1754        }
1755
1756        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1757        env = getenv(env_name);
1758        if (env) {
1759                trace_set_configuration(libtrace, env);
1760        }
1761}
1762
1763DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1764        if (libtrace->state == STATE_NEW)
1765                return trace_supports_parallel(libtrace);
1766        return libtrace->pread == trace_pread_packet_wrapper;
1767}
1768
1769DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1770                           libtrace_callback_set_t *per_packet_cbs,
1771                           libtrace_callback_set_t *reporter_cbs) {
1772        int i;
1773        int ret = -1;
1774        char name[24];
1775        sigset_t sig_before, sig_block_all;
1776        if (!libtrace) {
1777                fprintf(stderr, "NULL trace passed to trace_pstart()\n");
1778                return -1;
1779        }
1780
1781        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1782        if (trace_is_err(libtrace)) {
1783                goto cleanup_none;
1784        }
1785
1786        if (libtrace->state == STATE_PAUSED) {
1787                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1788                                reporter_cbs);
1789                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1790                return ret;
1791        }
1792
1793        if (libtrace->state != STATE_NEW) {
1794                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1795                              "should be called on a NEW or PAUSED trace but "
1796                              "instead was called from %s",
1797                              get_trace_state_name(libtrace->state));
1798                goto cleanup_none;
1799        }
1800
1801        /* Store the user defined things against the trace */
1802        libtrace->global_blob = global_blob;
1803
1804        /* Save a copy of the callbacks in case the user tries to change them
1805         * on us later */
1806        if (!per_packet_cbs) {
1807                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1808                                "requires a non-NULL set of per packet "
1809                                "callbacks.");
1810                goto cleanup_none;
1811        }
1812
1813        if (per_packet_cbs->message_packet == NULL) {
1814                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1815                                "packet callbacks must include a handler "
1816                                "for a packet. Please set this using "
1817                                "trace_set_packet_cb().");
1818                goto cleanup_none;
1819        }
1820
1821        libtrace->perpkt_cbs = trace_create_callback_set();
1822        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1823       
1824        if (reporter_cbs) {
1825                libtrace->reporter_cbs = trace_create_callback_set();
1826                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1827        }
1828
1829       
1830
1831
1832        /* And zero other fields */
1833        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1834                libtrace->perpkt_thread_states[i] = 0;
1835        }
1836        libtrace->first_packets.first = 0;
1837        libtrace->first_packets.count = 0;
1838        libtrace->first_packets.packets = NULL;
1839        libtrace->perpkt_threads = NULL;
1840        /* Set a global which says we are using a parallel trace. This is
1841         * for backwards compatibility due to changes when destroying packets */
1842        libtrace_parallel = 1;
1843
1844        /* Parses configuration passed through environment variables */
1845        parse_env_config(libtrace);
1846        verify_configuration(libtrace);
1847
1848        ret = -1;
1849        /* Try start the format - we prefer parallel over single threaded, as
1850         * these formats should support messages better */
1851
1852        if (trace_supports_parallel(libtrace) &&
1853            !trace_has_dedicated_hasher(libtrace)) {
1854                ret = libtrace->format->pstart_input(libtrace);
1855                libtrace->pread = trace_pread_packet_wrapper;
1856        }
1857        if (ret != 0) {
1858                if (libtrace->format->start_input) {
1859                        ret = libtrace->format->start_input(libtrace);
1860                }
1861                if (libtrace->perpkt_thread_count > 1) {
1862                        libtrace->pread = trace_pread_packet_first_in_first_served;
1863                        /* Don't wait for a burst of packets if the format is
1864                         * live as this could block ring based formats and
1865                         * introduces delay. */
1866                        if (libtrace->format->info.live) {
1867                                libtrace->config.burst_size = 1;
1868                        }
1869                }
1870                else {
1871                        /* Use standard read_packet */
1872                        libtrace->pread = NULL;
1873                }
1874        }
1875
1876        if (ret < 0) {
1877                goto cleanup_none;
1878        }
1879
1880        /* --- Start all the threads we need --- */
1881        /* Disable signals because it is inherited by the threads we start */
1882        sigemptyset(&sig_block_all);
1883        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1884
1885        /* If we need a hasher thread start it
1886         * Special Case: If single threaded we don't need a hasher
1887         */
1888        if (trace_has_dedicated_hasher(libtrace)) {
1889                libtrace->hasher_thread.type = THREAD_EMPTY;
1890                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1891                                   THREAD_HASHER, hasher_entry, -1,
1892                                   "hasher-thread");
1893                if (ret != 0)
1894                        goto cleanup_started;
1895                libtrace->pread = trace_pread_packet_hasher_thread;
1896        } else {
1897                libtrace->hasher_thread.type = THREAD_EMPTY;
1898        }
1899
1900        /* Start up our perpkt threads */
1901        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1902                                          libtrace->perpkt_thread_count);
1903        if (!libtrace->perpkt_threads) {
1904                trace_set_err(libtrace, errno, "trace_pstart "
1905                              "failed to allocate memory.");
1906                goto cleanup_threads;
1907        }
1908        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1909                snprintf(name, sizeof(name), "perpkt-%d", i);
1910                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1911                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1912                                   THREAD_PERPKT, perpkt_threads_entry, i,
1913                                   name);
1914                if (ret != 0)
1915                        goto cleanup_threads;
1916        }
1917
1918        /* Start the reporter thread */
1919        if (reporter_cbs) {
1920                if (libtrace->combiner.initialise)
1921                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1922                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1923                                   THREAD_REPORTER, reporter_entry, -1,
1924                                   "reporter_thread");
1925                if (ret != 0)
1926                        goto cleanup_threads;
1927        }
1928
1929        /* Start the keepalive thread */
1930        if (libtrace->config.tick_interval > 0) {
1931                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1932                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1933                                   "keepalive_thread");
1934                if (ret != 0)
1935                        goto cleanup_threads;
1936        }
1937
1938        /* Init other data structures */
1939        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1940        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1941        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1942                                                 sizeof(*libtrace->first_packets.packets));
1943        if (libtrace->first_packets.packets == NULL) {
1944                trace_set_err(libtrace, errno, "trace_pstart "
1945                              "failed to allocate memory.");
1946                goto cleanup_threads;
1947        }
1948
1949        if (libtrace_ocache_init(&libtrace->packet_freelist,
1950                             (void* (*)()) trace_create_packet,
1951                             (void (*)(void *))trace_destroy_packet,
1952                             libtrace->config.thread_cache_size,
1953                             libtrace->config.cache_size * 4,
1954                             libtrace->config.fixed_count) != 0) {
1955                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1956                              "failed to allocate ocache.");
1957                goto cleanup_threads;
1958        }
1959
1960        /* Threads don't start */
1961        libtrace->started = true;
1962        libtrace->startcount ++;
1963        libtrace_change_state(libtrace, STATE_RUNNING, false);
1964
1965        ret = 0;
1966        goto success;
1967cleanup_threads:
1968        if (libtrace->first_packets.packets) {
1969                free(libtrace->first_packets.packets);
1970                libtrace->first_packets.packets = NULL;
1971        }
1972        libtrace_change_state(libtrace, STATE_ERROR, false);
1973        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1974        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1975                pthread_join(libtrace->hasher_thread.tid, NULL);
1976                libtrace_zero_thread(&libtrace->hasher_thread);
1977        }
1978
1979        if (libtrace->perpkt_threads) {
1980                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1981                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1982                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1983                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1984                        } else break;
1985                }
1986                free(libtrace->perpkt_threads);
1987                libtrace->perpkt_threads = NULL;
1988        }
1989
1990        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1991                pthread_join(libtrace->reporter_thread.tid, NULL);
1992                libtrace_zero_thread(&libtrace->reporter_thread);
1993        }
1994
1995        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1996                pthread_join(libtrace->keepalive_thread.tid, NULL);
1997                libtrace_zero_thread(&libtrace->keepalive_thread);
1998        }
1999        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2000        libtrace_change_state(libtrace, STATE_NEW, false);
2001        if (libtrace->perpkt_thread_states[THREAD_RUNNING] != 0) {
2002                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected 0 running threads in trace_pstart()");
2003                return -1;
2004        }
2005        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
2006cleanup_started:
2007        if (libtrace->pread == trace_pread_packet_wrapper) {
2008                if (libtrace->format->ppause_input)
2009                        libtrace->format->ppause_input(libtrace);
2010        } else {
2011                if (libtrace->format->pause_input)
2012                        libtrace->format->pause_input(libtrace);
2013        }
2014        ret = -1;
2015success:
2016        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
2017cleanup_none:
2018        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2019        return ret;
2020}
2021
2022DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
2023                fn_cb_starting handler) {
2024        cbset->message_starting = handler;
2025        return 0;
2026}
2027
2028DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
2029                fn_cb_dataless handler) {
2030        cbset->message_pausing = handler;
2031        return 0;
2032}
2033
2034DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
2035                fn_cb_dataless handler) {
2036        cbset->message_resuming = handler;
2037        return 0;
2038}
2039
2040DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
2041                fn_cb_dataless handler) {
2042        cbset->message_stopping = handler;
2043        return 0;
2044}
2045
2046DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
2047                fn_cb_packet handler) {
2048        cbset->message_packet = handler;
2049        return 0;
2050}
2051
2052DLLEXPORT int trace_set_meta_packet_cb(libtrace_callback_set_t *cbset,
2053                fn_cb_meta_packet handler) {
2054        cbset->message_meta_packet = handler;
2055        return 0;
2056}
2057
2058DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
2059                fn_cb_first_packet handler) {
2060        cbset->message_first_packet = handler;
2061        return 0;
2062}
2063
2064DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
2065                fn_cb_tick handler) {
2066        cbset->message_tick_count = handler;
2067        return 0;
2068}
2069
2070DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
2071                fn_cb_tick handler) {
2072        cbset->message_tick_interval = handler;
2073        return 0;
2074}
2075
2076DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
2077                fn_cb_result handler) {
2078        cbset->message_result = handler;
2079        return 0;
2080}
2081
2082DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
2083                fn_cb_usermessage handler) {
2084        cbset->message_user = handler;
2085        return 0;
2086}
2087
2088/*
2089 * Pauses a trace, this should only be called by the main thread
2090 * 1. Set started = false
2091 * 2. All perpkt threads are paused waiting on a condition var
2092 * 3. Then call ppause on the underlying format if found
2093 * 4. The traces state is paused
2094 *
2095 * Once done you should be able to modify the trace setup and call pstart again
2096 * TODO add support to change the number of threads.
2097 */
2098DLLEXPORT int trace_ppause(libtrace_t *libtrace)
2099{
2100        libtrace_thread_t *t;
2101        int i;
2102        if (!libtrace) {
2103                fprintf(stderr, "NULL trace passed into trace_ppause()\n");
2104                return TRACE_ERR_NULL_TRACE;
2105        }
2106
2107        t = get_thread_table(libtrace);
2108        // Check state from within the lock if we are going to change it
2109        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2110
2111        /* If we are already paused, just treat this as a NOOP */
2112        if (libtrace->state == STATE_PAUSED) {
2113                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2114                return 0;
2115        }
2116        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2117                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2118                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2119                return -1;
2120        }
2121
2122        libtrace_change_state(libtrace, STATE_PAUSING, false);
2123        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2124
2125        // Special case handle the hasher thread case
2126        if (trace_has_dedicated_hasher(libtrace)) {
2127                if (libtrace->config.debug_state)
2128                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2129                libtrace_message_t message = {0, {.uint64=0}, NULL};
2130                message.code = MESSAGE_DO_PAUSE;
2131                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2132                // Wait for it to pause
2133                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2134                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2135                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2136                }
2137                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2138                if (libtrace->config.debug_state)
2139                        fprintf(stderr, " DONE\n");
2140        }
2141
2142        if (libtrace->config.debug_state)
2143                fprintf(stderr, "Asking perpkt threads to pause ...");
2144        // Stop threads, skip this one if it's a perpkt
2145        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2146                if (&libtrace->perpkt_threads[i] != t) {
2147                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2148                        message.code = MESSAGE_DO_PAUSE;
2149                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2150                        if(trace_has_dedicated_hasher(libtrace)) {
2151                                // The hasher has stopped and other threads have messages waiting therefore
2152                                // If the queues are empty the other threads would have no data
2153                                // So send some message packets to simply ask the threads to check
2154                                // We are the only writer since hasher has paused
2155                                libtrace_packet_t *pkt;
2156                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2157                                pkt->error = READ_MESSAGE;
2158                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2159                        }
2160                } else {
2161                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2162                }
2163        }
2164
2165        if (t) {
2166                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2167                // We rely on the user to *not* return before starting the trace again
2168                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2169        }
2170
2171        // Wait for all threads to pause
2172        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2173        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2174                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2175        }
2176        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2177
2178        if (libtrace->config.debug_state)
2179                fprintf(stderr, " DONE\n");
2180
2181        // Deal with the reporter
2182        if (trace_has_reporter(libtrace)) {
2183                if (libtrace->config.debug_state)
2184                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2185                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2186                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2187                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2188               
2189                } else {
2190                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2191                        message.code = MESSAGE_DO_PAUSE;
2192                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2193                        // Wait for it to pause
2194                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2195                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2196                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2197                        }
2198                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2199                }
2200                if (libtrace->config.debug_state)
2201                        fprintf(stderr, " DONE\n");
2202        }
2203
2204        /* Cache values before we pause */
2205        if (libtrace->stats == NULL)
2206                libtrace->stats = trace_create_statistics();
2207        // Save the statistics against the trace
2208        trace_get_statistics(libtrace, NULL);
2209        if (trace_is_parallel(libtrace)) {
2210                libtrace->started = false;
2211                if (libtrace->format->ppause_input)
2212                        libtrace->format->ppause_input(libtrace);
2213                // TODO What happens if we don't have pause input??
2214        } else {
2215                int err;
2216                err = trace_pause(libtrace);
2217                // We should handle this a bit better
2218                if (err)
2219                        return err;
2220        }
2221
2222        // Only set as paused after the pause has been called on the trace
2223        libtrace_change_state(libtrace, STATE_PAUSED, true);
2224        return 0;
2225}
2226
2227/**
2228 * Stop trace finish prematurely as though it meet an EOF
2229 * This should only be called by the main thread
2230 * 1. Calls ppause
2231 * 2. Sends a message asking for threads to finish
2232 * 3. Releases threads which will pause
2233 */
2234DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2235{
2236        int i, err;
2237        libtrace_message_t message = {0, {.uint64=0}, NULL};
2238        if (!libtrace) {
2239                fprintf(stderr, "NULL trace passed into trace_pstop()\n");
2240                return TRACE_ERR_NULL_TRACE;
2241        }
2242
2243        // Ensure all threads have paused and the underlying trace format has
2244        // been closed and all packets associated are cleaned up
2245        // Pause will do any state checks for us
2246        err = trace_ppause(libtrace);
2247        if (err)
2248                return err;
2249
2250        // Now send a message asking the threads to stop
2251        // This will be retrieved before trying to read another packet
2252        message.code = MESSAGE_DO_STOP;
2253        trace_message_perpkts(libtrace, &message);
2254        if (trace_has_dedicated_hasher(libtrace))
2255                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2256
2257        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2258                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2259        }
2260
2261        /* Now release the threads and let them stop - when the threads finish
2262         * the state will be set to finished */
2263        libtrace_change_state(libtrace, STATE_FINISHING, true);
2264        return 0;
2265}
2266
2267DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2268        int ret = -1;
2269        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2270                return -1;
2271        }
2272
2273        // Save the requirements
2274        trace->hasher_type = type;
2275        if (hasher) {
2276                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2277                        if (trace->hasher_data) {
2278                                free(trace->hasher_data);
2279                        }
2280                }
2281                trace->hasher = hasher;
2282                trace->hasher_data = data;
2283                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2284        } else {
2285                trace->hasher = NULL;
2286                trace->hasher_data = NULL;
2287                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2288        }
2289
2290        // Try push this to hardware - NOTE hardware could do custom if
2291        // there is a more efficient way to apply it, in this case
2292        // it will simply grab the function out of libtrace_t
2293        if (trace_supports_parallel(trace) && trace->format->config_input)
2294                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2295
2296        if (ret == -1) {
2297                /* We have to deal with this ourself */
2298                if (!hasher) {
2299                        switch (type)
2300                        {
2301                                case HASHER_CUSTOM:
2302                                case HASHER_BALANCE:
2303                                        return 0;
2304                                case HASHER_BIDIRECTIONAL:
2305                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2306                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2307                                        toeplitz_init_config(trace->hasher_data, 1);
2308                                        return 0;
2309                                case HASHER_UNIDIRECTIONAL:
2310                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2311                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2312                                        toeplitz_init_config(trace->hasher_data, 0);
2313                                        return 0;
2314                        }
2315                        return -1;
2316                }
2317        } else {
2318                /* If the hasher is hardware we zero out the hasher and hasher
2319                 * data fields - only if we need a hasher do we do this */
2320                trace->hasher = NULL;
2321                trace->hasher_data = NULL;
2322        }
2323
2324        return 0;
2325}
2326
2327// Waits for all threads to finish
2328DLLEXPORT void trace_join(libtrace_t *libtrace) {
2329        int i;
2330
2331        /* Firstly wait for the perpkt threads to finish, since these are
2332         * user controlled */
2333        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2334                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2335                // So we must do our best effort to empty the queue - so
2336                // the producer (or any other threads) don't block.
2337                libtrace_packet_t * packet;
2338                if (libtrace->perpkt_threads[i].state != THREAD_FINISHED) {
2339                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2340                                "Expected processing thread state to be THREAD_FINISHED in trace_join()");
2341                        return;
2342                }
2343                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2344                        if (packet) // This could be NULL iff the perpkt finishes early
2345                                trace_destroy_packet(packet);
2346        }
2347
2348        /* Now the hasher */
2349        if (trace_has_dedicated_hasher(libtrace)) {
2350                pthread_join(libtrace->hasher_thread.tid, NULL);
2351                if (libtrace->hasher_thread.state != THREAD_FINISHED) {
2352                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2353                                "Expected hasher thread state to be THREAD_FINISHED in trace_join()");
2354                        return;
2355                }
2356        }
2357
2358        // Now that everything is finished nothing can be touching our
2359        // buffers so clean them up
2360        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2361                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2362                // if they lost timeslice before-during a write
2363                libtrace_packet_t * packet;
2364                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2365                        trace_destroy_packet(packet);
2366                if (trace_has_dedicated_hasher(libtrace)) {
2367                        if (!libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer)) {
2368                                trace_set_err(libtrace, TRACE_ERR_THREAD,
2369                                        "Expected processing threads ring buffers to be empty in trace_join()");
2370                                return;
2371                        }
2372                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2373                }
2374                // Cannot destroy vector yet, this happens with trace_destroy
2375        }
2376
2377        if (trace_has_reporter(libtrace)) {
2378                pthread_join(libtrace->reporter_thread.tid, NULL);
2379                if (libtrace->reporter_thread.state != THREAD_FINISHED) {
2380                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2381                                "Expected reporting thread state to be THREAD_FINISHED in trace_join()");
2382                        return;
2383                }
2384        }
2385
2386        // Wait for the tick (keepalive) thread if it has been started
2387        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2388                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2389                msg.code = MESSAGE_DO_STOP;
2390                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2391                pthread_join(libtrace->keepalive_thread.tid, NULL);
2392        }
2393
2394        libtrace_change_state(libtrace, STATE_JOINED, true);
2395        print_memory_stats();
2396}
2397
2398DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2399                                                libtrace_thread_t *t)
2400{
2401        int ret;
2402        if (t == NULL)
2403                t = get_thread_descriptor(libtrace);
2404        if (t == NULL)
2405                return -1;
2406        ret = libtrace_message_queue_count(&t->messages);
2407        return ret < 0 ? 0 : ret;
2408}
2409
2410DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2411                                          libtrace_thread_t *t,
2412                                          libtrace_message_t * message)
2413{
2414        int ret;
2415        if (t == NULL)
2416                t = get_thread_descriptor(libtrace);
2417        if (t == NULL)
2418                return -1;
2419        ret = libtrace_message_queue_get(&t->messages, message);
2420        return ret < 0 ? 0 : ret;
2421}
2422
2423DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2424                                              libtrace_thread_t *t,
2425                                              libtrace_message_t * message)
2426{
2427        if (t == NULL)
2428                t = get_thread_descriptor(libtrace);
2429        if (t == NULL)
2430                return -1;
2431        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2432                return 0;
2433        else
2434                return -1;
2435}
2436
2437DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2438{
2439        int ret;
2440        if (!message->sender)
2441                message->sender = get_thread_descriptor(libtrace);
2442
2443        ret = libtrace_message_queue_put(&t->messages, message);
2444        return ret < 0 ? 0 : ret;
2445}
2446
2447DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2448{
2449        if (!trace_has_reporter(libtrace) ||
2450            !(libtrace->reporter_thread.state == THREAD_RUNNING
2451              || libtrace->reporter_thread.state == THREAD_PAUSED))
2452                return -1;
2453
2454        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2455}
2456
2457DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2458{
2459        libtrace_message_t message = {0, {.uint64=0}, NULL};
2460        message.code = MESSAGE_POST_REPORTER;
2461        return trace_message_reporter(libtrace, (void *) &message);
2462}
2463
2464DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2465{
2466        int i;
2467        int missed = 0;
2468        if (message->sender == NULL)
2469                message->sender = get_thread_descriptor(libtrace);
2470        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2471                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2472                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2473                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2474                } else {
2475                        missed += 1;
2476                }
2477        }
2478        return -missed;
2479}
2480
2481/**
2482 * Publishes a result to the reduce queue
2483 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2484 */
2485DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2486        libtrace_result_t res;
2487        res.type = type;
2488        res.key = key;
2489        res.value = value;
2490        if (!libtrace->combiner.publish) {
2491                fprintf(stderr, "Combiner has no publish method -- can not publish results!\n");
2492                return;
2493        }
2494        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2495        return;
2496}
2497
2498DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2499        if (combiner) {
2500                trace->combiner = *combiner;
2501                trace->combiner.configuration = config;
2502        } else {
2503                // No combiner, so don't try use it
2504                memset(&trace->combiner, 0, sizeof(trace->combiner));
2505        }
2506}
2507
2508DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2509        return packet->order;
2510}
2511
2512DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2513        return packet->hash;
2514}
2515
2516DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2517        packet->order = order;
2518}
2519
2520DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2521        packet->hash = hash;
2522}
2523
2524DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2525        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2526}
2527
2528/**
2529 * @return True if the trace is not running such that it can be configured
2530 */
2531static inline bool trace_is_configurable(libtrace_t *trace) {
2532        return trace->state == STATE_NEW ||
2533                        trace->state == STATE_PAUSED;
2534}
2535
2536DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2537        // Only supported on new traces not paused traces
2538        if (trace->state != STATE_NEW) return -1;
2539
2540        /* TODO consider allowing an offset from the total number of cores i.e.
2541         * -1 reserve 1 core */
2542        if (nb >= 0) {
2543                trace->config.perpkt_threads = nb;
2544                return 0;
2545        } else {
2546                return -1;
2547        }
2548}
2549
2550DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2551        if (!trace_is_configurable(trace)) return -1;
2552
2553        trace->config.tick_interval = millisec;
2554        return 0;
2555}
2556
2557DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2558        if (!trace_is_configurable(trace)) return -1;
2559
2560        trace->config.tick_count = count;
2561        return 0;
2562}
2563
2564DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2565        if (!trace_is_configurable(trace)) return -1;
2566
2567        trace->tracetime = tracetime;
2568        return 0;
2569}
2570
2571DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2572        if (!trace_is_configurable(trace)) return -1;
2573
2574        trace->config.cache_size = size;
2575        return 0;
2576}
2577
2578DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2579        if (!trace_is_configurable(trace)) return -1;
2580
2581        trace->config.thread_cache_size = size;
2582        return 0;
2583}
2584
2585DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2586        if (!trace_is_configurable(trace)) return -1;
2587
2588        trace->config.fixed_count = fixed;
2589        return 0;
2590}
2591
2592DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2593        if (!trace_is_configurable(trace)) return -1;
2594
2595        trace->config.burst_size = size;
2596        return 0;
2597}
2598
2599DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2600        if (!trace_is_configurable(trace)) return -1;
2601
2602        trace->config.hasher_queue_size = size;
2603        return 0;
2604}
2605
2606DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2607        if (!trace_is_configurable(trace)) return -1;
2608
2609        trace->config.hasher_polling = polling;
2610        return 0;
2611}
2612
2613DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2614        if (!trace_is_configurable(trace)) return -1;
2615
2616        trace->config.reporter_polling = polling;
2617        return 0;
2618}
2619
2620DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2621        if (!trace_is_configurable(trace)) return -1;
2622
2623        trace->config.reporter_thold = thold;
2624        return 0;
2625}
2626
2627DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2628        if (!trace_is_configurable(trace)) return -1;
2629
2630        trace->config.debug_state = debug_state;
2631        return 0;
2632}
2633
2634static bool config_bool_parse(char *value, size_t nvalue) {
2635        if (strncmp(value, "true", nvalue) == 0)
2636                return true;
2637        else if (strncmp(value, "false", nvalue) == 0)
2638                return false;
2639        else
2640                return strtoll(value, NULL, 10) != 0;
2641}
2642
2643/* Note update documentation on trace_set_configuration */
2644static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2645        if (!key) {
2646                fprintf(stderr, "NULL key passed to config_string()\n");
2647                return;
2648        }
2649        if (!value) {
2650                fprintf(stderr, "NULL value passed to config_string()\n");
2651                return;
2652        }
2653        if (!uc) {
2654                fprintf(stderr, "NULL uc (user_configuration) passed to config_string()\n");
2655                return;
2656        }
2657        if (strncmp(key, "cache_size", nkey) == 0
2658            || strncmp(key, "cs", nkey) == 0) {
2659                uc->cache_size = strtoll(value, NULL, 10);
2660        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2661                   || strncmp(key, "tcs", nkey) == 0) {
2662                uc->thread_cache_size = strtoll(value, NULL, 10);
2663        } else if (strncmp(key, "fixed_count", nkey) == 0
2664                   || strncmp(key, "fc", nkey) == 0) {
2665                uc->fixed_count = config_bool_parse(value, nvalue);
2666        } else if (strncmp(key, "burst_size", nkey) == 0
2667                   || strncmp(key, "bs", nkey) == 0) {
2668                uc->burst_size = strtoll(value, NULL, 10);
2669        } else if (strncmp(key, "tick_interval", nkey) == 0
2670                   || strncmp(key, "ti", nkey) == 0) {
2671                uc->tick_interval = strtoll(value, NULL, 10);
2672        } else if (strncmp(key, "tick_count", nkey) == 0
2673                   || strncmp(key, "tc", nkey) == 0) {
2674                uc->tick_count = strtoll(value, NULL, 10);
2675        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2676                   || strncmp(key, "pt", nkey) == 0) {
2677                uc->perpkt_threads = strtoll(value, NULL, 10);
2678        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2679                   || strncmp(key, "hqs", nkey) == 0) {
2680                uc->hasher_queue_size = strtoll(value, NULL, 10);
2681        } else if (strncmp(key, "hasher_polling", nkey) == 0
2682                   || strncmp(key, "hp", nkey) == 0) {
2683                uc->hasher_polling = config_bool_parse(value, nvalue);
2684        } else if (strncmp(key, "reporter_polling", nkey) == 0
2685                   || strncmp(key, "rp", nkey) == 0) {
2686                uc->reporter_polling = config_bool_parse(value, nvalue);
2687        } else if (strncmp(key, "reporter_thold", nkey) == 0
2688                   || strncmp(key, "rt", nkey) == 0) {
2689                uc->reporter_thold = strtoll(value, NULL, 10);
2690        } else if (strncmp(key, "debug_state", nkey) == 0
2691                   || strncmp(key, "ds", nkey) == 0) {
2692                uc->debug_state = config_bool_parse(value, nvalue);
2693        } else {
2694                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2695        }
2696}
2697
2698DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2699        char *pch;
2700        char key[100];
2701        char value[100];
2702        char *dup;
2703        if (!trace) {
2704                fprintf(stderr, "NULL trace passed into trace_set_configuration()\n");
2705                return TRACE_ERR_NULL_TRACE;
2706        }
2707        if (!str) {
2708                trace_set_err(trace, TRACE_ERR_CONFIG, "NULL configuration string passed to trace_set_configuration()");
2709                return -1;
2710        }
2711
2712        if (!trace_is_configurable(trace)) return -1;
2713
2714        dup = strdup(str);
2715        pch = strtok (dup," ,.-");
2716        while (pch != NULL)
2717        {
2718                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2719                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2720                } else {
2721                        fprintf(stderr, "Error: parsing option %s\n", pch);
2722                }
2723                pch = strtok (NULL," ,.-");
2724        }
2725        free(dup);
2726
2727        return 0;
2728}
2729
2730DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2731        char line[1024];
2732        if (!trace_is_configurable(trace)) return -1;
2733
2734        while (fgets(line, sizeof(line), file) != NULL)
2735        {
2736                trace_set_configuration(trace, line);
2737        }
2738
2739        if(ferror(file))
2740                return -1;
2741        else
2742                return 0;
2743}
2744
2745DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2746        if (!packet) {
2747                trace_set_err(libtrace, TRACE_ERR_NULL_PACKET,
2748                        "NULL packet passed to trace_free_packet()");
2749                return;
2750        }
2751        /* Always release any resources this might be holding */
2752        trace_fin_packet(packet);
2753        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2754}
2755
2756DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2757        pthread_mutex_lock(&(packet->ref_lock));
2758        if (packet->refcount < 0) {
2759                packet->refcount = 1;
2760        } else {
2761                packet->refcount ++;
2762        }
2763        pthread_mutex_unlock(&(packet->ref_lock));
2764}
2765
2766DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2767        pthread_mutex_lock(&(packet->ref_lock));
2768        packet->refcount --;
2769
2770        if (packet->refcount <= 0) {
2771                trace_free_packet(packet->trace, packet);
2772        }
2773        pthread_mutex_unlock(&(packet->ref_lock));
2774}
2775
2776
2777DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2778        if (libtrace->format)
2779                return &libtrace->format->info;
2780        else
2781                pthread_exit(NULL);
2782}
Note: See TracBrowser for help on using the repository browser.