source: lib/trace_parallel.c @ dea08f1

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since dea08f1 was dea08f1, checked in by Shane Alcock <salcock@…>, 4 years ago

Fix wandio race conditions

libwandio is not thread-safe, so file-based formats need to
be careful about when try to operate on the file.

This should fix most of the issues with crashes, segfaults, etc.
when reading from files (including most of our parallel file test
cases!)

  • Property mode set to 100644
File size: 82.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289/**
290 * Changes the overall traces state and signals the condition.
291 *
292 * @param trace A pointer to the trace
293 * @param new_state The new state of the trace
294 * @param need_lock Set to true if libtrace_lock is not held, otherwise
295 *        false in the case the lock is currently held by this thread.
296 */
297static inline void libtrace_change_state(libtrace_t *trace,
298        const enum trace_state new_state, const bool need_lock)
299{
300        UNUSED enum trace_state prev_state;
301        if (need_lock)
302                pthread_mutex_lock(&trace->libtrace_lock);
303        prev_state = trace->state;
304        trace->state = new_state;
305
306        if (trace->config.debug_state)
307                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
308                        trace->uridata, get_trace_state_name(prev_state),
309                        get_trace_state_name(trace->state));
310
311        pthread_cond_broadcast(&trace->perpkt_cond);
312        if (need_lock)
313                pthread_mutex_unlock(&trace->libtrace_lock);
314}
315
316/**
317 * Changes a thread's state and broadcasts the condition variable. This
318 * should always be done when the lock is held.
319 *
320 * Additionally for perpkt threads the state counts are updated.
321 *
322 * @param trace A pointer to the trace
323 * @param t A pointer to the thread to modify
324 * @param new_state The new state of the thread
325 * @param need_lock Set to true if libtrace_lock is not held, otherwise
326 *        false in the case the lock is currently held by this thread.
327 */
328static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
329        const enum thread_states new_state, const bool need_lock)
330{
331        enum thread_states prev_state;
332        if (need_lock)
333                pthread_mutex_lock(&trace->libtrace_lock);
334        prev_state = t->state;
335        t->state = new_state;
336        if (t->type == THREAD_PERPKT) {
337                --trace->perpkt_thread_states[prev_state];
338                ++trace->perpkt_thread_states[new_state];
339        }
340
341        if (trace->config.debug_state)
342                fprintf(stderr, "Thread %d state changed from %d to %d\n",
343                        (int) t->tid, prev_state, t->state);
344
345        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
346                libtrace_change_state(trace, STATE_FINISHED, false);
347
348        pthread_cond_broadcast(&trace->perpkt_cond);
349        if (need_lock)
350                pthread_mutex_unlock(&trace->libtrace_lock);
351}
352
353/**
354 * This is valid once a trace is initialised
355 *
356 * @return True if the format supports parallel threads.
357 */
358static inline bool trace_supports_parallel(libtrace_t *trace)
359{
360        assert(trace);
361        assert(trace->format);
362        if (trace->format->pstart_input)
363                return true;
364        else
365                return false;
366}
367
368void libtrace_zero_thread(libtrace_thread_t * t) {
369        t->accepted_packets = 0;
370        t->filtered_packets = 0;
371        t->recorded_first = false;
372        t->tracetime_offset_usec = 0;
373        t->user_data = 0;
374        t->format_data = 0;
375        libtrace_zero_ringbuffer(&t->rbuffer);
376        t->trace = NULL;
377        t->ret = NULL;
378        t->type = THREAD_EMPTY;
379        t->perpkt_num = -1;
380}
381
382// Ints are aligned int is atomic so safe to read and write at same time
383// However write must be locked, read doesn't (We never try read before written to table)
384libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
385        int i = 0;
386        pthread_t tid = pthread_self();
387
388        for (;i<libtrace->perpkt_thread_count ;++i) {
389                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
390                        return &libtrace->perpkt_threads[i];
391        }
392        return NULL;
393}
394
395static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
396        libtrace_thread_t *ret;
397        if (!(ret = get_thread_table(libtrace))) {
398                pthread_t tid = pthread_self();
399                // Check if we are reporter or something else
400                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
401                                pthread_equal(tid, libtrace->reporter_thread.tid))
402                        ret = &libtrace->reporter_thread;
403                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
404                         pthread_equal(tid, libtrace->hasher_thread.tid))
405                        ret = &libtrace->hasher_thread;
406                else
407                        ret = NULL;
408        }
409        return ret;
410}
411
412DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
413        // Duplicate the packet in standard malloc'd memory and free the
414        // original, This is a 1:1 exchange so the ocache count remains unchanged.
415        if (pkt->buf_control != TRACE_CTRL_PACKET) {
416                libtrace_packet_t *dup;
417                dup = trace_copy_packet(pkt);
418                /* Release the external buffer */
419                trace_fin_packet(pkt);
420                /* Copy the duplicated packet over the existing */
421                memcpy(pkt, dup, sizeof(libtrace_packet_t));
422                /* Free the packet structure */
423                free(dup);
424        }
425}
426
427/**
428 * Makes a libtrace_result_t safe, used when pausing a trace.
429 * This will call libtrace_make_packet_safe if the result is
430 * a packet.
431 */
432DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
433        if (res->type == RESULT_PACKET) {
434                libtrace_make_packet_safe(res->value.pkt);
435        }
436}
437
438/**
439 * Holds threads in a paused state, until released by broadcasting
440 * the condition mutex.
441 */
442static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
443        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
444        thread_change_state(trace, t, THREAD_PAUSED, false);
445        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
446                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
447        }
448        thread_change_state(trace, t, THREAD_RUNNING, false);
449        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
450}
451
452/**
453 * Sends a packet to the user, expects either a valid packet or a TICK packet.
454 *
455 * @param trace The trace
456 * @param t The current thread
457 * @param packet A pointer to the packet storage, which may be set to null upon
458 *               return, or a packet to be finished.
459 * @param tracetime If true packets are delayed to match with tracetime
460 * @return 0 is successful, otherwise if playing back in tracetime
461 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
462 *
463 * @note READ_MESSAGE will only be returned if tracetime is true.
464 */
465static inline int dispatch_packet(libtrace_t *trace,
466                                  libtrace_thread_t *t,
467                                  libtrace_packet_t **packet,
468                                  bool tracetime) {
469
470        if ((*packet)->error > 0) {
471                if (tracetime) {
472                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
473                                return READ_MESSAGE;
474                }
475                t->accepted_packets++;
476                if (trace->perpkt_cbs->message_packet)
477                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
478                trace_fin_packet(*packet);
479        } else {
480                assert((*packet)->error == READ_TICK);
481                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
482                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
483        }
484        return 0;
485}
486
487/**
488 * Sends a batch of packets to the user, expects either a valid packet or a
489 * TICK packet.
490 *
491 * @param trace The trace
492 * @param t The current thread
493 * @param packets [in,out] An array of packets, these may be null upon return
494 * @param nb_packets The total number of packets in the list
495 * @param empty [in,out] A pointer to an integer storing the first empty slot,
496 * upon return this is updated
497 * @param offset [in,out] The offset into the array, upon return this is updated
498 * @param tracetime If true packets are delayed to match with tracetime
499 * @return 0 is successful, otherwise if playing back in tracetime
500 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
501 *
502 * @note READ_MESSAGE will only be returned if tracetime is true.
503 */
504static inline int dispatch_packets(libtrace_t *trace,
505                                  libtrace_thread_t *t,
506                                  libtrace_packet_t *packets[],
507                                  int nb_packets, int *empty, int *offset,
508                                  bool tracetime) {
509        for (;*offset < nb_packets; ++*offset) {
510                int ret;
511                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
512                if (ret == 0) {
513                        /* Move full slots to front as we go */
514                        if (packets[*offset]) {
515                                if (*empty != *offset) {
516                                        packets[*empty] = packets[*offset];
517                                        packets[*offset] = NULL;
518                                }
519                                ++*empty;
520                        }
521                } else {
522                        /* Break early */
523                        assert(ret == READ_MESSAGE);
524                        return READ_MESSAGE;
525                }
526        }
527
528        return 0;
529}
530
531/**
532 * Pauses a per packet thread, messages will not be processed when the thread
533 * is paused.
534 *
535 * This process involves reading packets if a hasher thread is used. As such
536 * this function can fail to pause due to errors when reading in which case
537 * the thread should be stopped instead.
538 *
539 *
540 * @brief trace_perpkt_thread_pause
541 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
542 */
543static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
544                                     libtrace_packet_t *packets[],
545                                     int nb_packets, int *empty, int *offset) {
546        libtrace_packet_t * packet = NULL;
547
548        /* Let the user thread know we are going to pause */
549        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
550
551        /* Send through any remaining packets (or messages) without delay */
552
553        /* First send those packets already read, as fast as possible
554         * This should never fail or check for messages etc. */
555        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
556                                    offset, false), == 0);
557
558        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
559        /* If a hasher thread is running, empty input queues so we don't lose data */
560        if (trace_has_dedicated_hasher(trace)) {
561                // The hasher has stopped by this point, so the queue shouldn't be filling
562                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
563                        int ret = trace->pread(trace, t, &packet, 1);
564                        if (ret == 1) {
565                                if (packet->error > 0) {
566                                        store_first_packet(trace, packet, t);
567                                }
568                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
569                                if (packet == NULL)
570                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
571                        } else if (ret != READ_MESSAGE) {
572                                /* Ignore messages we pick these up next loop */
573                                assert (ret == READ_EOF || ret == READ_ERROR);
574                                /* Verify no packets are remaining */
575                                /* TODO refactor this sanity check out!! */
576                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
577                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
578                                        // No packets after this should have any data in them
579                                        assert(packet->error <= 0);
580                                }
581                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
582                                return -1;
583                        }
584                }
585        }
586        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
587
588        /* Now we do the actual pause, this returns when we resumed */
589        trace_thread_pause(trace, t);
590        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
591        return 1;
592}
593
594/**
595 * The is the entry point for our packet processing threads.
596 */
597static void* perpkt_threads_entry(void *data) {
598        libtrace_t *trace = (libtrace_t *)data;
599        libtrace_thread_t *t;
600        libtrace_message_t message = {0, {.uint64=0}, NULL};
601        libtrace_packet_t *packets[trace->config.burst_size];
602        size_t i;
603        //int ret;
604        /* The current reading position into the packets */
605        int offset = 0;
606        /* The number of packets last read */
607        int nb_packets = 0;
608        /* The offset to the first NULL packet upto offset */
609        int empty = 0;
610
611        /* Wait until trace_pstart has been completed */
612        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
613        t = get_thread_table(trace);
614        assert(t);
615        if (trace->state == STATE_ERROR) {
616                thread_change_state(trace, t, THREAD_FINISHED, false);
617                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
618                pthread_exit(NULL);
619        }
620        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
621
622        if (trace->format->pregister_thread) {
623                if (trace->format->pregister_thread(trace, t, 
624                                trace_is_parallel(trace)) < 0) {
625                        thread_change_state(trace, t, THREAD_FINISHED, false);
626                        pthread_exit(NULL);
627                }
628        }
629
630        /* Fill our buffer with empty packets */
631        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
632        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
633                              trace->config.burst_size,
634                              trace->config.burst_size);
635
636        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
637
638        /* Let the per_packet function know we have started */
639        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
640        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
641
642        for (;;) {
643
644                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
645                        int ret;
646                        switch (message.code) {
647                                case MESSAGE_DO_PAUSE: // This is internal
648                                        ret = trace_perpkt_thread_pause(trace, t,
649                                              packets, nb_packets, &empty, &offset);
650                                        if (ret == READ_EOF) {
651                                                goto eof;
652                                        } else if (ret == READ_ERROR) {
653                                                goto error;
654                                        }
655                                        assert(ret == 1);
656                                        continue;
657                                case MESSAGE_DO_STOP: // This is internal
658                                        goto eof;
659                        }
660                        send_message(trace, t, message.code, message.data, 
661                                        message.sender);
662                        /* Continue and the empty messages out before packets */
663                        continue;
664                }
665
666
667                /* Do we need to read a new set of packets MOST LIKELY we do */
668                if (offset == nb_packets) {
669                        /* Refill the packet buffer */
670                        if (empty != nb_packets) {
671                                // Refill the empty packets
672                                libtrace_ocache_alloc(&trace->packet_freelist,
673                                                      (void **) &packets[empty],
674                                                      nb_packets - empty,
675                                                      nb_packets - empty);
676                        }
677                        if (!trace->pread) {
678                                assert(packets[0]);
679                                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
680                                nb_packets = trace_read_packet(trace, packets[0]);
681                                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
682                                packets[0]->error = nb_packets;
683                                if (nb_packets > 0)
684                                        nb_packets = 1;
685                        } else {
686                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
687                        }
688                        offset = 0;
689                        empty = 0;
690                }
691
692                /* Handle error/message cases */
693                if (nb_packets > 0) {
694                        /* Store the first packet */
695                        if (packets[0]->error > 0) {
696                                store_first_packet(trace, packets[0], t);
697                        }
698                        dispatch_packets(trace, t, packets, nb_packets, &empty,
699                                         &offset, trace->tracetime);
700                } else {
701                        switch (nb_packets) {
702                        case READ_EOF:
703                                goto eof;
704                        case READ_ERROR:
705                                goto error;
706                        case READ_MESSAGE:
707                                nb_packets = 0;
708                                continue;
709                        default:
710                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
711                                goto error;
712                        }
713                }
714
715        }
716
717error:
718        message.code = MESSAGE_DO_STOP;
719        message.sender = t;
720        message.data.uint64 = 0;
721        trace_message_perpkts(trace, &message);
722eof:
723        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
724
725        // Let the per_packet function know we have stopped
726        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
727        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
728
729        // Free any remaining packets
730        for (i = 0; i < trace->config.burst_size; i++) {
731                if (packets[i]) {
732                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
733                        packets[i] = NULL;
734                }
735        }
736
737        thread_change_state(trace, t, THREAD_FINISHED, true);
738
739        /* Make sure the reporter sees we have finished */
740        if (trace_has_reporter(trace))
741                trace_post_reporter(trace);
742
743        // Release all ocache memory before unregistering with the format
744        // because this might(it does in DPDK) unlink the formats mempool
745        // causing destroy/finish packet to fail.
746        libtrace_ocache_unregister_thread(&trace->packet_freelist);
747        if (trace->format->punregister_thread) {
748                trace->format->punregister_thread(trace, t);
749        }
750        print_memory_stats();
751
752        pthread_exit(NULL);
753}
754
755/**
756 * The start point for our single threaded hasher thread, this will read
757 * and hash a packet from a data source and queue it against the correct
758 * core to process it.
759 */
760static void* hasher_entry(void *data) {
761        libtrace_t *trace = (libtrace_t *)data;
762        libtrace_thread_t * t;
763        int i;
764        libtrace_packet_t * packet;
765        libtrace_message_t message = {0, {.uint64=0}, NULL};
766        int pkt_skipped = 0;
767
768        assert(trace_has_dedicated_hasher(trace));
769        /* Wait until all threads are started and objects are initialised (ring buffers) */
770        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
771        t = &trace->hasher_thread;
772        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
773        if (trace->state == STATE_ERROR) {
774                thread_change_state(trace, t, THREAD_FINISHED, false);
775                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
776                pthread_exit(NULL);
777        }
778        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
779
780        /* We are reading but it is not the parallel API */
781        if (trace->format->pregister_thread) {
782                trace->format->pregister_thread(trace, t, true);
783        }
784
785        /* Read all packets in then hash and queue against the correct thread */
786        while (1) {
787                int thread;
788                if (!pkt_skipped)
789                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
790                assert(packet);
791
792                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
793                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
794                        switch(message.code) {
795                                case MESSAGE_DO_PAUSE:
796                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
797                                        thread_change_state(trace, t, THREAD_PAUSED, false);
798                                        pthread_cond_broadcast(&trace->perpkt_cond);
799                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
800                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
801                                        }
802                                        thread_change_state(trace, t, THREAD_RUNNING, false);
803                                        pthread_cond_broadcast(&trace->perpkt_cond);
804                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
805                                        break;
806                                case MESSAGE_DO_STOP:
807                                        assert(trace->started == false);
808                                        assert(trace->state == STATE_FINISHED);
809                                        /* Mark the current packet as EOF */
810                                        packet->error = 0;
811                                        break;
812                                default:
813                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
814                        }
815                        pkt_skipped = 1;
816                        continue;
817                }
818
819                if ((packet->error = trace_read_packet(trace, packet)) <1) {
820                        if (packet->error == READ_MESSAGE) {
821                                pkt_skipped = 1;
822                                continue;
823                        } else {
824                                break; /* We are EOF or error'd either way we stop  */
825                        }
826                }
827
828                /* We are guaranteed to have a hash function i.e. != NULL */
829                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
830                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
831                /* Blocking write to the correct queue - I'm the only writer */
832                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
833                        uint64_t order = trace_packet_get_order(packet);
834                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
835                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
836                                // Write ticks to everyone else
837                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
838                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
839                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
840                                for (i = 0; i < trace->perpkt_thread_count; i++) {
841                                        pkts[i]->error = READ_TICK;
842                                        trace_packet_set_order(pkts[i], order);
843                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
844                                }
845                        }
846                        pkt_skipped = 0;
847                } else {
848                        assert(!"Dropping a packet!!");
849                        pkt_skipped = 1; // Reuse that packet no one read it
850                }
851        }
852
853        /* Broadcast our last failed read to all threads */
854        for (i = 0; i < trace->perpkt_thread_count; i++) {
855                libtrace_packet_t * bcast;
856                if (i == trace->perpkt_thread_count - 1) {
857                        bcast = packet;
858                } else {
859                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
860                        bcast->error = packet->error;
861                }
862                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
863                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
864                        // Unlock early otherwise we could deadlock
865                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
866                }
867                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
868        }
869
870        // We don't need to free the packet
871        thread_change_state(trace, t, THREAD_FINISHED, true);
872
873        libtrace_ocache_unregister_thread(&trace->packet_freelist);
874        if (trace->format->punregister_thread) {
875                trace->format->punregister_thread(trace, t);
876        }
877        print_memory_stats();
878
879        // TODO remove from TTABLE t sometime
880        pthread_exit(NULL);
881}
882
883/* Our simplest case when a thread becomes ready it can obtain an exclusive
884 * lock to read packets from the underlying trace.
885 */
886static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
887                                                    libtrace_thread_t *t,
888                                                    libtrace_packet_t *packets[],
889                                                    size_t nb_packets) {
890        size_t i = 0;
891        //bool tick_hit = false;
892
893        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
894        /* Read nb_packets */
895        for (i = 0; i < nb_packets; ++i) {
896                if (libtrace_message_queue_count(&t->messages) > 0) {
897                        if ( i==0 ) {
898                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
899                                return READ_MESSAGE;
900                        } else {
901                                break;
902                        }
903                }
904                packets[i]->error = trace_read_packet(libtrace, packets[i]);
905
906                if (packets[i]->error <= 0) {
907                        /* We'll catch this next time if we have already got packets */
908                        if ( i==0 ) {
909                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
910                                return packets[i]->error;
911                        } else {
912                                break;
913                        }
914                }
915                /*
916                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
917                        tick_hit = true;
918                }*/
919        }
920        // Doing this inside the lock ensures the first packet is always
921        // recorded first
922        if (packets[0]->error > 0) {
923                store_first_packet(libtrace, packets[0], t);
924        }
925        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
926        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
927        if (tick_hit) {
928                libtrace_message_t tick;
929                tick.additional.uint64 = trace_packet_get_order(packets[i]);
930                tick.code = MESSAGE_TICK;
931                trace_send_message_to_perpkts(libtrace, &tick);
932        } */
933        return i;
934}
935
936/**
937 * For the case that we have a dedicated hasher thread
938 * 1. We read a packet from our buffer
939 * 2. Move that into the packet provided (packet)
940 */
941inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
942                                                   libtrace_thread_t *t,
943                                                   libtrace_packet_t *packets[],
944                                                   size_t nb_packets) {
945        size_t i;
946
947        /* We store the last error message here */
948        if (t->format_data) {
949                return ((libtrace_packet_t *)t->format_data)->error;
950        }
951
952        // Always grab at least one
953        if (packets[0]) // Recycle the old get the new
954                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
955        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
956
957        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
958                return packets[0]->error;
959        }
960
961        for (i = 1; i < nb_packets; i++) {
962                if (packets[i]) // Recycle the old get the new
963                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
964                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
965                        packets[i] = NULL;
966                        break;
967                }
968
969                /* We will return an error or EOF the next time around */
970                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
971                        /* The message case will be checked automatically -
972                           However other cases like EOF and error will only be
973                           sent once*/
974                        if (packets[i]->error != READ_MESSAGE) {
975                                assert(t->format_data == NULL);
976                                t->format_data = packets[i];
977                        }
978                        break;
979                }
980        }
981
982        return i;
983}
984
985/**
986 * For the first packet of each queue we keep a copy and note the system
987 * time it was received at.
988 *
989 * This is used for finding the first packet when playing back a trace
990 * in trace time. And can be used by real time applications to print
991 * results out every XXX seconds.
992 */
993void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
994{
995        if (!t->recorded_first) {
996                libtrace_message_t mesg = {0, {.uint64=0}, NULL};
997                struct timeval tv;
998                libtrace_packet_t * dup;
999
1000                /* We mark system time against a copy of the packet */
1001                gettimeofday(&tv, NULL);
1002                dup = trace_copy_packet(packet);
1003
1004                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1005                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1006                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1007                libtrace->first_packets.count++;
1008
1009                /* Now update the first */
1010                if (libtrace->first_packets.count == 1) {
1011                        /* We the first entry hence also the first known packet */
1012                        libtrace->first_packets.first = t->perpkt_num;
1013                } else {
1014                        /* Check if we are newer than the previous 'first' packet */
1015                        size_t first = libtrace->first_packets.first;
1016                        struct timeval cur_ts = trace_get_timeval(dup);
1017                        struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1018                        if (timercmp(&cur_ts, &first_ts, <))
1019                                libtrace->first_packets.first = t->perpkt_num;
1020                }
1021                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1022
1023                mesg.code = MESSAGE_FIRST_PACKET;
1024                trace_message_reporter(libtrace, &mesg);
1025                trace_message_perpkts(libtrace, &mesg);
1026                t->recorded_first = true;
1027        }
1028}
1029
1030DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1031                                     libtrace_thread_t *t,
1032                                     const libtrace_packet_t **packet,
1033                                     const struct timeval **tv)
1034{
1035        void * tmp;
1036        int ret = 0;
1037
1038        if (t) {
1039                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1040                        return -1;
1041        }
1042
1043        /* Throw away these which we don't use */
1044        if (!packet)
1045                packet = (const libtrace_packet_t **) &tmp;
1046        if (!tv)
1047                tv = (const struct timeval **) &tmp;
1048
1049        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1050        if (t) {
1051                /* Get the requested thread */
1052                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1053                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1054        } else if (libtrace->first_packets.count) {
1055                /* Get the first packet across all threads */
1056                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1057                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1058                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1059                        ret = 1;
1060                } else {
1061                        struct timeval curr_tv;
1062                        // If a second has passed since the first entry we will assume this is the very first packet
1063                        gettimeofday(&curr_tv, NULL);
1064                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1065                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1066                                        ret = 1;
1067                                }
1068                        }
1069                }
1070        } else {
1071                *packet = NULL;
1072                *tv = NULL;
1073        }
1074        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1075        return ret;
1076}
1077
1078
1079DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1080{
1081        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1082}
1083
1084inline static struct timeval usec_to_tv(uint64_t usec)
1085{
1086        struct timeval tv;
1087        tv.tv_sec = usec / 1000000;
1088        tv.tv_usec = usec % 1000000;
1089        return tv;
1090}
1091
1092/** Similar to delay_tracetime but send messages to all threads periodically */
1093static void* reporter_entry(void *data) {
1094        libtrace_message_t message = {0, {.uint64=0}, NULL};
1095        libtrace_t *trace = (libtrace_t *)data;
1096        libtrace_thread_t *t = &trace->reporter_thread;
1097
1098        /* Wait until all threads are started */
1099        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1100        if (trace->state == STATE_ERROR) {
1101                thread_change_state(trace, t, THREAD_FINISHED, false);
1102                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1103                pthread_exit(NULL);
1104        }
1105        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1106
1107        if (trace->format->pregister_thread) {
1108                trace->format->pregister_thread(trace, t, false);
1109        }
1110
1111        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1112        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1113
1114        while (!trace_has_finished(trace)) {
1115                if (trace->config.reporter_polling) {
1116                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1117                                message.code = MESSAGE_POST_REPORTER;
1118                } else {
1119                        libtrace_message_queue_get(&t->messages, &message);
1120                }
1121                switch (message.code) {
1122                        // Check for results
1123                        case MESSAGE_POST_REPORTER:
1124                                trace->combiner.read(trace, &trace->combiner);
1125                                break;
1126                        case MESSAGE_DO_PAUSE:
1127                                assert(trace->combiner.pause);
1128                                trace->combiner.pause(trace, &trace->combiner);
1129                                send_message(trace, t, MESSAGE_PAUSING,
1130                                                (libtrace_generic_t) {0}, t);
1131                                trace_thread_pause(trace, t);
1132                                send_message(trace, t, MESSAGE_RESUMING,
1133                                                (libtrace_generic_t) {0}, t);
1134                                break;
1135                default:
1136                        send_message(trace, t, message.code, message.data,
1137                                        message.sender);
1138                }
1139        }
1140
1141        // Flush out whats left now all our threads have finished
1142        trace->combiner.read_final(trace, &trace->combiner);
1143
1144        // GOODBYE
1145        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1146        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1147
1148        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1149        print_memory_stats();
1150        return NULL;
1151}
1152
1153/** Similar to delay_tracetime but send messages to all threads periodically */
1154static void* keepalive_entry(void *data) {
1155        struct timeval prev, next;
1156        libtrace_message_t message = {0, {.uint64=0}, NULL};
1157        libtrace_t *trace = (libtrace_t *)data;
1158        uint64_t next_release;
1159        libtrace_thread_t *t = &trace->keepalive_thread;
1160
1161        /* Wait until all threads are started */
1162        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1163        if (trace->state == STATE_ERROR) {
1164                thread_change_state(trace, t, THREAD_FINISHED, false);
1165                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1166                pthread_exit(NULL);
1167        }
1168        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1169
1170        gettimeofday(&prev, NULL);
1171        message.code = MESSAGE_TICK_INTERVAL;
1172
1173        while (trace->state != STATE_FINISHED) {
1174                fd_set rfds;
1175                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1176                gettimeofday(&next, NULL);
1177                if (next_release > tv_to_usec(&next)) {
1178                        next = usec_to_tv(next_release - tv_to_usec(&next));
1179                        // Wait for timeout or a message
1180                        FD_ZERO(&rfds);
1181                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1182                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1183                                libtrace_message_t msg;
1184                                libtrace_message_queue_get(&t->messages, &msg);
1185                                assert(msg.code == MESSAGE_DO_STOP);
1186                                goto done;
1187                        }
1188                }
1189                prev = usec_to_tv(next_release);
1190                if (trace->state == STATE_RUNNING) {
1191                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1192                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1193                        trace_message_perpkts(trace, &message);
1194                }
1195        }
1196done:
1197
1198        thread_change_state(trace, t, THREAD_FINISHED, true);
1199        return NULL;
1200}
1201
1202/**
1203 * Delays a packets playback so the playback will be in trace time.
1204 * This may break early if a message becomes available.
1205 *
1206 * Requires the first packet for this thread to be received.
1207 * @param libtrace  The trace
1208 * @param packet    The packet to delay
1209 * @param t         The current thread
1210 * @return Either READ_MESSAGE(-2) or 0 is successful
1211 */
1212static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1213        struct timeval curr_tv, pkt_tv;
1214        uint64_t next_release = t->tracetime_offset_usec;
1215        uint64_t curr_usec;
1216
1217        if (!t->tracetime_offset_usec) {
1218                const libtrace_packet_t *first_pkt;
1219                const struct timeval *sys_tv;
1220                int64_t initial_offset;
1221                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1222                assert(first_pkt);
1223                pkt_tv = trace_get_timeval(first_pkt);
1224                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1225                /* In the unlikely case offset is 0, change it to 1 */
1226                if (stable)
1227                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1228                next_release = initial_offset;
1229        }
1230        /* next_release == offset */
1231        pkt_tv = trace_get_timeval(packet);
1232        next_release += tv_to_usec(&pkt_tv);
1233        gettimeofday(&curr_tv, NULL);
1234        curr_usec = tv_to_usec(&curr_tv);
1235        if (next_release > curr_usec) {
1236                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1237                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1238                fd_set rfds;
1239                FD_ZERO(&rfds);
1240                FD_SET(mesg_fd, &rfds);
1241                // We need to wait
1242                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1243                if (ret == 0) {
1244                        return 0;
1245                } else if (ret > 0) {
1246                        return READ_MESSAGE;
1247                } else {
1248                        assert(!"trace_delay_packet: Unexpected return from select");
1249                }
1250        }
1251        return 0;
1252}
1253
1254/* Discards packets that don't match the filter.
1255 * Discarded packets are emptied and then moved to the end of the packet list.
1256 *
1257 * @param trace       The trace format, containing the filter
1258 * @param packets     An array of packets
1259 * @param nb_packets  The number of valid items in packets
1260 *
1261 * @return The number of packets that passed the filter, which are moved to
1262 *          the start of the packets array
1263 */
1264static inline size_t filter_packets(libtrace_t *trace,
1265                                    libtrace_packet_t **packets,
1266                                    size_t nb_packets) {
1267        size_t offset = 0;
1268        size_t i;
1269
1270        for (i = 0; i < nb_packets; ++i) {
1271                // The filter needs the trace attached to receive the link type
1272                packets[i]->trace = trace;
1273                if (trace_apply_filter(trace->filter, packets[i])) {
1274                        libtrace_packet_t *tmp;
1275                        tmp = packets[offset];
1276                        packets[offset++] = packets[i];
1277                        packets[i] = tmp;
1278                } else {
1279                        trace_fin_packet(packets[i]);
1280                }
1281        }
1282
1283        return offset;
1284}
1285
1286/* Read a batch of packets from the trace into a buffer.
1287 * Note that this function will block until a packet is read (or EOF is reached)
1288 *
1289 * @param libtrace    The trace
1290 * @param t           The thread
1291 * @param packets     An array of packets
1292 * @param nb_packets  The number of empty packets in packets
1293 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1294 */
1295static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1296                                      libtrace_thread_t *t,
1297                                      libtrace_packet_t *packets[],
1298                                      size_t nb_packets) {
1299        int i;
1300        assert(nb_packets);
1301        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1302        if (trace_is_err(libtrace))
1303                return -1;
1304        if (!libtrace->started) {
1305                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1306                              "You must call libtrace_start() before trace_read_packet()\n");
1307                return -1;
1308        }
1309
1310        if (libtrace->format->pread_packets) {
1311                int ret;
1312                for (i = 0; i < (int) nb_packets; ++i) {
1313                        assert(i[packets]);
1314                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1315                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1316                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1317                                              "Packet passed to trace_read_packet() is invalid\n");
1318                                return -1;
1319                        }
1320                }
1321                do {
1322                        ret=libtrace->format->pread_packets(libtrace, t,
1323                                                            packets,
1324                                                            nb_packets);
1325                        /* Error, EOF or message? */
1326                        if (ret <= 0) {
1327                                return ret;
1328                        }
1329
1330                        if (libtrace->filter) {
1331                                int remaining;
1332                                remaining = filter_packets(libtrace,
1333                                                           packets, ret);
1334                                t->filtered_packets += ret - remaining;
1335                                ret = remaining;
1336                        }
1337                        for (i = 0; i < ret; ++i) {
1338                                /* We do not mark the packet against the trace,
1339                                 * before hand or after. After breaks DAG meta
1340                                 * packets and before is inefficient */
1341                                //packets[i]->trace = libtrace;
1342                                /* TODO IN FORMAT?? Like traditional libtrace */
1343                                if (libtrace->snaplen>0)
1344                                        trace_set_capture_length(packets[i],
1345                                                        libtrace->snaplen);
1346                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1347                        }
1348                } while(ret == 0);
1349                return ret;
1350        }
1351        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1352                      "This format does not support reading packets\n");
1353        return ~0U;
1354}
1355
1356/* Restarts a parallel trace, this is called from trace_pstart.
1357 * The libtrace lock is held upon calling this function.
1358 * Typically with a parallel trace the threads are not
1359 * killed rather.
1360 */
1361static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1362                          libtrace_callback_set_t *per_packet_cbs, 
1363                          libtrace_callback_set_t *reporter_cbs) {
1364        int i, err = 0;
1365        if (libtrace->state != STATE_PAUSED) {
1366                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1367                        "trace(%s) is not currently paused",
1368                              libtrace->uridata);
1369                return -1;
1370        }
1371
1372        assert(libtrace_parallel);
1373        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1374
1375        /* Reset first packets */
1376        pthread_spin_lock(&libtrace->first_packets.lock);
1377        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1378                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1379                if (libtrace->first_packets.packets[i].packet) {
1380                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1381                        libtrace->first_packets.packets[i].packet = NULL;
1382                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1383                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1384                        libtrace->first_packets.count--;
1385                        libtrace->perpkt_threads[i].recorded_first = false;
1386                }
1387        }
1388        assert(libtrace->first_packets.count == 0);
1389        libtrace->first_packets.first = 0;
1390        pthread_spin_unlock(&libtrace->first_packets.lock);
1391
1392        /* Reset delay */
1393        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1394                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1395        }
1396
1397        /* Reset statistics */
1398        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1399                libtrace->perpkt_threads[i].accepted_packets = 0;
1400                libtrace->perpkt_threads[i].filtered_packets = 0;
1401        }
1402        libtrace->accepted_packets = 0;
1403        libtrace->filtered_packets = 0;
1404
1405        /* Update functions if requested */
1406        if(global_blob)
1407                libtrace->global_blob = global_blob;
1408
1409        if (per_packet_cbs) {
1410                if (libtrace->perpkt_cbs)
1411                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1412                libtrace->perpkt_cbs = trace_create_callback_set();
1413                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1414                                sizeof(libtrace_callback_set_t));
1415        }
1416
1417        if (reporter_cbs) {
1418                if (libtrace->reporter_cbs)
1419                        trace_destroy_callback_set(libtrace->reporter_cbs);
1420
1421                libtrace->reporter_cbs = trace_create_callback_set();
1422                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1423                                sizeof(libtrace_callback_set_t));
1424        }
1425
1426        if (trace_is_parallel(libtrace)) {
1427                err = libtrace->format->pstart_input(libtrace);
1428        } else {
1429                if (libtrace->format->start_input) {
1430                        err = libtrace->format->start_input(libtrace);
1431                }
1432        }
1433
1434        if (err == 0) {
1435                libtrace->started = true;
1436                libtrace_change_state(libtrace, STATE_RUNNING, false);
1437        }
1438        return err;
1439}
1440
1441/**
1442 * @return the number of CPU cores on the machine. -1 if unknown.
1443 */
1444SIMPLE_FUNCTION static int get_nb_cores() {
1445        int numCPU;
1446#ifdef _SC_NPROCESSORS_ONLN
1447        /* Most systems do this now */
1448        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1449
1450#else
1451        int mib[] = {CTL_HW, HW_AVAILCPU};
1452        size_t len = sizeof(numCPU);
1453
1454        /* get the number of CPUs from the system */
1455        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1456#endif
1457        return numCPU <= 0 ? 1 : numCPU;
1458}
1459
1460/**
1461 * Verifies the configuration and sets default values for any values not
1462 * specified by the user.
1463 */
1464static void verify_configuration(libtrace_t *libtrace) {
1465
1466        if (libtrace->config.hasher_queue_size <= 0)
1467                libtrace->config.hasher_queue_size = 1000;
1468
1469        if (libtrace->config.perpkt_threads <= 0) {
1470                libtrace->perpkt_thread_count = get_nb_cores();
1471                if (libtrace->perpkt_thread_count <= 0)
1472                        // Lets just use one
1473                        libtrace->perpkt_thread_count = 1;
1474        } else {
1475                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1476        }
1477
1478        if (libtrace->config.reporter_thold <= 0)
1479                libtrace->config.reporter_thold = 100;
1480        if (libtrace->config.burst_size <= 0)
1481                libtrace->config.burst_size = 32;
1482        if (libtrace->config.thread_cache_size <= 0)
1483                libtrace->config.thread_cache_size = 64;
1484        if (libtrace->config.cache_size <= 0)
1485                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1486
1487        if (libtrace->config.cache_size <
1488                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1489                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1490
1491        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1492                libtrace->combiner = combiner_unordered;
1493
1494        /* Figure out if we are using a dedicated hasher thread? */
1495        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1496                libtrace->hasher_thread.type = THREAD_HASHER;
1497        }
1498}
1499
1500/**
1501 * Starts a libtrace_thread, including allocating memory for messaging.
1502 * Threads are expected to wait until the libtrace look is released.
1503 * Hence why we don't init structures until later.
1504 *
1505 * @param trace The trace the thread is associated with
1506 * @param t The thread that is filled when the thread is started
1507 * @param type The type of thread
1508 * @param start_routine The entry location of the thread
1509 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1510 * @param name For debugging purposes set the threads name (Optional)
1511 *
1512 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1513 *         In this situation the thread structure is zeroed.
1514 */
1515static int trace_start_thread(libtrace_t *trace,
1516                       libtrace_thread_t *t,
1517                       enum thread_types type,
1518                       void *(*start_routine) (void *),
1519                       int perpkt_num,
1520                       const char *name) {
1521#ifdef __linux__
1522        pthread_attr_t attrib;
1523        cpu_set_t cpus;
1524        int i;
1525#endif
1526        int ret;
1527        assert(t->type == THREAD_EMPTY);
1528        t->trace = trace;
1529        t->ret = NULL;
1530        t->user_data = NULL;
1531        t->type = type;
1532        t->state = THREAD_RUNNING;
1533
1534        assert(name);
1535
1536#ifdef __linux__
1537        CPU_ZERO(&cpus);
1538        for (i = 0; i < get_nb_cores(); i++)
1539                CPU_SET(i, &cpus);
1540        pthread_attr_init(&attrib);
1541        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1542        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1543        pthread_attr_destroy(&attrib);
1544#else
1545        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1546#endif
1547        if (ret != 0) {
1548                libtrace_zero_thread(t);
1549                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1550                return -1;
1551        }
1552        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1553        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1554                libtrace_ringbuffer_init(&t->rbuffer,
1555                                         trace->config.hasher_queue_size,
1556                                         trace->config.hasher_polling?
1557                                                 LIBTRACE_RINGBUFFER_POLLING:
1558                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1559        }
1560#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1561        if(name)
1562                pthread_setname_np(t->tid, name);
1563#endif
1564        t->perpkt_num = perpkt_num;
1565        return 0;
1566}
1567
1568/** Parses the environment variable LIBTRACE_CONF into the supplied
1569 * configuration structure.
1570 *
1571 * @param[in,out] libtrace The trace from which we determine the URI and set
1572 * the configuration.
1573 *
1574 * We search for 3 environment variables and apply them to the config in the
1575 * following order. Such that the first has the lowest priority.
1576 *
1577 * 1. LIBTRACE_CONF, The global environment configuration
1578 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1579 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1580 *
1581 * E.g.
1582 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1583 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1584 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1585 *
1586 * @note All environment variables names MUST only contian
1587 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1588 * outside of this range should be captilised if possible or replaced with an
1589 * underscore.
1590 */
1591static void parse_env_config (libtrace_t *libtrace) {
1592        char env_name[1024] = "LIBTRACE_CONF_";
1593        size_t len = strlen(env_name);
1594        size_t mark = 0;
1595        size_t i;
1596        char * env;
1597
1598        /* Make our compound string */
1599        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1600        len += strlen(libtrace->format->name);
1601        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1602        len += 1;
1603        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1604
1605        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1606        for (i = 0; env_name[i] != 0; ++i) {
1607                env_name[i] = toupper(env_name[i]);
1608                if(env_name[i] == ':') {
1609                        mark = i;
1610                }
1611                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1612                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1613                        env_name[i] = '_';
1614                }
1615        }
1616
1617        /* First apply global env settings LIBTRACE_CONF */
1618        env = getenv("LIBTRACE_CONF");
1619        if (env)
1620        {
1621                printf("Got env %s", env);
1622                trace_set_configuration(libtrace, env);
1623        }
1624
1625        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1626        if (mark != 0) {
1627                env_name[mark] = 0;
1628                env = getenv(env_name);
1629                if (env) {
1630                        trace_set_configuration(libtrace, env);
1631                }
1632                env_name[mark] = '_';
1633        }
1634
1635        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1636        env = getenv(env_name);
1637        if (env) {
1638                trace_set_configuration(libtrace, env);
1639        }
1640}
1641
1642DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1643        if (libtrace->state == STATE_NEW)
1644                return trace_supports_parallel(libtrace);
1645        return libtrace->pread == trace_pread_packet_wrapper;
1646}
1647
1648DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1649                           libtrace_callback_set_t *per_packet_cbs,
1650                           libtrace_callback_set_t *reporter_cbs) {
1651        int i;
1652        int ret = -1;
1653        char name[16];
1654        sigset_t sig_before, sig_block_all;
1655        assert(libtrace);
1656
1657        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1658        if (trace_is_err(libtrace)) {
1659                goto cleanup_none;
1660        }
1661
1662        if (libtrace->state == STATE_PAUSED) {
1663                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1664                                reporter_cbs);
1665                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1666                return ret;
1667        }
1668
1669        if (libtrace->state != STATE_NEW) {
1670                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1671                              "should be called on a NEW or PAUSED trace but "
1672                              "instead was called from %s",
1673                              get_trace_state_name(libtrace->state));
1674                goto cleanup_none;
1675        }
1676
1677        /* Store the user defined things against the trace */
1678        libtrace->global_blob = global_blob;
1679
1680        /* Save a copy of the callbacks in case the user tries to change them
1681         * on us later */
1682        if (!per_packet_cbs) {
1683                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1684                                "requires a non-NULL set of per packet "
1685                                "callbacks.");
1686                goto cleanup_none;
1687        }
1688
1689        if (per_packet_cbs->message_packet == NULL) {
1690                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1691                                "packet callbacks must include a handler "
1692                                "for a packet. Please set this using "
1693                                "trace_set_packet_cb().");
1694                goto cleanup_none;
1695        }
1696
1697        libtrace->perpkt_cbs = trace_create_callback_set();
1698        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1699       
1700        if (reporter_cbs) {
1701                libtrace->reporter_cbs = trace_create_callback_set();
1702                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1703        }
1704
1705       
1706
1707
1708        /* And zero other fields */
1709        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1710                libtrace->perpkt_thread_states[i] = 0;
1711        }
1712        libtrace->first_packets.first = 0;
1713        libtrace->first_packets.count = 0;
1714        libtrace->first_packets.packets = NULL;
1715        libtrace->perpkt_threads = NULL;
1716        /* Set a global which says we are using a parallel trace. This is
1717         * for backwards compatibility due to changes when destroying packets */
1718        libtrace_parallel = 1;
1719
1720        /* Parses configuration passed through environment variables */
1721        parse_env_config(libtrace);
1722        verify_configuration(libtrace);
1723
1724        ret = -1;
1725        /* Try start the format - we prefer parallel over single threaded, as
1726         * these formats should support messages better */
1727        if (trace_supports_parallel(libtrace) &&
1728            !trace_has_dedicated_hasher(libtrace)) {
1729                ret = libtrace->format->pstart_input(libtrace);
1730                libtrace->pread = trace_pread_packet_wrapper;
1731        }
1732        if (ret != 0) {
1733                if (libtrace->format->start_input) {
1734                        ret = libtrace->format->start_input(libtrace);
1735                }
1736                if (libtrace->perpkt_thread_count > 1)
1737                        libtrace->pread = trace_pread_packet_first_in_first_served;
1738                        /* Don't wait for a burst of packets if the format is
1739                         * live as this could block ring based formats and
1740                         * introduces delay. */
1741                        if (libtrace->format->info.live) {
1742                                libtrace->config.burst_size = 1;
1743                        }
1744                else
1745                        /* Use standard read_packet */
1746                        libtrace->pread = NULL;
1747        }
1748
1749        if (ret != 0) {
1750                goto cleanup_none;
1751        }
1752
1753        /* --- Start all the threads we need --- */
1754        /* Disable signals because it is inherited by the threads we start */
1755        sigemptyset(&sig_block_all);
1756        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1757
1758        /* If we need a hasher thread start it
1759         * Special Case: If single threaded we don't need a hasher
1760         */
1761        if (trace_has_dedicated_hasher(libtrace)) {
1762                libtrace->hasher_thread.type = THREAD_EMPTY;
1763                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1764                                   THREAD_HASHER, hasher_entry, -1,
1765                                   "hasher-thread");
1766                if (ret != 0)
1767                        goto cleanup_started;
1768                libtrace->pread = trace_pread_packet_hasher_thread;
1769        } else {
1770                libtrace->hasher_thread.type = THREAD_EMPTY;
1771        }
1772
1773        /* Start up our perpkt threads */
1774        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1775                                          libtrace->perpkt_thread_count);
1776        if (!libtrace->perpkt_threads) {
1777                trace_set_err(libtrace, errno, "trace_pstart "
1778                              "failed to allocate memory.");
1779                goto cleanup_threads;
1780        }
1781        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1782                snprintf(name, sizeof(name), "perpkt-%d", i);
1783                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1784                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1785                                   THREAD_PERPKT, perpkt_threads_entry, i,
1786                                   name);
1787                if (ret != 0)
1788                        goto cleanup_threads;
1789        }
1790
1791        /* Start the reporter thread */
1792        if (reporter_cbs) {
1793                if (libtrace->combiner.initialise)
1794                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1795                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1796                                   THREAD_REPORTER, reporter_entry, -1,
1797                                   "reporter_thread");
1798                if (ret != 0)
1799                        goto cleanup_threads;
1800        }
1801
1802        /* Start the keepalive thread */
1803        if (libtrace->config.tick_interval > 0) {
1804                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1805                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1806                                   "keepalive_thread");
1807                if (ret != 0)
1808                        goto cleanup_threads;
1809        }
1810
1811        /* Init other data structures */
1812        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1813        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1814        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1815                                                 sizeof(*libtrace->first_packets.packets));
1816        if (libtrace->first_packets.packets == NULL) {
1817                trace_set_err(libtrace, errno, "trace_pstart "
1818                              "failed to allocate memory.");
1819                goto cleanup_threads;
1820        }
1821
1822        if (libtrace_ocache_init(&libtrace->packet_freelist,
1823                             (void* (*)()) trace_create_packet,
1824                             (void (*)(void *))trace_destroy_packet,
1825                             libtrace->config.thread_cache_size,
1826                             libtrace->config.cache_size * 4,
1827                             libtrace->config.fixed_count) != 0) {
1828                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1829                              "failed to allocate ocache.");
1830                goto cleanup_threads;
1831        }
1832
1833        /* Threads don't start */
1834        libtrace->started = true;
1835        libtrace_change_state(libtrace, STATE_RUNNING, false);
1836
1837        ret = 0;
1838        goto success;
1839cleanup_threads:
1840        if (libtrace->first_packets.packets) {
1841                free(libtrace->first_packets.packets);
1842                libtrace->first_packets.packets = NULL;
1843        }
1844        libtrace_change_state(libtrace, STATE_ERROR, false);
1845        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1846        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1847                pthread_join(libtrace->hasher_thread.tid, NULL);
1848                libtrace_zero_thread(&libtrace->hasher_thread);
1849        }
1850
1851        if (libtrace->perpkt_threads) {
1852                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1853                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1854                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1855                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1856                        } else break;
1857                }
1858                free(libtrace->perpkt_threads);
1859                libtrace->perpkt_threads = NULL;
1860        }
1861
1862        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1863                pthread_join(libtrace->reporter_thread.tid, NULL);
1864                libtrace_zero_thread(&libtrace->reporter_thread);
1865        }
1866
1867        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1868                pthread_join(libtrace->keepalive_thread.tid, NULL);
1869                libtrace_zero_thread(&libtrace->keepalive_thread);
1870        }
1871        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1872        libtrace_change_state(libtrace, STATE_NEW, false);
1873        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1874        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1875cleanup_started:
1876        if (libtrace->pread == trace_pread_packet_wrapper) {
1877                if (libtrace->format->ppause_input)
1878                        libtrace->format->ppause_input(libtrace);
1879        } else {
1880                if (libtrace->format->pause_input)
1881                        libtrace->format->pause_input(libtrace);
1882        }
1883        ret = -1;
1884success:
1885        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1886cleanup_none:
1887        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1888        return ret;
1889}
1890
1891DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1892                fn_cb_starting handler) {
1893        cbset->message_starting = handler;
1894        return 0;
1895}
1896
1897DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1898                fn_cb_dataless handler) {
1899        cbset->message_pausing = handler;
1900        return 0;
1901}
1902
1903DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1904                fn_cb_dataless handler) {
1905        cbset->message_resuming = handler;
1906        return 0;
1907}
1908
1909DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1910                fn_cb_dataless handler) {
1911        cbset->message_stopping = handler;
1912        return 0;
1913}
1914
1915DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1916                fn_cb_packet handler) {
1917        cbset->message_packet = handler;
1918        return 0;
1919}
1920
1921DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1922                fn_cb_first_packet handler) {
1923        cbset->message_first_packet = handler;
1924        return 0;
1925}
1926
1927DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1928                fn_cb_tick handler) {
1929        cbset->message_tick_count = handler;
1930        return 0;
1931}
1932
1933DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1934                fn_cb_tick handler) {
1935        cbset->message_tick_interval = handler;
1936        return 0;
1937}
1938
1939DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1940                fn_cb_result handler) {
1941        cbset->message_result = handler;
1942        return 0;
1943}
1944
1945DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1946                fn_cb_usermessage handler) {
1947        cbset->message_user = handler;
1948        return 0;
1949}
1950
1951/*
1952 * Pauses a trace, this should only be called by the main thread
1953 * 1. Set started = false
1954 * 2. All perpkt threads are paused waiting on a condition var
1955 * 3. Then call ppause on the underlying format if found
1956 * 4. The traces state is paused
1957 *
1958 * Once done you should be able to modify the trace setup and call pstart again
1959 * TODO add support to change the number of threads.
1960 */
1961DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1962{
1963        libtrace_thread_t *t;
1964        int i;
1965        assert(libtrace);
1966
1967        t = get_thread_table(libtrace);
1968        // Check state from within the lock if we are going to change it
1969        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1970
1971        /* If we are already paused, just treat this as a NOOP */
1972        if (libtrace->state == STATE_PAUSED) {
1973                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1974                return 0;
1975        }
1976        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1977                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1978                return -1;
1979        }
1980
1981        libtrace_change_state(libtrace, STATE_PAUSING, false);
1982        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1983
1984        // Special case handle the hasher thread case
1985        if (trace_has_dedicated_hasher(libtrace)) {
1986                if (libtrace->config.debug_state)
1987                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1988                libtrace_message_t message = {0, {.uint64=0}, NULL};
1989                message.code = MESSAGE_DO_PAUSE;
1990                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1991                // Wait for it to pause
1992                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1993                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1994                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1995                }
1996                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1997                if (libtrace->config.debug_state)
1998                        fprintf(stderr, " DONE\n");
1999        }
2000
2001        if (libtrace->config.debug_state)
2002                fprintf(stderr, "Asking perpkt threads to pause ...");
2003        // Stop threads, skip this one if it's a perpkt
2004        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2005                if (&libtrace->perpkt_threads[i] != t) {
2006                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2007                        message.code = MESSAGE_DO_PAUSE;
2008                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2009                        if(trace_has_dedicated_hasher(libtrace)) {
2010                                // The hasher has stopped and other threads have messages waiting therefore
2011                                // If the queues are empty the other threads would have no data
2012                                // So send some message packets to simply ask the threads to check
2013                                // We are the only writer since hasher has paused
2014                                libtrace_packet_t *pkt;
2015                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2016                                pkt->error = READ_MESSAGE;
2017                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2018                        }
2019                } else {
2020                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2021                }
2022        }
2023
2024        if (t) {
2025                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2026                // We rely on the user to *not* return before starting the trace again
2027                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2028        }
2029
2030        // Wait for all threads to pause
2031        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2032        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2033                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2034        }
2035        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2036
2037        if (libtrace->config.debug_state)
2038                fprintf(stderr, " DONE\n");
2039
2040        // Deal with the reporter
2041        if (trace_has_reporter(libtrace)) {
2042                if (libtrace->config.debug_state)
2043                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2044                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2045                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2046                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2047               
2048                } else {
2049                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2050                        message.code = MESSAGE_DO_PAUSE;
2051                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2052                        // Wait for it to pause
2053                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2054                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2055                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2056                        }
2057                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2058                }
2059                if (libtrace->config.debug_state)
2060                        fprintf(stderr, " DONE\n");
2061        }
2062
2063        /* Cache values before we pause */
2064        if (libtrace->stats == NULL)
2065                libtrace->stats = trace_create_statistics();
2066        // Save the statistics against the trace
2067        trace_get_statistics(libtrace, NULL);
2068        if (trace_is_parallel(libtrace)) {
2069                libtrace->started = false;
2070                if (libtrace->format->ppause_input)
2071                        libtrace->format->ppause_input(libtrace);
2072                // TODO What happens if we don't have pause input??
2073        } else {
2074                int err;
2075                err = trace_pause(libtrace);
2076                // We should handle this a bit better
2077                if (err)
2078                        return err;
2079        }
2080
2081        // Only set as paused after the pause has been called on the trace
2082        libtrace_change_state(libtrace, STATE_PAUSED, true);
2083        return 0;
2084}
2085
2086/**
2087 * Stop trace finish prematurely as though it meet an EOF
2088 * This should only be called by the main thread
2089 * 1. Calls ppause
2090 * 2. Sends a message asking for threads to finish
2091 * 3. Releases threads which will pause
2092 */
2093DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2094{
2095        int i, err;
2096        libtrace_message_t message = {0, {.uint64=0}, NULL};
2097        assert(libtrace);
2098
2099        // Ensure all threads have paused and the underlying trace format has
2100        // been closed and all packets associated are cleaned up
2101        // Pause will do any state checks for us
2102        err = trace_ppause(libtrace);
2103        if (err)
2104                return err;
2105
2106        // Now send a message asking the threads to stop
2107        // This will be retrieved before trying to read another packet
2108
2109        message.code = MESSAGE_DO_STOP;
2110        trace_message_perpkts(libtrace, &message);
2111        if (trace_has_dedicated_hasher(libtrace))
2112                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2113
2114        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2115                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2116        }
2117
2118        /* Now release the threads and let them stop - when the threads finish
2119         * the state will be set to finished */
2120        libtrace_change_state(libtrace, STATE_FINISHING, true);
2121        return 0;
2122}
2123
2124DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2125        int ret = -1;
2126        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2127                return -1;
2128        }
2129
2130        // Save the requirements
2131        trace->hasher_type = type;
2132        if (hasher) {
2133                trace->hasher = hasher;
2134                trace->hasher_data = data;
2135        } else {
2136                trace->hasher = NULL;
2137                trace->hasher_data = NULL;
2138        }
2139
2140        // Try push this to hardware - NOTE hardware could do custom if
2141        // there is a more efficient way to apply it, in this case
2142        // it will simply grab the function out of libtrace_t
2143        if (trace_supports_parallel(trace) && trace->format->config_input)
2144                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2145
2146        if (ret == -1) {
2147                /* We have to deal with this ourself */
2148                if (!hasher) {
2149                        switch (type)
2150                        {
2151                                case HASHER_CUSTOM:
2152                                case HASHER_BALANCE:
2153                                        return 0;
2154                                case HASHER_BIDIRECTIONAL:
2155                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2156                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2157                                        toeplitz_init_config(trace->hasher_data, 1);
2158                                        return 0;
2159                                case HASHER_UNIDIRECTIONAL:
2160                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2161                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2162                                        toeplitz_init_config(trace->hasher_data, 0);
2163                                        return 0;
2164                        }
2165                        return -1;
2166                }
2167        } else {
2168                /* If the hasher is hardware we zero out the hasher and hasher
2169                 * data fields - only if we need a hasher do we do this */
2170                trace->hasher = NULL;
2171                trace->hasher_data = NULL;
2172        }
2173
2174        return 0;
2175}
2176
2177// Waits for all threads to finish
2178DLLEXPORT void trace_join(libtrace_t *libtrace) {
2179        int i;
2180
2181        /* Firstly wait for the perpkt threads to finish, since these are
2182         * user controlled */
2183        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2184                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2185                // So we must do our best effort to empty the queue - so
2186                // the producer (or any other threads) don't block.
2187                libtrace_packet_t * packet;
2188                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2189                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2190                        if (packet) // This could be NULL iff the perpkt finishes early
2191                                trace_destroy_packet(packet);
2192        }
2193
2194        /* Now the hasher */
2195        if (trace_has_dedicated_hasher(libtrace)) {
2196                pthread_join(libtrace->hasher_thread.tid, NULL);
2197                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2198        }
2199
2200        // Now that everything is finished nothing can be touching our
2201        // buffers so clean them up
2202        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2203                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2204                // if they lost timeslice before-during a write
2205                libtrace_packet_t * packet;
2206                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2207                        trace_destroy_packet(packet);
2208                if (trace_has_dedicated_hasher(libtrace)) {
2209                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2210                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2211                }
2212                // Cannot destroy vector yet, this happens with trace_destroy
2213        }
2214
2215        if (trace_has_reporter(libtrace)) {
2216                pthread_join(libtrace->reporter_thread.tid, NULL);
2217                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2218        }
2219
2220        // Wait for the tick (keepalive) thread if it has been started
2221        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2222                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2223                msg.code = MESSAGE_DO_STOP;
2224                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2225                pthread_join(libtrace->keepalive_thread.tid, NULL);
2226        }
2227
2228        libtrace_change_state(libtrace, STATE_JOINED, true);
2229        print_memory_stats();
2230}
2231
2232DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2233                                                libtrace_thread_t *t)
2234{
2235        int ret;
2236        if (t == NULL)
2237                t = get_thread_descriptor(libtrace);
2238        if (t == NULL)
2239                return -1;
2240        ret = libtrace_message_queue_count(&t->messages);
2241        return ret < 0 ? 0 : ret;
2242}
2243
2244DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2245                                          libtrace_thread_t *t,
2246                                          libtrace_message_t * message)
2247{
2248        int ret;
2249        if (t == NULL)
2250                t = get_thread_descriptor(libtrace);
2251        if (t == NULL)
2252                return -1;
2253        ret = libtrace_message_queue_get(&t->messages, message);
2254        return ret < 0 ? 0 : ret;
2255}
2256
2257DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2258                                              libtrace_thread_t *t,
2259                                              libtrace_message_t * message)
2260{
2261        if (t == NULL)
2262                t = get_thread_descriptor(libtrace);
2263        if (t == NULL)
2264                return -1;
2265        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2266                return 0;
2267        else
2268                return -1;
2269}
2270
2271DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2272{
2273        int ret;
2274        if (!message->sender)
2275                message->sender = get_thread_descriptor(libtrace);
2276
2277        ret = libtrace_message_queue_put(&t->messages, message);
2278        return ret < 0 ? 0 : ret;
2279}
2280
2281DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2282{
2283        if (!trace_has_reporter(libtrace) ||
2284            !(libtrace->reporter_thread.state == THREAD_RUNNING
2285              || libtrace->reporter_thread.state == THREAD_PAUSED))
2286                return -1;
2287
2288        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2289}
2290
2291DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2292{
2293        libtrace_message_t message = {0, {.uint64=0}, NULL};
2294        message.code = MESSAGE_POST_REPORTER;
2295        return trace_message_reporter(libtrace, (void *) &message);
2296}
2297
2298DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2299{
2300        int i;
2301        int missed = 0;
2302        if (message->sender == NULL)
2303                message->sender = get_thread_descriptor(libtrace);
2304        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2305                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2306                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2307                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2308                } else {
2309                        missed += 1;
2310                }
2311        }
2312        return -missed;
2313}
2314
2315/**
2316 * Publishes a result to the reduce queue
2317 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2318 */
2319DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2320        libtrace_result_t res;
2321        res.type = type;
2322        res.key = key;
2323        res.value = value;
2324        assert(libtrace->combiner.publish);
2325        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2326        return;
2327}
2328
2329DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2330        if (combiner) {
2331                trace->combiner = *combiner;
2332                trace->combiner.configuration = config;
2333        } else {
2334                // No combiner, so don't try use it
2335                memset(&trace->combiner, 0, sizeof(trace->combiner));
2336        }
2337}
2338
2339DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2340        return packet->order;
2341}
2342
2343DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2344        return packet->hash;
2345}
2346
2347DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2348        packet->order = order;
2349}
2350
2351DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2352        packet->hash = hash;
2353}
2354
2355DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2356        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2357}
2358
2359/**
2360 * @return True if the trace is not running such that it can be configured
2361 */
2362static inline bool trace_is_configurable(libtrace_t *trace) {
2363        return trace->state == STATE_NEW ||
2364                        trace->state == STATE_PAUSED;
2365}
2366
2367DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2368        if (!trace_is_configurable(trace)) return -1;
2369
2370        /* TODO consider allowing an offset from the total number of cores i.e.
2371         * -1 reserve 1 core */
2372        if (nb >= 0) {
2373                trace->config.perpkt_threads = nb;
2374                return 0;
2375        } else {
2376                return -1;
2377        }
2378}
2379
2380DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2381        if (!trace_is_configurable(trace)) return -1;
2382
2383        trace->config.tick_interval = millisec;
2384        return 0;
2385}
2386
2387DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2388        if (!trace_is_configurable(trace)) return -1;
2389
2390        trace->config.tick_count = count;
2391        return 0;
2392}
2393
2394DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2395        if (!trace_is_configurable(trace)) return -1;
2396
2397        trace->tracetime = tracetime;
2398        return 0;
2399}
2400
2401DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2402        if (!trace_is_configurable(trace)) return -1;
2403
2404        trace->config.cache_size = size;
2405        return 0;
2406}
2407
2408DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2409        if (!trace_is_configurable(trace)) return -1;
2410
2411        trace->config.thread_cache_size = size;
2412        return 0;
2413}
2414
2415DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2416        if (!trace_is_configurable(trace)) return -1;
2417
2418        trace->config.fixed_count = fixed;
2419        return 0;
2420}
2421
2422DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2423        if (!trace_is_configurable(trace)) return -1;
2424
2425        trace->config.burst_size = size;
2426        return 0;
2427}
2428
2429DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2430        if (!trace_is_configurable(trace)) return -1;
2431
2432        trace->config.hasher_queue_size = size;
2433        return 0;
2434}
2435
2436DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2437        if (!trace_is_configurable(trace)) return -1;
2438
2439        trace->config.hasher_polling = polling;
2440        return 0;
2441}
2442
2443DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2444        if (!trace_is_configurable(trace)) return -1;
2445
2446        trace->config.reporter_polling = polling;
2447        return 0;
2448}
2449
2450DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2451        if (!trace_is_configurable(trace)) return -1;
2452
2453        trace->config.reporter_thold = thold;
2454        return 0;
2455}
2456
2457DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2458        if (!trace_is_configurable(trace)) return -1;
2459
2460        trace->config.debug_state = debug_state;
2461        return 0;
2462}
2463
2464static bool config_bool_parse(char *value, size_t nvalue) {
2465        if (strncmp(value, "true", nvalue) == 0)
2466                return true;
2467        else if (strncmp(value, "false", nvalue) == 0)
2468                return false;
2469        else
2470                return strtoll(value, NULL, 10) != 0;
2471}
2472
2473/* Note update documentation on trace_set_configuration */
2474static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2475        assert(key);
2476        assert(value);
2477        assert(uc);
2478        if (strncmp(key, "cache_size", nkey) == 0
2479            || strncmp(key, "cs", nkey) == 0) {
2480                uc->cache_size = strtoll(value, NULL, 10);
2481        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2482                   || strncmp(key, "tcs", nkey) == 0) {
2483                uc->thread_cache_size = strtoll(value, NULL, 10);
2484        } else if (strncmp(key, "fixed_count", nkey) == 0
2485                   || strncmp(key, "fc", nkey) == 0) {
2486                uc->fixed_count = config_bool_parse(value, nvalue);
2487        } else if (strncmp(key, "burst_size", nkey) == 0
2488                   || strncmp(key, "bs", nkey) == 0) {
2489                uc->burst_size = strtoll(value, NULL, 10);
2490        } else if (strncmp(key, "tick_interval", nkey) == 0
2491                   || strncmp(key, "ti", nkey) == 0) {
2492                uc->tick_interval = strtoll(value, NULL, 10);
2493        } else if (strncmp(key, "tick_count", nkey) == 0
2494                   || strncmp(key, "tc", nkey) == 0) {
2495                uc->tick_count = strtoll(value, NULL, 10);
2496        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2497                   || strncmp(key, "pt", nkey) == 0) {
2498                uc->perpkt_threads = strtoll(value, NULL, 10);
2499        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2500                   || strncmp(key, "hqs", nkey) == 0) {
2501                uc->hasher_queue_size = strtoll(value, NULL, 10);
2502        } else if (strncmp(key, "hasher_polling", nkey) == 0
2503                   || strncmp(key, "hp", nkey) == 0) {
2504                uc->hasher_polling = config_bool_parse(value, nvalue);
2505        } else if (strncmp(key, "reporter_polling", nkey) == 0
2506                   || strncmp(key, "rp", nkey) == 0) {
2507                uc->reporter_polling = config_bool_parse(value, nvalue);
2508        } else if (strncmp(key, "reporter_thold", nkey) == 0
2509                   || strncmp(key, "rt", nkey) == 0) {
2510                uc->reporter_thold = strtoll(value, NULL, 10);
2511        } else if (strncmp(key, "debug_state", nkey) == 0
2512                   || strncmp(key, "ds", nkey) == 0) {
2513                uc->debug_state = config_bool_parse(value, nvalue);
2514        } else {
2515                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2516        }
2517}
2518
2519DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2520        char *pch;
2521        char key[100];
2522        char value[100];
2523        char *dup;
2524        assert(str);
2525        assert(trace);
2526
2527        if (!trace_is_configurable(trace)) return -1;
2528
2529        dup = strdup(str);
2530        pch = strtok (dup," ,.-");
2531        while (pch != NULL)
2532        {
2533                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2534                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2535                } else {
2536                        fprintf(stderr, "Error parsing option %s\n", pch);
2537                }
2538                pch = strtok (NULL," ,.-");
2539        }
2540        free(dup);
2541
2542        return 0;
2543}
2544
2545DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2546        char line[1024];
2547        if (!trace_is_configurable(trace)) return -1;
2548
2549        while (fgets(line, sizeof(line), file) != NULL)
2550        {
2551                trace_set_configuration(trace, line);
2552        }
2553
2554        if(ferror(file))
2555                return -1;
2556        else
2557                return 0;
2558}
2559
2560DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2561        assert(packet);
2562        /* Always release any resources this might be holding */
2563        trace_fin_packet(packet);
2564        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2565}
2566
2567DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2568        if (libtrace->format)
2569                return &libtrace->format->info;
2570        else
2571                return NULL;
2572}
Note: See TracBrowser for help on using the repository browser.