source: lib/trace_parallel.c @ ea75ec2

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since ea75ec2 was ea75ec2, checked in by Shane Alcock <salcock@…>, 4 years ago

Add new format for receiving and parsing nDAG records

Also added new statistic: missing_records -- this tracks records
that have gone missing between the original capture device and
the current processing host (i.e. lost on the network between the
two). Useful for nDAG as there is a distinction between packets
dropped by the DAG and packets that were not received by the nDAG
client.

  • Property mode set to 100644
File size: 83.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289/**
290 * Changes the overall traces state and signals the condition.
291 *
292 * @param trace A pointer to the trace
293 * @param new_state The new state of the trace
294 * @param need_lock Set to true if libtrace_lock is not held, otherwise
295 *        false in the case the lock is currently held by this thread.
296 */
297static inline void libtrace_change_state(libtrace_t *trace,
298        const enum trace_state new_state, const bool need_lock)
299{
300        UNUSED enum trace_state prev_state;
301        if (need_lock)
302                pthread_mutex_lock(&trace->libtrace_lock);
303        prev_state = trace->state;
304        trace->state = new_state;
305
306        if (trace->config.debug_state)
307                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
308                        trace->uridata, get_trace_state_name(prev_state),
309                        get_trace_state_name(trace->state));
310
311        pthread_cond_broadcast(&trace->perpkt_cond);
312        if (need_lock)
313                pthread_mutex_unlock(&trace->libtrace_lock);
314}
315
316/**
317 * Changes a thread's state and broadcasts the condition variable. This
318 * should always be done when the lock is held.
319 *
320 * Additionally for perpkt threads the state counts are updated.
321 *
322 * @param trace A pointer to the trace
323 * @param t A pointer to the thread to modify
324 * @param new_state The new state of the thread
325 * @param need_lock Set to true if libtrace_lock is not held, otherwise
326 *        false in the case the lock is currently held by this thread.
327 */
328static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
329        const enum thread_states new_state, const bool need_lock)
330{
331        enum thread_states prev_state;
332        if (need_lock)
333                pthread_mutex_lock(&trace->libtrace_lock);
334        prev_state = t->state;
335        t->state = new_state;
336        if (t->type == THREAD_PERPKT) {
337                --trace->perpkt_thread_states[prev_state];
338                ++trace->perpkt_thread_states[new_state];
339        }
340
341        if (trace->config.debug_state)
342                fprintf(stderr, "Thread %d state changed from %d to %d\n",
343                        (int) t->tid, prev_state, t->state);
344
345        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
346                /* Make sure we save our final stats in case someone wants
347                 * them at the end of their program.
348                 */
349
350                trace_get_statistics(trace, NULL);
351                libtrace_change_state(trace, STATE_FINISHED, false);
352        }
353
354        pthread_cond_broadcast(&trace->perpkt_cond);
355        if (need_lock)
356                pthread_mutex_unlock(&trace->libtrace_lock);
357}
358
359/**
360 * This is valid once a trace is initialised
361 *
362 * @return True if the format supports parallel threads.
363 */
364static inline bool trace_supports_parallel(libtrace_t *trace)
365{
366        assert(trace);
367        assert(trace->format);
368        if (trace->format->pstart_input)
369                return true;
370        else
371                return false;
372}
373
374void libtrace_zero_thread(libtrace_thread_t * t) {
375        t->accepted_packets = 0;
376        t->filtered_packets = 0;
377        t->recorded_first = false;
378        t->tracetime_offset_usec = 0;
379        t->user_data = 0;
380        t->format_data = 0;
381        libtrace_zero_ringbuffer(&t->rbuffer);
382        t->trace = NULL;
383        t->ret = NULL;
384        t->type = THREAD_EMPTY;
385        t->perpkt_num = -1;
386}
387
388// Ints are aligned int is atomic so safe to read and write at same time
389// However write must be locked, read doesn't (We never try read before written to table)
390libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
391        int i = 0;
392        pthread_t tid = pthread_self();
393
394        for (;i<libtrace->perpkt_thread_count ;++i) {
395                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
396                        return &libtrace->perpkt_threads[i];
397        }
398        return NULL;
399}
400
401static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
402        libtrace_thread_t *ret;
403        if (!(ret = get_thread_table(libtrace))) {
404                pthread_t tid = pthread_self();
405                // Check if we are reporter or something else
406                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
407                                pthread_equal(tid, libtrace->reporter_thread.tid))
408                        ret = &libtrace->reporter_thread;
409                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
410                         pthread_equal(tid, libtrace->hasher_thread.tid))
411                        ret = &libtrace->hasher_thread;
412                else
413                        ret = NULL;
414        }
415        return ret;
416}
417
418DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
419        // Duplicate the packet in standard malloc'd memory and free the
420        // original, This is a 1:1 exchange so the ocache count remains unchanged.
421        if (pkt->buf_control != TRACE_CTRL_PACKET) {
422                libtrace_packet_t *dup;
423                dup = trace_copy_packet(pkt);
424                /* Release the external buffer */
425                trace_fin_packet(pkt);
426                /* Copy the duplicated packet over the existing */
427                memcpy(pkt, dup, sizeof(libtrace_packet_t));
428                /* Free the packet structure */
429                free(dup);
430        }
431}
432
433/**
434 * Makes a libtrace_result_t safe, used when pausing a trace.
435 * This will call libtrace_make_packet_safe if the result is
436 * a packet.
437 */
438DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
439        if (res->type == RESULT_PACKET) {
440                libtrace_make_packet_safe(res->value.pkt);
441        }
442}
443
444/**
445 * Holds threads in a paused state, until released by broadcasting
446 * the condition mutex.
447 */
448static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
449        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
450        thread_change_state(trace, t, THREAD_PAUSED, false);
451        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
452                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
453        }
454        thread_change_state(trace, t, THREAD_RUNNING, false);
455        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
456}
457
458/**
459 * Sends a packet to the user, expects either a valid packet or a TICK packet.
460 *
461 * @param trace The trace
462 * @param t The current thread
463 * @param packet A pointer to the packet storage, which may be set to null upon
464 *               return, or a packet to be finished.
465 * @param tracetime If true packets are delayed to match with tracetime
466 * @return 0 is successful, otherwise if playing back in tracetime
467 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
468 *
469 * @note READ_MESSAGE will only be returned if tracetime is true.
470 */
471static inline int dispatch_packet(libtrace_t *trace,
472                                  libtrace_thread_t *t,
473                                  libtrace_packet_t **packet,
474                                  bool tracetime) {
475
476        if ((*packet)->error > 0) {
477                if (tracetime) {
478                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
479                                return READ_MESSAGE;
480                }
481                if (!IS_LIBTRACE_META_PACKET((*packet))) {
482                        t->accepted_packets++;
483                }
484                if (trace->perpkt_cbs->message_packet)
485                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
486                trace_fin_packet(*packet);
487        } else {
488                assert((*packet)->error == READ_TICK);
489                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
490                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
491        }
492        return 0;
493}
494
495/**
496 * Sends a batch of packets to the user, expects either a valid packet or a
497 * TICK packet.
498 *
499 * @param trace The trace
500 * @param t The current thread
501 * @param packets [in,out] An array of packets, these may be null upon return
502 * @param nb_packets The total number of packets in the list
503 * @param empty [in,out] A pointer to an integer storing the first empty slot,
504 * upon return this is updated
505 * @param offset [in,out] The offset into the array, upon return this is updated
506 * @param tracetime If true packets are delayed to match with tracetime
507 * @return 0 is successful, otherwise if playing back in tracetime
508 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
509 *
510 * @note READ_MESSAGE will only be returned if tracetime is true.
511 */
512static inline int dispatch_packets(libtrace_t *trace,
513                                  libtrace_thread_t *t,
514                                  libtrace_packet_t *packets[],
515                                  int nb_packets, int *empty, int *offset,
516                                  bool tracetime) {
517        for (;*offset < nb_packets; ++*offset) {
518                int ret;
519                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
520                if (ret == 0) {
521                        /* Move full slots to front as we go */
522                        if (packets[*offset]) {
523                                if (*empty != *offset) {
524                                        packets[*empty] = packets[*offset];
525                                        packets[*offset] = NULL;
526                                }
527                                ++*empty;
528                        }
529                } else {
530                        /* Break early */
531                        assert(ret == READ_MESSAGE);
532                        return READ_MESSAGE;
533                }
534        }
535
536        return 0;
537}
538
539/**
540 * Pauses a per packet thread, messages will not be processed when the thread
541 * is paused.
542 *
543 * This process involves reading packets if a hasher thread is used. As such
544 * this function can fail to pause due to errors when reading in which case
545 * the thread should be stopped instead.
546 *
547 *
548 * @brief trace_perpkt_thread_pause
549 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
550 */
551static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
552                                     libtrace_packet_t *packets[],
553                                     int nb_packets, int *empty, int *offset) {
554        libtrace_packet_t * packet = NULL;
555
556        /* Let the user thread know we are going to pause */
557        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
558
559        /* Send through any remaining packets (or messages) without delay */
560
561        /* First send those packets already read, as fast as possible
562         * This should never fail or check for messages etc. */
563        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
564                                    offset, false), == 0);
565
566        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
567        /* If a hasher thread is running, empty input queues so we don't lose data */
568        if (trace_has_dedicated_hasher(trace)) {
569                // The hasher has stopped by this point, so the queue shouldn't be filling
570                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
571                        int ret = trace->pread(trace, t, &packet, 1);
572                        if (ret == 1) {
573                                if (packet->error > 0) {
574                                        store_first_packet(trace, packet, t);
575                                }
576                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
577                                if (packet == NULL)
578                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
579                        } else if (ret != READ_MESSAGE) {
580                                /* Ignore messages we pick these up next loop */
581                                assert (ret == READ_EOF || ret == READ_ERROR);
582                                /* Verify no packets are remaining */
583                                /* TODO refactor this sanity check out!! */
584                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
585                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
586                                        // No packets after this should have any data in them
587                                        assert(packet->error <= 0);
588                                }
589                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
590                                return -1;
591                        }
592                }
593        }
594        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
595
596        /* Now we do the actual pause, this returns when we resumed */
597        trace_thread_pause(trace, t);
598        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
599        return 1;
600}
601
602/**
603 * The is the entry point for our packet processing threads.
604 */
605static void* perpkt_threads_entry(void *data) {
606        libtrace_t *trace = (libtrace_t *)data;
607        libtrace_thread_t *t;
608        libtrace_message_t message = {0, {.uint64=0}, NULL};
609        libtrace_packet_t *packets[trace->config.burst_size];
610        size_t i;
611        //int ret;
612        /* The current reading position into the packets */
613        int offset = 0;
614        /* The number of packets last read */
615        int nb_packets = 0;
616        /* The offset to the first NULL packet upto offset */
617        int empty = 0;
618        int j;
619
620        /* Wait until trace_pstart has been completed */
621        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
622        t = get_thread_table(trace);
623        assert(t);
624        if (trace->state == STATE_ERROR) {
625                thread_change_state(trace, t, THREAD_FINISHED, false);
626                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
627                pthread_exit(NULL);
628        }
629        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
630
631        if (trace->format->pregister_thread) {
632                if (trace->format->pregister_thread(trace, t, 
633                                trace_is_parallel(trace)) < 0) {
634                        thread_change_state(trace, t, THREAD_FINISHED, false);
635                        pthread_exit(NULL);
636                }
637        }
638
639        /* Fill our buffer with empty packets */
640        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
641        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
642                              trace->config.burst_size,
643                              trace->config.burst_size);
644
645        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
646
647        /* Let the per_packet function know we have started */
648        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
649        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
650
651        for (;;) {
652
653                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
654                        int ret;
655                        switch (message.code) {
656                                case MESSAGE_DO_PAUSE: // This is internal
657                                        ret = trace_perpkt_thread_pause(trace, t,
658                                              packets, nb_packets, &empty, &offset);
659                                        if (ret == READ_EOF) {
660                                                goto eof;
661                                        } else if (ret == READ_ERROR) {
662                                                goto error;
663                                        }
664                                        assert(ret == 1);
665                                        continue;
666                                case MESSAGE_DO_STOP: // This is internal
667                                        goto eof;
668                        }
669                        send_message(trace, t, message.code, message.data, 
670                                        message.sender);
671                        /* Continue and the empty messages out before packets */
672                        continue;
673                }
674
675
676                /* Do we need to read a new set of packets MOST LIKELY we do */
677                if (offset == nb_packets) {
678                        /* Refill the packet buffer */
679                        if (empty != nb_packets) {
680                                // Refill the empty packets
681                                libtrace_ocache_alloc(&trace->packet_freelist,
682                                                      (void **) &packets[empty],
683                                                      nb_packets - empty,
684                                                      nb_packets - empty);
685                        }
686                        if (!trace->pread) {
687                                assert(packets[0]);
688                                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
689                                nb_packets = trace_read_packet(trace, packets[0]);
690                                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
691                                packets[0]->error = nb_packets;
692                                if (nb_packets > 0)
693                                        nb_packets = 1;
694                        } else {
695                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
696                        }
697                        offset = 0;
698                        empty = 0;
699                }
700
701                /* Handle error/message cases */
702                if (nb_packets > 0) {
703                        /* Store the first non-meta packet */
704                        for (j = 0; j < nb_packets; j++) {
705                                if (t->recorded_first)
706                                        break;
707                                if (packets[j]->error > 0) {
708                                        store_first_packet(trace, packets[j], t);
709                                }
710                        }
711                        dispatch_packets(trace, t, packets, nb_packets, &empty,
712                                         &offset, trace->tracetime);
713                } else {
714                        switch (nb_packets) {
715                        case READ_EOF:
716                                goto eof;
717                        case READ_ERROR:
718                                goto error;
719                        case READ_MESSAGE:
720                                nb_packets = 0;
721                                continue;
722                        default:
723                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
724                                goto error;
725                        }
726                }
727
728        }
729
730error:
731        message.code = MESSAGE_DO_STOP;
732        message.sender = t;
733        message.data.uint64 = 0;
734        trace_message_perpkts(trace, &message);
735eof:
736        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
737
738        // Let the per_packet function know we have stopped
739        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
740        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
741
742        // Free any remaining packets
743        for (i = 0; i < trace->config.burst_size; i++) {
744                if (packets[i]) {
745                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
746                        packets[i] = NULL;
747                }
748        }
749
750        thread_change_state(trace, t, THREAD_FINISHED, true);
751
752        /* Make sure the reporter sees we have finished */
753        if (trace_has_reporter(trace))
754                trace_post_reporter(trace);
755
756        // Release all ocache memory before unregistering with the format
757        // because this might(it does in DPDK) unlink the formats mempool
758        // causing destroy/finish packet to fail.
759        libtrace_ocache_unregister_thread(&trace->packet_freelist);
760        if (trace->format->punregister_thread) {
761                trace->format->punregister_thread(trace, t);
762        }
763        print_memory_stats();
764
765        pthread_exit(NULL);
766}
767
768/**
769 * The start point for our single threaded hasher thread, this will read
770 * and hash a packet from a data source and queue it against the correct
771 * core to process it.
772 */
773static void* hasher_entry(void *data) {
774        libtrace_t *trace = (libtrace_t *)data;
775        libtrace_thread_t * t;
776        int i;
777        libtrace_packet_t * packet;
778        libtrace_message_t message = {0, {.uint64=0}, NULL};
779        int pkt_skipped = 0;
780
781        assert(trace_has_dedicated_hasher(trace));
782        /* Wait until all threads are started and objects are initialised (ring buffers) */
783        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
784        t = &trace->hasher_thread;
785        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
786        if (trace->state == STATE_ERROR) {
787                thread_change_state(trace, t, THREAD_FINISHED, false);
788                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
789                pthread_exit(NULL);
790        }
791        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
792
793        /* We are reading but it is not the parallel API */
794        if (trace->format->pregister_thread) {
795                trace->format->pregister_thread(trace, t, true);
796        }
797
798        /* Read all packets in then hash and queue against the correct thread */
799        while (1) {
800                int thread;
801                if (!pkt_skipped)
802                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
803                assert(packet);
804
805                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
806                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
807                        switch(message.code) {
808                                case MESSAGE_DO_PAUSE:
809                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
810                                        thread_change_state(trace, t, THREAD_PAUSED, false);
811                                        pthread_cond_broadcast(&trace->perpkt_cond);
812                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
813                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
814                                        }
815                                        thread_change_state(trace, t, THREAD_RUNNING, false);
816                                        pthread_cond_broadcast(&trace->perpkt_cond);
817                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
818                                        break;
819                                case MESSAGE_DO_STOP:
820                                        /* Either FINISHED or FINISHING */
821                                        assert(trace->started == false);
822                                        /* Mark the current packet as EOF */
823                                        packet->error = 0;
824                                        goto hasher_eof;
825                                default:
826                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
827                        }
828                        pkt_skipped = 1;
829                        continue;
830                }
831
832                if ((packet->error = trace_read_packet(trace, packet)) <1) {
833                        if (packet->error == READ_MESSAGE) {
834                                pkt_skipped = 1;
835                                continue;
836                        } else {
837                                break; /* We are EOF or error'd either way we stop  */
838                        }
839                }
840
841                /* We are guaranteed to have a hash function i.e. != NULL */
842                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
843                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
844                /* Blocking write to the correct queue - I'm the only writer */
845                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
846                        uint64_t order = trace_packet_get_order(packet);
847                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
848                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
849                                // Write ticks to everyone else
850                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
851                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
852                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
853                                for (i = 0; i < trace->perpkt_thread_count; i++) {
854                                        pkts[i]->error = READ_TICK;
855                                        trace_packet_set_order(pkts[i], order);
856                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
857                                }
858                        }
859                        pkt_skipped = 0;
860                } else {
861                        assert(!"Dropping a packet!!");
862                        pkt_skipped = 1; // Reuse that packet no one read it
863                }
864        }
865hasher_eof:
866        /* Broadcast our last failed read to all threads */
867        for (i = 0; i < trace->perpkt_thread_count; i++) {
868                libtrace_packet_t * bcast;
869                if (i == trace->perpkt_thread_count - 1) {
870                        bcast = packet;
871                } else {
872                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
873                        bcast->error = packet->error;
874                }
875                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
876                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
877                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
878                } else {
879                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
880                }
881                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
882        }
883
884        // We don't need to free the packet
885        thread_change_state(trace, t, THREAD_FINISHED, true);
886
887        libtrace_ocache_unregister_thread(&trace->packet_freelist);
888        if (trace->format->punregister_thread) {
889                trace->format->punregister_thread(trace, t);
890        }
891        print_memory_stats();
892
893        // TODO remove from TTABLE t sometime
894        pthread_exit(NULL);
895}
896
897/* Our simplest case when a thread becomes ready it can obtain an exclusive
898 * lock to read packets from the underlying trace.
899 */
900static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
901                                                    libtrace_thread_t *t,
902                                                    libtrace_packet_t *packets[],
903                                                    size_t nb_packets) {
904        size_t i = 0;
905        //bool tick_hit = false;
906
907        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
908        /* Read nb_packets */
909        for (i = 0; i < nb_packets; ++i) {
910                if (libtrace_message_queue_count(&t->messages) > 0) {
911                        if ( i==0 ) {
912                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
913                                return READ_MESSAGE;
914                        } else {
915                                break;
916                        }
917                }
918                packets[i]->error = trace_read_packet(libtrace, packets[i]);
919
920                if (packets[i]->error <= 0) {
921                        /* We'll catch this next time if we have already got packets */
922                        if ( i==0 ) {
923                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
924                                return packets[i]->error;
925                        } else {
926                                break;
927                        }
928                }
929                /*
930                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
931                        tick_hit = true;
932                }*/
933
934                // Doing this inside the lock ensures the first packet is
935                // always recorded first
936                if (!t->recorded_first && packets[0]->error > 0) {
937                        store_first_packet(libtrace, packets[0], t);
938                }
939        }
940        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
941        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
942        if (tick_hit) {
943                libtrace_message_t tick;
944                tick.additional.uint64 = trace_packet_get_order(packets[i]);
945                tick.code = MESSAGE_TICK;
946                trace_send_message_to_perpkts(libtrace, &tick);
947        } */
948        return i;
949}
950
951/**
952 * For the case that we have a dedicated hasher thread
953 * 1. We read a packet from our buffer
954 * 2. Move that into the packet provided (packet)
955 */
956inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
957                                                   libtrace_thread_t *t,
958                                                   libtrace_packet_t *packets[],
959                                                   size_t nb_packets) {
960        size_t i;
961
962        /* We store the last error message here */
963        if (t->format_data) {
964                return ((libtrace_packet_t *)t->format_data)->error;
965        }
966
967        // Always grab at least one
968        if (packets[0]) // Recycle the old get the new
969                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
970        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
971
972        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
973                return packets[0]->error;
974        }
975
976        for (i = 1; i < nb_packets; i++) {
977                if (packets[i]) // Recycle the old get the new
978                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
979                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
980                        packets[i] = NULL;
981                        break;
982                }
983
984                /* We will return an error or EOF the next time around */
985                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
986                        /* The message case will be checked automatically -
987                           However other cases like EOF and error will only be
988                           sent once*/
989                        if (packets[i]->error != READ_MESSAGE) {
990                                assert(t->format_data == NULL);
991                                t->format_data = packets[i];
992                        }
993                        break;
994                }
995        }
996
997        return i;
998}
999
1000/**
1001 * For the first packet of each queue we keep a copy and note the system
1002 * time it was received at.
1003 *
1004 * This is used for finding the first packet when playing back a trace
1005 * in trace time. And can be used by real time applications to print
1006 * results out every XXX seconds.
1007 */
1008void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1009{
1010
1011        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1012        struct timeval tv;
1013        libtrace_packet_t * dup;
1014
1015        if (t->recorded_first) {
1016                return;
1017        }
1018
1019        if (IS_LIBTRACE_META_PACKET(packet)) {
1020                return;
1021        }
1022
1023        /* We mark system time against a copy of the packet */
1024        gettimeofday(&tv, NULL);
1025        dup = trace_copy_packet(packet);
1026
1027        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1028        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1029        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1030        libtrace->first_packets.count++;
1031
1032        /* Now update the first */
1033        if (libtrace->first_packets.count == 1) {
1034                /* We the first entry hence also the first known packet */
1035                libtrace->first_packets.first = t->perpkt_num;
1036        } else {
1037                /* Check if we are newer than the previous 'first' packet */
1038                size_t first = libtrace->first_packets.first;
1039                struct timeval cur_ts = trace_get_timeval(dup);
1040                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1041                if (timercmp(&cur_ts, &first_ts, <))
1042                        libtrace->first_packets.first = t->perpkt_num;
1043        }
1044        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1045
1046        mesg.code = MESSAGE_FIRST_PACKET;
1047        trace_message_reporter(libtrace, &mesg);
1048        trace_message_perpkts(libtrace, &mesg);
1049        t->recorded_first = true;
1050}
1051
1052DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1053                                     libtrace_thread_t *t,
1054                                     const libtrace_packet_t **packet,
1055                                     const struct timeval **tv)
1056{
1057        void * tmp;
1058        int ret = 0;
1059
1060        if (t) {
1061                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1062                        return -1;
1063        }
1064
1065        /* Throw away these which we don't use */
1066        if (!packet)
1067                packet = (const libtrace_packet_t **) &tmp;
1068        if (!tv)
1069                tv = (const struct timeval **) &tmp;
1070
1071        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1072        if (t) {
1073                /* Get the requested thread */
1074                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1075                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1076        } else if (libtrace->first_packets.count) {
1077                /* Get the first packet across all threads */
1078                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1079                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1080                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1081                        ret = 1;
1082                } else {
1083                        struct timeval curr_tv;
1084                        // If a second has passed since the first entry we will assume this is the very first packet
1085                        gettimeofday(&curr_tv, NULL);
1086                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1087                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1088                                        ret = 1;
1089                                }
1090                        }
1091                }
1092        } else {
1093                *packet = NULL;
1094                *tv = NULL;
1095        }
1096        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1097        return ret;
1098}
1099
1100
1101DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1102{
1103        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1104}
1105
1106inline static struct timeval usec_to_tv(uint64_t usec)
1107{
1108        struct timeval tv;
1109        tv.tv_sec = usec / 1000000;
1110        tv.tv_usec = usec % 1000000;
1111        return tv;
1112}
1113
1114/** Similar to delay_tracetime but send messages to all threads periodically */
1115static void* reporter_entry(void *data) {
1116        libtrace_message_t message = {0, {.uint64=0}, NULL};
1117        libtrace_t *trace = (libtrace_t *)data;
1118        libtrace_thread_t *t = &trace->reporter_thread;
1119
1120        /* Wait until all threads are started */
1121        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1122        if (trace->state == STATE_ERROR) {
1123                thread_change_state(trace, t, THREAD_FINISHED, false);
1124                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1125                pthread_exit(NULL);
1126        }
1127        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1128
1129        if (trace->format->pregister_thread) {
1130                trace->format->pregister_thread(trace, t, false);
1131        }
1132
1133        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1134        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1135
1136        while (!trace_has_finished(trace)) {
1137                if (trace->config.reporter_polling) {
1138                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1139                                message.code = MESSAGE_POST_REPORTER;
1140                } else {
1141                        libtrace_message_queue_get(&t->messages, &message);
1142                }
1143                switch (message.code) {
1144                        // Check for results
1145                        case MESSAGE_POST_REPORTER:
1146                                trace->combiner.read(trace, &trace->combiner);
1147                                break;
1148                        case MESSAGE_DO_PAUSE:
1149                                assert(trace->combiner.pause);
1150                                trace->combiner.pause(trace, &trace->combiner);
1151                                send_message(trace, t, MESSAGE_PAUSING,
1152                                                (libtrace_generic_t) {0}, t);
1153                                trace_thread_pause(trace, t);
1154                                send_message(trace, t, MESSAGE_RESUMING,
1155                                                (libtrace_generic_t) {0}, t);
1156                                break;
1157                default:
1158                        send_message(trace, t, message.code, message.data,
1159                                        message.sender);
1160                }
1161        }
1162
1163        // Flush out whats left now all our threads have finished
1164        trace->combiner.read_final(trace, &trace->combiner);
1165
1166        // GOODBYE
1167        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1168        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1169
1170        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1171        print_memory_stats();
1172        return NULL;
1173}
1174
1175/** Similar to delay_tracetime but send messages to all threads periodically */
1176static void* keepalive_entry(void *data) {
1177        struct timeval prev, next;
1178        libtrace_message_t message = {0, {.uint64=0}, NULL};
1179        libtrace_t *trace = (libtrace_t *)data;
1180        uint64_t next_release;
1181        libtrace_thread_t *t = &trace->keepalive_thread;
1182
1183        /* Wait until all threads are started */
1184        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1185        if (trace->state == STATE_ERROR) {
1186                thread_change_state(trace, t, THREAD_FINISHED, false);
1187                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1188                pthread_exit(NULL);
1189        }
1190        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1191
1192        gettimeofday(&prev, NULL);
1193        message.code = MESSAGE_TICK_INTERVAL;
1194
1195        while (trace->state != STATE_FINISHED) {
1196                fd_set rfds;
1197                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1198                gettimeofday(&next, NULL);
1199                if (next_release > tv_to_usec(&next)) {
1200                        next = usec_to_tv(next_release - tv_to_usec(&next));
1201                        // Wait for timeout or a message
1202                        FD_ZERO(&rfds);
1203                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1204                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1205                                libtrace_message_t msg;
1206                                libtrace_message_queue_get(&t->messages, &msg);
1207                                assert(msg.code == MESSAGE_DO_STOP);
1208                                goto done;
1209                        }
1210                }
1211                prev = usec_to_tv(next_release);
1212                if (trace->state == STATE_RUNNING) {
1213                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1214                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1215                        trace_message_perpkts(trace, &message);
1216                }
1217        }
1218done:
1219
1220        thread_change_state(trace, t, THREAD_FINISHED, true);
1221        return NULL;
1222}
1223
1224/**
1225 * Delays a packets playback so the playback will be in trace time.
1226 * This may break early if a message becomes available.
1227 *
1228 * Requires the first packet for this thread to be received.
1229 * @param libtrace  The trace
1230 * @param packet    The packet to delay
1231 * @param t         The current thread
1232 * @return Either READ_MESSAGE(-2) or 0 is successful
1233 */
1234static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1235        struct timeval curr_tv, pkt_tv;
1236        uint64_t next_release = t->tracetime_offset_usec;
1237        uint64_t curr_usec;
1238
1239        if (!t->tracetime_offset_usec) {
1240                const libtrace_packet_t *first_pkt;
1241                const struct timeval *sys_tv;
1242                int64_t initial_offset;
1243                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1244                if (!first_pkt)
1245                        return 0;
1246                pkt_tv = trace_get_timeval(first_pkt);
1247                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1248                /* In the unlikely case offset is 0, change it to 1 */
1249                if (stable)
1250                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1251                next_release = initial_offset;
1252        }
1253        /* next_release == offset */
1254        pkt_tv = trace_get_timeval(packet);
1255        next_release += tv_to_usec(&pkt_tv);
1256        gettimeofday(&curr_tv, NULL);
1257        curr_usec = tv_to_usec(&curr_tv);
1258        if (next_release > curr_usec) {
1259                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1260                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1261                fd_set rfds;
1262                FD_ZERO(&rfds);
1263                FD_SET(mesg_fd, &rfds);
1264                // We need to wait
1265                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1266                if (ret == 0) {
1267                        return 0;
1268                } else if (ret > 0) {
1269                        return READ_MESSAGE;
1270                } else {
1271                        assert(!"trace_delay_packet: Unexpected return from select");
1272                }
1273        }
1274        return 0;
1275}
1276
1277/* Discards packets that don't match the filter.
1278 * Discarded packets are emptied and then moved to the end of the packet list.
1279 *
1280 * @param trace       The trace format, containing the filter
1281 * @param packets     An array of packets
1282 * @param nb_packets  The number of valid items in packets
1283 *
1284 * @return The number of packets that passed the filter, which are moved to
1285 *          the start of the packets array
1286 */
1287static inline size_t filter_packets(libtrace_t *trace,
1288                                    libtrace_packet_t **packets,
1289                                    size_t nb_packets) {
1290        size_t offset = 0;
1291        size_t i;
1292
1293        for (i = 0; i < nb_packets; ++i) {
1294                // The filter needs the trace attached to receive the link type
1295                packets[i]->trace = trace;
1296                if (trace_apply_filter(trace->filter, packets[i])) {
1297                        libtrace_packet_t *tmp;
1298                        tmp = packets[offset];
1299                        packets[offset++] = packets[i];
1300                        packets[i] = tmp;
1301                } else {
1302                        trace_fin_packet(packets[i]);
1303                }
1304        }
1305
1306        return offset;
1307}
1308
1309/* Read a batch of packets from the trace into a buffer.
1310 * Note that this function will block until a packet is read (or EOF is reached)
1311 *
1312 * @param libtrace    The trace
1313 * @param t           The thread
1314 * @param packets     An array of packets
1315 * @param nb_packets  The number of empty packets in packets
1316 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1317 */
1318static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1319                                      libtrace_thread_t *t,
1320                                      libtrace_packet_t *packets[],
1321                                      size_t nb_packets) {
1322        int i;
1323        assert(nb_packets);
1324        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1325        if (trace_is_err(libtrace))
1326                return -1;
1327        if (!libtrace->started) {
1328                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1329                              "You must call libtrace_start() before trace_read_packet()\n");
1330                return -1;
1331        }
1332
1333        if (libtrace->format->pread_packets) {
1334                int ret;
1335                for (i = 0; i < (int) nb_packets; ++i) {
1336                        assert(i[packets]);
1337                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1338                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1339                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1340                                              "Packet passed to trace_read_packet() is invalid\n");
1341                                return -1;
1342                        }
1343                }
1344                do {
1345                        ret=libtrace->format->pread_packets(libtrace, t,
1346                                                            packets,
1347                                                            nb_packets);
1348                        /* Error, EOF or message? */
1349                        if (ret <= 0) {
1350                                return ret;
1351                        }
1352
1353                        if (libtrace->filter) {
1354                                int remaining;
1355                                remaining = filter_packets(libtrace,
1356                                                           packets, ret);
1357                                t->filtered_packets += ret - remaining;
1358                                ret = remaining;
1359                        }
1360                        for (i = 0; i < ret; ++i) {
1361                                /* We do not mark the packet against the trace,
1362                                 * before hand or after. After breaks DAG meta
1363                                 * packets and before is inefficient */
1364                                //packets[i]->trace = libtrace;
1365                                /* TODO IN FORMAT?? Like traditional libtrace */
1366                                if (libtrace->snaplen>0)
1367                                        trace_set_capture_length(packets[i],
1368                                                        libtrace->snaplen);
1369                        }
1370                } while(ret == 0);
1371                return ret;
1372        }
1373        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1374                      "This format does not support reading packets\n");
1375        return ~0U;
1376}
1377
1378/* Restarts a parallel trace, this is called from trace_pstart.
1379 * The libtrace lock is held upon calling this function.
1380 * Typically with a parallel trace the threads are not
1381 * killed rather.
1382 */
1383static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1384                          libtrace_callback_set_t *per_packet_cbs, 
1385                          libtrace_callback_set_t *reporter_cbs) {
1386        int i, err = 0;
1387        if (libtrace->state != STATE_PAUSED) {
1388                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1389                        "trace(%s) is not currently paused",
1390                              libtrace->uridata);
1391                return -1;
1392        }
1393
1394        assert(libtrace_parallel);
1395        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1396
1397        /* Reset first packets */
1398        pthread_spin_lock(&libtrace->first_packets.lock);
1399        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1400                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1401                if (libtrace->first_packets.packets[i].packet) {
1402                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1403                        libtrace->first_packets.packets[i].packet = NULL;
1404                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1405                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1406                        libtrace->first_packets.count--;
1407                        libtrace->perpkt_threads[i].recorded_first = false;
1408                }
1409        }
1410        assert(libtrace->first_packets.count == 0);
1411        libtrace->first_packets.first = 0;
1412        pthread_spin_unlock(&libtrace->first_packets.lock);
1413
1414        /* Reset delay */
1415        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1416                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1417        }
1418
1419        /* Reset statistics */
1420        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1421                libtrace->perpkt_threads[i].accepted_packets = 0;
1422                libtrace->perpkt_threads[i].filtered_packets = 0;
1423        }
1424        libtrace->accepted_packets = 0;
1425        libtrace->filtered_packets = 0;
1426
1427        /* Update functions if requested */
1428        if(global_blob)
1429                libtrace->global_blob = global_blob;
1430
1431        if (per_packet_cbs) {
1432                if (libtrace->perpkt_cbs)
1433                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1434                libtrace->perpkt_cbs = trace_create_callback_set();
1435                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1436                                sizeof(libtrace_callback_set_t));
1437        }
1438
1439        if (reporter_cbs) {
1440                if (libtrace->reporter_cbs)
1441                        trace_destroy_callback_set(libtrace->reporter_cbs);
1442
1443                libtrace->reporter_cbs = trace_create_callback_set();
1444                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1445                                sizeof(libtrace_callback_set_t));
1446        }
1447
1448        if (trace_is_parallel(libtrace)) {
1449                err = libtrace->format->pstart_input(libtrace);
1450        } else {
1451                if (libtrace->format->start_input) {
1452                        err = libtrace->format->start_input(libtrace);
1453                }
1454        }
1455
1456        if (err == 0) {
1457                libtrace->started = true;
1458                libtrace_change_state(libtrace, STATE_RUNNING, false);
1459        }
1460        return err;
1461}
1462
1463/**
1464 * @return the number of CPU cores on the machine. -1 if unknown.
1465 */
1466SIMPLE_FUNCTION static int get_nb_cores() {
1467        int numCPU;
1468#ifdef _SC_NPROCESSORS_ONLN
1469        /* Most systems do this now */
1470        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1471
1472#else
1473        int mib[] = {CTL_HW, HW_AVAILCPU};
1474        size_t len = sizeof(numCPU);
1475
1476        /* get the number of CPUs from the system */
1477        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1478#endif
1479        return numCPU <= 0 ? 1 : numCPU;
1480}
1481
1482/**
1483 * Verifies the configuration and sets default values for any values not
1484 * specified by the user.
1485 */
1486static void verify_configuration(libtrace_t *libtrace) {
1487
1488        if (libtrace->config.hasher_queue_size <= 0)
1489                libtrace->config.hasher_queue_size = 1000;
1490
1491        if (libtrace->config.perpkt_threads <= 0) {
1492                libtrace->perpkt_thread_count = get_nb_cores();
1493                if (libtrace->perpkt_thread_count <= 0)
1494                        // Lets just use one
1495                        libtrace->perpkt_thread_count = 1;
1496        } else {
1497                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1498        }
1499
1500        if (libtrace->config.reporter_thold <= 0)
1501                libtrace->config.reporter_thold = 100;
1502        if (libtrace->config.burst_size <= 0)
1503                libtrace->config.burst_size = 32;
1504        if (libtrace->config.thread_cache_size <= 0)
1505                libtrace->config.thread_cache_size = 64;
1506        if (libtrace->config.cache_size <= 0)
1507                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1508
1509        if (libtrace->config.cache_size <
1510                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1511                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1512
1513        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1514                libtrace->combiner = combiner_unordered;
1515
1516        /* Figure out if we are using a dedicated hasher thread? */
1517        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1518                libtrace->hasher_thread.type = THREAD_HASHER;
1519        }
1520}
1521
1522/**
1523 * Starts a libtrace_thread, including allocating memory for messaging.
1524 * Threads are expected to wait until the libtrace look is released.
1525 * Hence why we don't init structures until later.
1526 *
1527 * @param trace The trace the thread is associated with
1528 * @param t The thread that is filled when the thread is started
1529 * @param type The type of thread
1530 * @param start_routine The entry location of the thread
1531 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1532 * @param name For debugging purposes set the threads name (Optional)
1533 *
1534 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1535 *         In this situation the thread structure is zeroed.
1536 */
1537static int trace_start_thread(libtrace_t *trace,
1538                       libtrace_thread_t *t,
1539                       enum thread_types type,
1540                       void *(*start_routine) (void *),
1541                       int perpkt_num,
1542                       const char *name) {
1543#ifdef __linux__
1544        pthread_attr_t attrib;
1545        cpu_set_t cpus;
1546        int i;
1547#endif
1548        int ret;
1549        assert(t->type == THREAD_EMPTY);
1550        t->trace = trace;
1551        t->ret = NULL;
1552        t->user_data = NULL;
1553        t->type = type;
1554        t->state = THREAD_RUNNING;
1555
1556        assert(name);
1557
1558#ifdef __linux__
1559        CPU_ZERO(&cpus);
1560        for (i = 0; i < get_nb_cores(); i++)
1561                CPU_SET(i, &cpus);
1562        pthread_attr_init(&attrib);
1563        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1564        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1565        pthread_attr_destroy(&attrib);
1566#else
1567        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1568#endif
1569        if (ret != 0) {
1570                libtrace_zero_thread(t);
1571                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1572                return -1;
1573        }
1574        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1575        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1576                libtrace_ringbuffer_init(&t->rbuffer,
1577                                         trace->config.hasher_queue_size,
1578                                         trace->config.hasher_polling?
1579                                                 LIBTRACE_RINGBUFFER_POLLING:
1580                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1581        }
1582#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1583        if(name)
1584                pthread_setname_np(t->tid, name);
1585#endif
1586        t->perpkt_num = perpkt_num;
1587        return 0;
1588}
1589
1590/** Parses the environment variable LIBTRACE_CONF into the supplied
1591 * configuration structure.
1592 *
1593 * @param[in,out] libtrace The trace from which we determine the URI and set
1594 * the configuration.
1595 *
1596 * We search for 3 environment variables and apply them to the config in the
1597 * following order. Such that the first has the lowest priority.
1598 *
1599 * 1. LIBTRACE_CONF, The global environment configuration
1600 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1601 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1602 *
1603 * E.g.
1604 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1605 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1606 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1607 *
1608 * @note All environment variables names MUST only contian
1609 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1610 * outside of this range should be captilised if possible or replaced with an
1611 * underscore.
1612 */
1613static void parse_env_config (libtrace_t *libtrace) {
1614        char env_name[1024] = "LIBTRACE_CONF_";
1615        size_t len = strlen(env_name);
1616        size_t mark = 0;
1617        size_t i;
1618        char * env;
1619
1620        /* Make our compound string */
1621        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1622        len += strlen(libtrace->format->name);
1623        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1624        len += 1;
1625        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1626
1627        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1628        for (i = 0; env_name[i] != 0; ++i) {
1629                env_name[i] = toupper(env_name[i]);
1630                if(env_name[i] == ':') {
1631                        mark = i;
1632                }
1633                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1634                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1635                        env_name[i] = '_';
1636                }
1637        }
1638
1639        /* First apply global env settings LIBTRACE_CONF */
1640        env = getenv("LIBTRACE_CONF");
1641        if (env)
1642        {
1643                printf("Got env %s", env);
1644                trace_set_configuration(libtrace, env);
1645        }
1646
1647        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1648        if (mark != 0) {
1649                env_name[mark] = 0;
1650                env = getenv(env_name);
1651                if (env) {
1652                        trace_set_configuration(libtrace, env);
1653                }
1654                env_name[mark] = '_';
1655        }
1656
1657        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1658        env = getenv(env_name);
1659        if (env) {
1660                trace_set_configuration(libtrace, env);
1661        }
1662}
1663
1664DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1665        if (libtrace->state == STATE_NEW)
1666                return trace_supports_parallel(libtrace);
1667        return libtrace->pread == trace_pread_packet_wrapper;
1668}
1669
1670DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1671                           libtrace_callback_set_t *per_packet_cbs,
1672                           libtrace_callback_set_t *reporter_cbs) {
1673        int i;
1674        int ret = -1;
1675        char name[16];
1676        sigset_t sig_before, sig_block_all;
1677        assert(libtrace);
1678
1679        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1680        if (trace_is_err(libtrace)) {
1681                goto cleanup_none;
1682        }
1683
1684        if (libtrace->state == STATE_PAUSED) {
1685                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1686                                reporter_cbs);
1687                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1688                return ret;
1689        }
1690
1691        if (libtrace->state != STATE_NEW) {
1692                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1693                              "should be called on a NEW or PAUSED trace but "
1694                              "instead was called from %s",
1695                              get_trace_state_name(libtrace->state));
1696                goto cleanup_none;
1697        }
1698
1699        /* Store the user defined things against the trace */
1700        libtrace->global_blob = global_blob;
1701
1702        /* Save a copy of the callbacks in case the user tries to change them
1703         * on us later */
1704        if (!per_packet_cbs) {
1705                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1706                                "requires a non-NULL set of per packet "
1707                                "callbacks.");
1708                goto cleanup_none;
1709        }
1710
1711        if (per_packet_cbs->message_packet == NULL) {
1712                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1713                                "packet callbacks must include a handler "
1714                                "for a packet. Please set this using "
1715                                "trace_set_packet_cb().");
1716                goto cleanup_none;
1717        }
1718
1719        libtrace->perpkt_cbs = trace_create_callback_set();
1720        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1721       
1722        if (reporter_cbs) {
1723                libtrace->reporter_cbs = trace_create_callback_set();
1724                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1725        }
1726
1727       
1728
1729
1730        /* And zero other fields */
1731        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1732                libtrace->perpkt_thread_states[i] = 0;
1733        }
1734        libtrace->first_packets.first = 0;
1735        libtrace->first_packets.count = 0;
1736        libtrace->first_packets.packets = NULL;
1737        libtrace->perpkt_threads = NULL;
1738        /* Set a global which says we are using a parallel trace. This is
1739         * for backwards compatibility due to changes when destroying packets */
1740        libtrace_parallel = 1;
1741
1742        /* Parses configuration passed through environment variables */
1743        parse_env_config(libtrace);
1744        verify_configuration(libtrace);
1745
1746        ret = -1;
1747        /* Try start the format - we prefer parallel over single threaded, as
1748         * these formats should support messages better */
1749
1750        if (trace_supports_parallel(libtrace) &&
1751            !trace_has_dedicated_hasher(libtrace)) {
1752                ret = libtrace->format->pstart_input(libtrace);
1753                libtrace->pread = trace_pread_packet_wrapper;
1754        }
1755        if (ret != 0) {
1756                if (libtrace->format->start_input) {
1757                        ret = libtrace->format->start_input(libtrace);
1758                }
1759                if (libtrace->perpkt_thread_count > 1) {
1760                        libtrace->pread = trace_pread_packet_first_in_first_served;
1761                        /* Don't wait for a burst of packets if the format is
1762                         * live as this could block ring based formats and
1763                         * introduces delay. */
1764                        if (libtrace->format->info.live) {
1765                                libtrace->config.burst_size = 1;
1766                        }
1767                }
1768                else {
1769                        /* Use standard read_packet */
1770                        libtrace->pread = NULL;
1771                }
1772        }
1773
1774        if (ret != 0) {
1775                goto cleanup_none;
1776        }
1777
1778        /* --- Start all the threads we need --- */
1779        /* Disable signals because it is inherited by the threads we start */
1780        sigemptyset(&sig_block_all);
1781        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1782
1783        /* If we need a hasher thread start it
1784         * Special Case: If single threaded we don't need a hasher
1785         */
1786        if (trace_has_dedicated_hasher(libtrace)) {
1787                libtrace->hasher_thread.type = THREAD_EMPTY;
1788                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1789                                   THREAD_HASHER, hasher_entry, -1,
1790                                   "hasher-thread");
1791                if (ret != 0)
1792                        goto cleanup_started;
1793                libtrace->pread = trace_pread_packet_hasher_thread;
1794        } else {
1795                libtrace->hasher_thread.type = THREAD_EMPTY;
1796        }
1797
1798        /* Start up our perpkt threads */
1799        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1800                                          libtrace->perpkt_thread_count);
1801        if (!libtrace->perpkt_threads) {
1802                trace_set_err(libtrace, errno, "trace_pstart "
1803                              "failed to allocate memory.");
1804                goto cleanup_threads;
1805        }
1806        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1807                snprintf(name, sizeof(name), "perpkt-%d", i);
1808                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1809                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1810                                   THREAD_PERPKT, perpkt_threads_entry, i,
1811                                   name);
1812                if (ret != 0)
1813                        goto cleanup_threads;
1814        }
1815
1816        /* Start the reporter thread */
1817        if (reporter_cbs) {
1818                if (libtrace->combiner.initialise)
1819                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1820                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1821                                   THREAD_REPORTER, reporter_entry, -1,
1822                                   "reporter_thread");
1823                if (ret != 0)
1824                        goto cleanup_threads;
1825        }
1826
1827        /* Start the keepalive thread */
1828        if (libtrace->config.tick_interval > 0) {
1829                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1830                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1831                                   "keepalive_thread");
1832                if (ret != 0)
1833                        goto cleanup_threads;
1834        }
1835
1836        /* Init other data structures */
1837        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1838        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1839        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1840                                                 sizeof(*libtrace->first_packets.packets));
1841        if (libtrace->first_packets.packets == NULL) {
1842                trace_set_err(libtrace, errno, "trace_pstart "
1843                              "failed to allocate memory.");
1844                goto cleanup_threads;
1845        }
1846
1847        if (libtrace_ocache_init(&libtrace->packet_freelist,
1848                             (void* (*)()) trace_create_packet,
1849                             (void (*)(void *))trace_destroy_packet,
1850                             libtrace->config.thread_cache_size,
1851                             libtrace->config.cache_size * 4,
1852                             libtrace->config.fixed_count) != 0) {
1853                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1854                              "failed to allocate ocache.");
1855                goto cleanup_threads;
1856        }
1857
1858        /* Threads don't start */
1859        libtrace->started = true;
1860        libtrace_change_state(libtrace, STATE_RUNNING, false);
1861
1862        ret = 0;
1863        goto success;
1864cleanup_threads:
1865        if (libtrace->first_packets.packets) {
1866                free(libtrace->first_packets.packets);
1867                libtrace->first_packets.packets = NULL;
1868        }
1869        libtrace_change_state(libtrace, STATE_ERROR, false);
1870        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1871        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1872                pthread_join(libtrace->hasher_thread.tid, NULL);
1873                libtrace_zero_thread(&libtrace->hasher_thread);
1874        }
1875
1876        if (libtrace->perpkt_threads) {
1877                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1878                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1879                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1880                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1881                        } else break;
1882                }
1883                free(libtrace->perpkt_threads);
1884                libtrace->perpkt_threads = NULL;
1885        }
1886
1887        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1888                pthread_join(libtrace->reporter_thread.tid, NULL);
1889                libtrace_zero_thread(&libtrace->reporter_thread);
1890        }
1891
1892        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1893                pthread_join(libtrace->keepalive_thread.tid, NULL);
1894                libtrace_zero_thread(&libtrace->keepalive_thread);
1895        }
1896        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1897        libtrace_change_state(libtrace, STATE_NEW, false);
1898        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1899        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1900cleanup_started:
1901        if (libtrace->pread == trace_pread_packet_wrapper) {
1902                if (libtrace->format->ppause_input)
1903                        libtrace->format->ppause_input(libtrace);
1904        } else {
1905                if (libtrace->format->pause_input)
1906                        libtrace->format->pause_input(libtrace);
1907        }
1908        ret = -1;
1909success:
1910        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1911cleanup_none:
1912        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1913        return ret;
1914}
1915
1916DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1917                fn_cb_starting handler) {
1918        cbset->message_starting = handler;
1919        return 0;
1920}
1921
1922DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1923                fn_cb_dataless handler) {
1924        cbset->message_pausing = handler;
1925        return 0;
1926}
1927
1928DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1929                fn_cb_dataless handler) {
1930        cbset->message_resuming = handler;
1931        return 0;
1932}
1933
1934DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1935                fn_cb_dataless handler) {
1936        cbset->message_stopping = handler;
1937        return 0;
1938}
1939
1940DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1941                fn_cb_packet handler) {
1942        cbset->message_packet = handler;
1943        return 0;
1944}
1945
1946DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1947                fn_cb_first_packet handler) {
1948        cbset->message_first_packet = handler;
1949        return 0;
1950}
1951
1952DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1953                fn_cb_tick handler) {
1954        cbset->message_tick_count = handler;
1955        return 0;
1956}
1957
1958DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1959                fn_cb_tick handler) {
1960        cbset->message_tick_interval = handler;
1961        return 0;
1962}
1963
1964DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1965                fn_cb_result handler) {
1966        cbset->message_result = handler;
1967        return 0;
1968}
1969
1970DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1971                fn_cb_usermessage handler) {
1972        cbset->message_user = handler;
1973        return 0;
1974}
1975
1976/*
1977 * Pauses a trace, this should only be called by the main thread
1978 * 1. Set started = false
1979 * 2. All perpkt threads are paused waiting on a condition var
1980 * 3. Then call ppause on the underlying format if found
1981 * 4. The traces state is paused
1982 *
1983 * Once done you should be able to modify the trace setup and call pstart again
1984 * TODO add support to change the number of threads.
1985 */
1986DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1987{
1988        libtrace_thread_t *t;
1989        int i;
1990        assert(libtrace);
1991
1992        t = get_thread_table(libtrace);
1993        // Check state from within the lock if we are going to change it
1994        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1995
1996        /* If we are already paused, just treat this as a NOOP */
1997        if (libtrace->state == STATE_PAUSED) {
1998                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1999                return 0;
2000        }
2001        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2002                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2003                return -1;
2004        }
2005
2006        libtrace_change_state(libtrace, STATE_PAUSING, false);
2007        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2008
2009        // Special case handle the hasher thread case
2010        if (trace_has_dedicated_hasher(libtrace)) {
2011                if (libtrace->config.debug_state)
2012                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2013                libtrace_message_t message = {0, {.uint64=0}, NULL};
2014                message.code = MESSAGE_DO_PAUSE;
2015                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2016                // Wait for it to pause
2017                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2018                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2019                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2020                }
2021                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2022                if (libtrace->config.debug_state)
2023                        fprintf(stderr, " DONE\n");
2024        }
2025
2026        if (libtrace->config.debug_state)
2027                fprintf(stderr, "Asking perpkt threads to pause ...");
2028        // Stop threads, skip this one if it's a perpkt
2029        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2030                if (&libtrace->perpkt_threads[i] != t) {
2031                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2032                        message.code = MESSAGE_DO_PAUSE;
2033                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2034                        if(trace_has_dedicated_hasher(libtrace)) {
2035                                // The hasher has stopped and other threads have messages waiting therefore
2036                                // If the queues are empty the other threads would have no data
2037                                // So send some message packets to simply ask the threads to check
2038                                // We are the only writer since hasher has paused
2039                                libtrace_packet_t *pkt;
2040                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2041                                pkt->error = READ_MESSAGE;
2042                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2043                        }
2044                } else {
2045                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2046                }
2047        }
2048
2049        if (t) {
2050                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2051                // We rely on the user to *not* return before starting the trace again
2052                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2053        }
2054
2055        // Wait for all threads to pause
2056        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2057        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2058                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2059        }
2060        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2061
2062        if (libtrace->config.debug_state)
2063                fprintf(stderr, " DONE\n");
2064
2065        // Deal with the reporter
2066        if (trace_has_reporter(libtrace)) {
2067                if (libtrace->config.debug_state)
2068                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2069                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2070                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2071                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2072               
2073                } else {
2074                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2075                        message.code = MESSAGE_DO_PAUSE;
2076                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2077                        // Wait for it to pause
2078                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2079                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2080                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2081                        }
2082                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2083                }
2084                if (libtrace->config.debug_state)
2085                        fprintf(stderr, " DONE\n");
2086        }
2087
2088        /* Cache values before we pause */
2089        if (libtrace->stats == NULL)
2090                libtrace->stats = trace_create_statistics();
2091        // Save the statistics against the trace
2092        trace_get_statistics(libtrace, NULL);
2093        if (trace_is_parallel(libtrace)) {
2094                libtrace->started = false;
2095                if (libtrace->format->ppause_input)
2096                        libtrace->format->ppause_input(libtrace);
2097                // TODO What happens if we don't have pause input??
2098        } else {
2099                int err;
2100                err = trace_pause(libtrace);
2101                // We should handle this a bit better
2102                if (err)
2103                        return err;
2104        }
2105
2106        // Only set as paused after the pause has been called on the trace
2107        libtrace_change_state(libtrace, STATE_PAUSED, true);
2108        return 0;
2109}
2110
2111/**
2112 * Stop trace finish prematurely as though it meet an EOF
2113 * This should only be called by the main thread
2114 * 1. Calls ppause
2115 * 2. Sends a message asking for threads to finish
2116 * 3. Releases threads which will pause
2117 */
2118DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2119{
2120        int i, err;
2121        libtrace_message_t message = {0, {.uint64=0}, NULL};
2122        assert(libtrace);
2123
2124        // Ensure all threads have paused and the underlying trace format has
2125        // been closed and all packets associated are cleaned up
2126        // Pause will do any state checks for us
2127        err = trace_ppause(libtrace);
2128        if (err)
2129                return err;
2130
2131        // Now send a message asking the threads to stop
2132        // This will be retrieved before trying to read another packet
2133
2134        message.code = MESSAGE_DO_STOP;
2135        trace_message_perpkts(libtrace, &message);
2136        if (trace_has_dedicated_hasher(libtrace))
2137                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2138
2139        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2140                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2141        }
2142
2143        /* Now release the threads and let them stop - when the threads finish
2144         * the state will be set to finished */
2145        libtrace_change_state(libtrace, STATE_FINISHING, true);
2146        return 0;
2147}
2148
2149DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2150        int ret = -1;
2151        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2152                return -1;
2153        }
2154
2155        // Save the requirements
2156        trace->hasher_type = type;
2157        if (hasher) {
2158                trace->hasher = hasher;
2159                trace->hasher_data = data;
2160        } else {
2161                trace->hasher = NULL;
2162                trace->hasher_data = NULL;
2163        }
2164
2165        // Try push this to hardware - NOTE hardware could do custom if
2166        // there is a more efficient way to apply it, in this case
2167        // it will simply grab the function out of libtrace_t
2168        if (trace_supports_parallel(trace) && trace->format->config_input)
2169                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2170
2171        if (ret == -1) {
2172                /* We have to deal with this ourself */
2173                if (!hasher) {
2174                        switch (type)
2175                        {
2176                                case HASHER_CUSTOM:
2177                                case HASHER_BALANCE:
2178                                        return 0;
2179                                case HASHER_BIDIRECTIONAL:
2180                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2181                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2182                                        toeplitz_init_config(trace->hasher_data, 1);
2183                                        return 0;
2184                                case HASHER_UNIDIRECTIONAL:
2185                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2186                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2187                                        toeplitz_init_config(trace->hasher_data, 0);
2188                                        return 0;
2189                        }
2190                        return -1;
2191                }
2192        } else {
2193                /* If the hasher is hardware we zero out the hasher and hasher
2194                 * data fields - only if we need a hasher do we do this */
2195                trace->hasher = NULL;
2196                trace->hasher_data = NULL;
2197        }
2198
2199        return 0;
2200}
2201
2202// Waits for all threads to finish
2203DLLEXPORT void trace_join(libtrace_t *libtrace) {
2204        int i;
2205
2206        /* Firstly wait for the perpkt threads to finish, since these are
2207         * user controlled */
2208        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2209                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2210                // So we must do our best effort to empty the queue - so
2211                // the producer (or any other threads) don't block.
2212                libtrace_packet_t * packet;
2213                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2214                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2215                        if (packet) // This could be NULL iff the perpkt finishes early
2216                                trace_destroy_packet(packet);
2217        }
2218
2219        /* Now the hasher */
2220        if (trace_has_dedicated_hasher(libtrace)) {
2221                pthread_join(libtrace->hasher_thread.tid, NULL);
2222                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2223        }
2224
2225        // Now that everything is finished nothing can be touching our
2226        // buffers so clean them up
2227        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2228                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2229                // if they lost timeslice before-during a write
2230                libtrace_packet_t * packet;
2231                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2232                        trace_destroy_packet(packet);
2233                if (trace_has_dedicated_hasher(libtrace)) {
2234                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2235                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2236                }
2237                // Cannot destroy vector yet, this happens with trace_destroy
2238        }
2239
2240        if (trace_has_reporter(libtrace)) {
2241                pthread_join(libtrace->reporter_thread.tid, NULL);
2242                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2243        }
2244
2245        // Wait for the tick (keepalive) thread if it has been started
2246        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2247                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2248                msg.code = MESSAGE_DO_STOP;
2249                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2250                pthread_join(libtrace->keepalive_thread.tid, NULL);
2251        }
2252
2253        libtrace_change_state(libtrace, STATE_JOINED, true);
2254        print_memory_stats();
2255}
2256
2257DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2258                                                libtrace_thread_t *t)
2259{
2260        int ret;
2261        if (t == NULL)
2262                t = get_thread_descriptor(libtrace);
2263        if (t == NULL)
2264                return -1;
2265        ret = libtrace_message_queue_count(&t->messages);
2266        return ret < 0 ? 0 : ret;
2267}
2268
2269DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2270                                          libtrace_thread_t *t,
2271                                          libtrace_message_t * message)
2272{
2273        int ret;
2274        if (t == NULL)
2275                t = get_thread_descriptor(libtrace);
2276        if (t == NULL)
2277                return -1;
2278        ret = libtrace_message_queue_get(&t->messages, message);
2279        return ret < 0 ? 0 : ret;
2280}
2281
2282DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2283                                              libtrace_thread_t *t,
2284                                              libtrace_message_t * message)
2285{
2286        if (t == NULL)
2287                t = get_thread_descriptor(libtrace);
2288        if (t == NULL)
2289                return -1;
2290        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2291                return 0;
2292        else
2293                return -1;
2294}
2295
2296DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2297{
2298        int ret;
2299        if (!message->sender)
2300                message->sender = get_thread_descriptor(libtrace);
2301
2302        ret = libtrace_message_queue_put(&t->messages, message);
2303        return ret < 0 ? 0 : ret;
2304}
2305
2306DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2307{
2308        if (!trace_has_reporter(libtrace) ||
2309            !(libtrace->reporter_thread.state == THREAD_RUNNING
2310              || libtrace->reporter_thread.state == THREAD_PAUSED))
2311                return -1;
2312
2313        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2314}
2315
2316DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2317{
2318        libtrace_message_t message = {0, {.uint64=0}, NULL};
2319        message.code = MESSAGE_POST_REPORTER;
2320        return trace_message_reporter(libtrace, (void *) &message);
2321}
2322
2323DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2324{
2325        int i;
2326        int missed = 0;
2327        if (message->sender == NULL)
2328                message->sender = get_thread_descriptor(libtrace);
2329        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2330                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2331                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2332                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2333                } else {
2334                        missed += 1;
2335                }
2336        }
2337        return -missed;
2338}
2339
2340/**
2341 * Publishes a result to the reduce queue
2342 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2343 */
2344DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2345        libtrace_result_t res;
2346        res.type = type;
2347        res.key = key;
2348        res.value = value;
2349        assert(libtrace->combiner.publish);
2350        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2351        return;
2352}
2353
2354DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2355        if (combiner) {
2356                trace->combiner = *combiner;
2357                trace->combiner.configuration = config;
2358        } else {
2359                // No combiner, so don't try use it
2360                memset(&trace->combiner, 0, sizeof(trace->combiner));
2361        }
2362}
2363
2364DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2365        return packet->order;
2366}
2367
2368DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2369        return packet->hash;
2370}
2371
2372DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2373        packet->order = order;
2374}
2375
2376DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2377        packet->hash = hash;
2378}
2379
2380DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2381        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2382}
2383
2384/**
2385 * @return True if the trace is not running such that it can be configured
2386 */
2387static inline bool trace_is_configurable(libtrace_t *trace) {
2388        return trace->state == STATE_NEW ||
2389                        trace->state == STATE_PAUSED;
2390}
2391
2392DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2393        if (!trace_is_configurable(trace)) return -1;
2394
2395        /* TODO consider allowing an offset from the total number of cores i.e.
2396         * -1 reserve 1 core */
2397        if (nb >= 0) {
2398                trace->config.perpkt_threads = nb;
2399                return 0;
2400        } else {
2401                return -1;
2402        }
2403}
2404
2405DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2406        if (!trace_is_configurable(trace)) return -1;
2407
2408        trace->config.tick_interval = millisec;
2409        return 0;
2410}
2411
2412DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2413        if (!trace_is_configurable(trace)) return -1;
2414
2415        trace->config.tick_count = count;
2416        return 0;
2417}
2418
2419DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2420        if (!trace_is_configurable(trace)) return -1;
2421
2422        trace->tracetime = tracetime;
2423        return 0;
2424}
2425
2426DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2427        if (!trace_is_configurable(trace)) return -1;
2428
2429        trace->config.cache_size = size;
2430        return 0;
2431}
2432
2433DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2434        if (!trace_is_configurable(trace)) return -1;
2435
2436        trace->config.thread_cache_size = size;
2437        return 0;
2438}
2439
2440DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2441        if (!trace_is_configurable(trace)) return -1;
2442
2443        trace->config.fixed_count = fixed;
2444        return 0;
2445}
2446
2447DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2448        if (!trace_is_configurable(trace)) return -1;
2449
2450        trace->config.burst_size = size;
2451        return 0;
2452}
2453
2454DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2455        if (!trace_is_configurable(trace)) return -1;
2456
2457        trace->config.hasher_queue_size = size;
2458        return 0;
2459}
2460
2461DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2462        if (!trace_is_configurable(trace)) return -1;
2463
2464        trace->config.hasher_polling = polling;
2465        return 0;
2466}
2467
2468DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2469        if (!trace_is_configurable(trace)) return -1;
2470
2471        trace->config.reporter_polling = polling;
2472        return 0;
2473}
2474
2475DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2476        if (!trace_is_configurable(trace)) return -1;
2477
2478        trace->config.reporter_thold = thold;
2479        return 0;
2480}
2481
2482DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2483        if (!trace_is_configurable(trace)) return -1;
2484
2485        trace->config.debug_state = debug_state;
2486        return 0;
2487}
2488
2489static bool config_bool_parse(char *value, size_t nvalue) {
2490        if (strncmp(value, "true", nvalue) == 0)
2491                return true;
2492        else if (strncmp(value, "false", nvalue) == 0)
2493                return false;
2494        else
2495                return strtoll(value, NULL, 10) != 0;
2496}
2497
2498/* Note update documentation on trace_set_configuration */
2499static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2500        assert(key);
2501        assert(value);
2502        assert(uc);
2503        if (strncmp(key, "cache_size", nkey) == 0
2504            || strncmp(key, "cs", nkey) == 0) {
2505                uc->cache_size = strtoll(value, NULL, 10);
2506        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2507                   || strncmp(key, "tcs", nkey) == 0) {
2508                uc->thread_cache_size = strtoll(value, NULL, 10);
2509        } else if (strncmp(key, "fixed_count", nkey) == 0
2510                   || strncmp(key, "fc", nkey) == 0) {
2511                uc->fixed_count = config_bool_parse(value, nvalue);
2512        } else if (strncmp(key, "burst_size", nkey) == 0
2513                   || strncmp(key, "bs", nkey) == 0) {
2514                uc->burst_size = strtoll(value, NULL, 10);
2515        } else if (strncmp(key, "tick_interval", nkey) == 0
2516                   || strncmp(key, "ti", nkey) == 0) {
2517                uc->tick_interval = strtoll(value, NULL, 10);
2518        } else if (strncmp(key, "tick_count", nkey) == 0
2519                   || strncmp(key, "tc", nkey) == 0) {
2520                uc->tick_count = strtoll(value, NULL, 10);
2521        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2522                   || strncmp(key, "pt", nkey) == 0) {
2523                uc->perpkt_threads = strtoll(value, NULL, 10);
2524        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2525                   || strncmp(key, "hqs", nkey) == 0) {
2526                uc->hasher_queue_size = strtoll(value, NULL, 10);
2527        } else if (strncmp(key, "hasher_polling", nkey) == 0
2528                   || strncmp(key, "hp", nkey) == 0) {
2529                uc->hasher_polling = config_bool_parse(value, nvalue);
2530        } else if (strncmp(key, "reporter_polling", nkey) == 0
2531                   || strncmp(key, "rp", nkey) == 0) {
2532                uc->reporter_polling = config_bool_parse(value, nvalue);
2533        } else if (strncmp(key, "reporter_thold", nkey) == 0
2534                   || strncmp(key, "rt", nkey) == 0) {
2535                uc->reporter_thold = strtoll(value, NULL, 10);
2536        } else if (strncmp(key, "debug_state", nkey) == 0
2537                   || strncmp(key, "ds", nkey) == 0) {
2538                uc->debug_state = config_bool_parse(value, nvalue);
2539        } else {
2540                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2541        }
2542}
2543
2544DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2545        char *pch;
2546        char key[100];
2547        char value[100];
2548        char *dup;
2549        assert(str);
2550        assert(trace);
2551
2552        if (!trace_is_configurable(trace)) return -1;
2553
2554        dup = strdup(str);
2555        pch = strtok (dup," ,.-");
2556        while (pch != NULL)
2557        {
2558                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2559                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2560                } else {
2561                        fprintf(stderr, "Error parsing option %s\n", pch);
2562                }
2563                pch = strtok (NULL," ,.-");
2564        }
2565        free(dup);
2566
2567        return 0;
2568}
2569
2570DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2571        char line[1024];
2572        if (!trace_is_configurable(trace)) return -1;
2573
2574        while (fgets(line, sizeof(line), file) != NULL)
2575        {
2576                trace_set_configuration(trace, line);
2577        }
2578
2579        if(ferror(file))
2580                return -1;
2581        else
2582                return 0;
2583}
2584
2585DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2586        assert(packet);
2587        /* Always release any resources this might be holding */
2588        trace_fin_packet(packet);
2589        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2590}
2591
2592DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2593        if (libtrace->format)
2594                return &libtrace->format->info;
2595        else
2596                return NULL;
2597}
Note: See TracBrowser for help on using the repository browser.