source: lib/trace_parallel.c @ 4007dbb

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 4007dbb was 4007dbb, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Updates the new interface to be more complete

This should work around any issues with systems without thread support.
This still remains compatible with existing code.
Examples/tools/tests still need to be updated to make use of the new interface.
And tests also need to be updated.
Adds debug memory stats as an option to configure.

  • Property mode set to 100644
File size: 78.5 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101#include <ctype.h>
102
103static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
104extern int libtrace_parallel;
105
106struct mem_stats {
107        struct memfail {
108           uint64_t cache_hit;
109           uint64_t ring_hit;
110           uint64_t miss;
111           uint64_t recycled;
112        } readbulk, read, write, writebulk;
113};
114
115
116#ifdef ENABLE_MEM_STATS
117// Grrr gcc wants this spelt out
118__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
119
120
121static void print_memory_stats() {
122        uint64_t total;
123#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
124        char t_name[50];
125        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
126
127        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
128#else
129        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
130#endif
131
132        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
133        if (total) {
134                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
135                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
136                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
137                                total, (double) mem_hits.read.miss / (double) total * 100.0);
138        }
139
140        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
141        if (total) {
142                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
143                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
144
145
146                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
147                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
148        }
149
150        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
151        if (total) {
152                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
153                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
154
155                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
156                                total, (double) mem_hits.write.miss / (double) total * 100.0);
157        }
158
159        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
160        if (total) {
161                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
162                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
163
164                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
166        }
167}
168#else
169static void print_memory_stats() {}
170#endif
171
172static const libtrace_generic_t gen_zero = {0};
173
174/* This should optimise away the switch to nothing in the explict cases */
175static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type,
176                                libtrace_generic_t data, libtrace_thread_t *sender) {
177        fn_cb_dataless fn = NULL;
178        switch (type) {
179        case MESSAGE_STARTING:
180                if (trace->callbacks.message_starting)
181                        thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob);
182                else if (trace->per_msg)
183                        (*trace->per_msg)(trace, thread, type, data, sender);
184                return;
185        case MESSAGE_FIRST_PACKET:
186                if (trace->callbacks.message_first_packet)
187                        (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender);
188                else if (trace->per_msg)
189                        (*trace->per_msg)(trace, thread, type, data, sender);
190                return;
191        case MESSAGE_TICK_COUNT:
192                if (trace->callbacks.message_tick_count)
193                        (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64);
194                else if (trace->per_msg)
195                        (*trace->per_msg)(trace, thread, type, data, sender);
196                return;
197        case MESSAGE_TICK_INTERVAL:
198                if (trace->callbacks.message_tick_interval)
199                        (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data,  data.uint64);
200                else if (trace->per_msg)
201                        (*trace->per_msg)(trace, thread, type, data, sender);
202                return;
203        case MESSAGE_STOPPING:
204                fn = trace->callbacks.message_stopping;
205                break;
206        case MESSAGE_RESUMING:
207                fn = trace->callbacks.message_resuming;
208                break;
209        case MESSAGE_PAUSING:
210                fn = trace->callbacks.message_pausing;
211                break;
212
213        /* These should be unused */
214        case MESSAGE_DO_PAUSE:
215        case MESSAGE_DO_STOP:
216        case MESSAGE_POST_REPORTER:
217        case MESSAGE_RESULT:
218        case MESSAGE_PACKET:
219                return;
220        case MESSAGE_USER:
221                break;
222        }
223        if (fn)
224                (*fn)(trace, thread, trace->global_blob, thread->user_data);
225        else if (trace->per_msg)
226                (*trace->per_msg)(trace, thread, type, data, sender);
227}
228
229/*
230 * This can be used once the hasher thread has been started and internally after
231 * verify_configuration.
232 */
233DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
234{
235        return libtrace->hasher_thread.type == THREAD_HASHER;
236}
237
238DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
239{
240        assert(libtrace->state != STATE_NEW);
241        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
242}
243
244/**
245 * When running the number of perpkt threads in use.
246 * TODO what if the trace is not running yet, or has finished??
247 *
248 * @brief libtrace_perpkt_thread_nb
249 * @param t The trace
250 * @return
251 */
252DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
253        return t->perpkt_thread_count;
254}
255
256/**
257 * Changes the overall traces state and signals the condition.
258 *
259 * @param trace A pointer to the trace
260 * @param new_state The new state of the trace
261 * @param need_lock Set to true if libtrace_lock is not held, otherwise
262 *        false in the case the lock is currently held by this thread.
263 */
264static inline void libtrace_change_state(libtrace_t *trace,
265        const enum trace_state new_state, const bool need_lock)
266{
267        UNUSED enum trace_state prev_state;
268        if (need_lock)
269                pthread_mutex_lock(&trace->libtrace_lock);
270        prev_state = trace->state;
271        trace->state = new_state;
272
273        if (trace->config.debug_state)
274                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
275                        trace->uridata, get_trace_state_name(prev_state),
276                        get_trace_state_name(trace->state));
277
278        pthread_cond_broadcast(&trace->perpkt_cond);
279        if (need_lock)
280                pthread_mutex_unlock(&trace->libtrace_lock);
281}
282
283/**
284 * Changes a thread's state and broadcasts the condition variable. This
285 * should always be done when the lock is held.
286 *
287 * Additionally for perpkt threads the state counts are updated.
288 *
289 * @param trace A pointer to the trace
290 * @param t A pointer to the thread to modify
291 * @param new_state The new state of the thread
292 * @param need_lock Set to true if libtrace_lock is not held, otherwise
293 *        false in the case the lock is currently held by this thread.
294 */
295static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
296        const enum thread_states new_state, const bool need_lock)
297{
298        enum thread_states prev_state;
299        if (need_lock)
300                pthread_mutex_lock(&trace->libtrace_lock);
301        prev_state = t->state;
302        t->state = new_state;
303        if (t->type == THREAD_PERPKT) {
304                --trace->perpkt_thread_states[prev_state];
305                ++trace->perpkt_thread_states[new_state];
306        }
307
308        if (trace->config.debug_state)
309                fprintf(stderr, "Thread %d state changed from %d to %d\n",
310                        (int) t->tid, prev_state, t->state);
311
312        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
313                libtrace_change_state(trace, STATE_FINISHED, false);
314
315        pthread_cond_broadcast(&trace->perpkt_cond);
316        if (need_lock)
317                pthread_mutex_unlock(&trace->libtrace_lock);
318}
319
320/**
321 * This is valid once a trace is initialised
322 *
323 * @return True if the format supports parallel threads.
324 */
325static inline bool trace_supports_parallel(libtrace_t *trace)
326{
327        assert(trace);
328        assert(trace->format);
329        if (trace->format->pstart_input)
330                return true;
331        else
332                return false;
333}
334
335void libtrace_zero_thread(libtrace_thread_t * t) {
336        t->accepted_packets = 0;
337        t->filtered_packets = 0;
338        t->recorded_first = false;
339        t->tracetime_offset_usec = 0;
340        t->user_data = 0;
341        t->format_data = 0;
342        libtrace_zero_ringbuffer(&t->rbuffer);
343        t->trace = NULL;
344        t->ret = NULL;
345        t->type = THREAD_EMPTY;
346        t->perpkt_num = -1;
347}
348
349// Ints are aligned int is atomic so safe to read and write at same time
350// However write must be locked, read doesn't (We never try read before written to table)
351libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
352        int i = 0;
353        pthread_t tid = pthread_self();
354
355        for (;i<libtrace->perpkt_thread_count ;++i) {
356                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
357                        return &libtrace->perpkt_threads[i];
358        }
359        return NULL;
360}
361
362static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
363        libtrace_thread_t *ret;
364        if (!(ret = get_thread_table(libtrace))) {
365                pthread_t tid = pthread_self();
366                // Check if we are reporter or something else
367                if (pthread_equal(tid, libtrace->reporter_thread.tid))
368                        ret = &libtrace->reporter_thread;
369                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
370                        ret = &libtrace->hasher_thread;
371                else
372                        ret = NULL;
373        }
374        return ret;
375}
376
377DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
378        // Duplicate the packet in standard malloc'd memory and free the
379        // original, This is a 1:1 exchange so is ocache count remains unchanged.
380        if (pkt->buf_control != TRACE_CTRL_PACKET) {
381                libtrace_packet_t *dup;
382                dup = trace_copy_packet(pkt);
383                /* Release the external buffer */
384                trace_fin_packet(pkt);
385                /* Copy the duplicated packet over the existing */
386                memcpy(pkt, dup, sizeof(libtrace_packet_t));
387        }
388}
389
390/**
391 * Makes a libtrace_result_t safe, used when pausing a trace.
392 * This will call libtrace_make_packet_safe if the result is
393 * a packet.
394 */
395DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
396        if (res->type == RESULT_PACKET) {
397                libtrace_make_packet_safe(res->value.pkt);
398        }
399}
400
401/**
402 * Holds threads in a paused state, until released by broadcasting
403 * the condition mutex.
404 */
405static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
406        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
407        thread_change_state(trace, t, THREAD_PAUSED, false);
408        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
409                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
410        }
411        thread_change_state(trace, t, THREAD_RUNNING, false);
412        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
413}
414
415/**
416 * Sends a packet to the user, expects either a valid packet or a TICK packet.
417 *
418 * @param trace The trace
419 * @param t The current thread
420 * @param packet A pointer to the packet storage, which may be set to null upon
421 *               return, or a packet to be finished.
422 * @param tracetime If true packets are delayed to match with tracetime
423 * @return 0 is successful, otherwise if playing back in tracetime
424 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
425 *
426 * @note READ_MESSAGE will only be returned if tracetime is true.
427 */
428static inline int dispatch_packet(libtrace_t *trace,
429                                  libtrace_thread_t *t,
430                                  libtrace_packet_t **packet,
431                                  bool tracetime) {
432
433        if ((*packet)->error > 0) {
434                if (tracetime) {
435                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
436                                return READ_MESSAGE;
437                }
438                t->accepted_packets++;
439                libtrace_generic_t data = {.pkt = *packet};
440                if (trace->callbacks.message_packet)
441                        *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
442                else if (trace->per_msg)
443                        *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t);
444                trace_fin_packet(*packet);
445        } else {
446                assert((*packet)->error == READ_TICK);
447                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
448                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
449        }
450        return 0;
451}
452
453/**
454 * Sends a batch of packets to the user, expects either a valid packet or a
455 * TICK packet.
456 *
457 * @param trace The trace
458 * @param t The current thread
459 * @param packets [in,out] An array of packets, these may be null upon return
460 * @param nb_packets The total number of packets in the list
461 * @param empty [in,out] A pointer to an integer storing the first empty slot,
462 * upon return this is updated
463 * @param offset [in,out] The offset into the array, upon return this is updated
464 * @param tracetime If true packets are delayed to match with tracetime
465 * @return 0 is successful, otherwise if playing back in tracetime
466 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
467 *
468 * @note READ_MESSAGE will only be returned if tracetime is true.
469 */
470static inline int dispatch_packets(libtrace_t *trace,
471                                  libtrace_thread_t *t,
472                                  libtrace_packet_t *packets[],
473                                  int nb_packets, int *empty, int *offset,
474                                  bool tracetime) {
475        for (;*offset < nb_packets; ++*offset) {
476                int ret;
477                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
478                if (ret == 0) {
479                        /* Move full slots to front as we go */
480                        if (packets[*offset]) {
481                                if (*empty != *offset) {
482                                        packets[*empty] = packets[*offset];
483                                        packets[*offset] = NULL;
484                                }
485                                ++*empty;
486                        }
487                } else {
488                        /* Break early */
489                        assert(ret == READ_MESSAGE);
490                        return READ_MESSAGE;
491                }
492        }
493
494        return 0;
495}
496
497/**
498 * Pauses a per packet thread, messages will not be processed when the thread
499 * is paused.
500 *
501 * This process involves reading packets if a hasher thread is used. As such
502 * this function can fail to pause due to errors when reading in which case
503 * the thread should be stopped instead.
504 *
505 *
506 * @brief trace_perpkt_thread_pause
507 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
508 */
509static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
510                                     libtrace_packet_t *packets[],
511                                     int nb_packets, int *empty, int *offset) {
512        libtrace_packet_t * packet = NULL;
513
514        /* Let the user thread know we are going to pause */
515        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
516
517        /* Send through any remaining packets (or messages) without delay */
518
519        /* First send those packets already read, as fast as possible
520         * This should never fail or check for messages etc. */
521        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
522                                    offset, false), == 0);
523
524        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
525        /* If a hasher thread is running, empty input queues so we don't lose data */
526        if (trace_has_dedicated_hasher(trace)) {
527                // The hasher has stopped by this point, so the queue shouldn't be filling
528                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
529                        int ret = trace->pread(trace, t, &packet, 1);
530                        if (ret == 1) {
531                                if (packet->error > 0) {
532                                        store_first_packet(trace, packet, t);
533                                }
534                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
535                                if (packet == NULL)
536                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
537                        } else if (ret != READ_MESSAGE) {
538                                /* Ignore messages we pick these up next loop */
539                                assert (ret == READ_EOF || ret == READ_ERROR);
540                                /* Verify no packets are remaining */
541                                /* TODO refactor this sanity check out!! */
542                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
543                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
544                                        // No packets after this should have any data in them
545                                        assert(packet->error <= 0);
546                                }
547                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
548                                return -1;
549                        }
550                }
551        }
552        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
553
554        /* Now we do the actual pause, this returns when we resumed */
555        trace_thread_pause(trace, t);
556        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
557        return 1;
558}
559
560/**
561 * The is the entry point for our packet processing threads.
562 */
563static void* perpkt_threads_entry(void *data) {
564        libtrace_t *trace = (libtrace_t *)data;
565        libtrace_thread_t *t;
566        libtrace_message_t message = {0};
567        libtrace_packet_t *packets[trace->config.burst_size];
568        size_t i;
569        //int ret;
570        /* The current reading position into the packets */
571        int offset = 0;
572        /* The number of packets last read */
573        int nb_packets = 0;
574        /* The offset to the first NULL packet upto offset */
575        int empty = 0;
576
577        /* Wait until trace_pstart has been completed */
578        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
579        t = get_thread_table(trace);
580        assert(t);
581        if (trace->state == STATE_ERROR) {
582                thread_change_state(trace, t, THREAD_FINISHED, false);
583                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
584                pthread_exit(NULL);
585        }
586        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
587
588        if (trace->format->pregister_thread) {
589                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
590        }
591
592        /* Fill our buffer with empty packets */
593        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
594        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
595                              trace->config.burst_size,
596                              trace->config.burst_size);
597
598        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
599
600        /* Let the per_packet function know we have started */
601        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
602        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
603
604        for (;;) {
605
606                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
607                        int ret;
608                        switch (message.code) {
609                                case MESSAGE_DO_PAUSE: // This is internal
610                                        ret = trace_perpkt_thread_pause(trace, t,
611                                              packets, nb_packets, &empty, &offset);
612                                        if (ret == READ_EOF) {
613                                                goto eof;
614                                        } else if (ret == READ_ERROR) {
615                                                goto error;
616                                        }
617                                        assert(ret == 1);
618                                        continue;
619                                case MESSAGE_DO_STOP: // This is internal
620                                        goto eof;
621                        }
622                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
623                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
624                        /* Continue and the empty messages out before packets */
625                        continue;
626                }
627
628
629                /* Do we need to read a new set of packets MOST LIKELY we do */
630                if (offset == nb_packets) {
631                        /* Refill the packet buffer */
632                        if (empty != nb_packets) {
633                                // Refill the empty packets
634                                libtrace_ocache_alloc(&trace->packet_freelist,
635                                                      (void **) &packets[empty],
636                                                      nb_packets - empty,
637                                                      nb_packets - empty);
638                        }
639                        if (!trace->pread) {
640                                assert(packets[0]);
641                                nb_packets = trace_read_packet(trace, packets[0]);
642                                packets[0]->error = nb_packets;
643                                if (nb_packets > 0)
644                                        nb_packets = 1;
645                        } else {
646                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
647                        }
648                        offset = 0;
649                        empty = 0;
650                }
651
652                /* Handle error/message cases */
653                if (nb_packets > 0) {
654                        /* Store the first packet */
655                        if (packets[0]->error > 0) {
656                                store_first_packet(trace, packets[0], t);
657                        }
658                        dispatch_packets(trace, t, packets, nb_packets, &empty,
659                                         &offset, trace->tracetime);
660                } else {
661                        switch (nb_packets) {
662                        case READ_EOF:
663                                goto eof;
664                        case READ_ERROR:
665                                goto error;
666                        case READ_MESSAGE:
667                                nb_packets = 0;
668                                continue;
669                        default:
670                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
671                                goto error;
672                        }
673                }
674
675        }
676
677error:
678        message.code = MESSAGE_DO_STOP;
679        message.sender = t;
680        message.data.uint64 = 0;
681        trace_message_perpkts(trace, &message);
682eof:
683        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
684
685        // Let the per_packet function know we have stopped
686        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
687        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
688
689        // Free any remaining packets
690        for (i = 0; i < trace->config.burst_size; i++) {
691                if (packets[i]) {
692                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
693                        packets[i] = NULL;
694                }
695        }
696
697        thread_change_state(trace, t, THREAD_FINISHED, true);
698
699        /* Make sure the reporter sees we have finished */
700        if (trace_has_reporter(trace))
701                trace_post_reporter(trace);
702
703        // Release all ocache memory before unregistering with the format
704        // because this might(it does in DPDK) unlink the formats mempool
705        // causing destroy/finish packet to fail.
706        libtrace_ocache_unregister_thread(&trace->packet_freelist);
707        if (trace->format->punregister_thread) {
708                trace->format->punregister_thread(trace, t);
709        }
710        print_memory_stats();
711
712        pthread_exit(NULL);
713}
714
715/**
716 * The start point for our single threaded hasher thread, this will read
717 * and hash a packet from a data source and queue it against the correct
718 * core to process it.
719 */
720static void* hasher_entry(void *data) {
721        libtrace_t *trace = (libtrace_t *)data;
722        libtrace_thread_t * t;
723        int i;
724        libtrace_packet_t * packet;
725        libtrace_message_t message = {0};
726        int pkt_skipped = 0;
727
728        assert(trace_has_dedicated_hasher(trace));
729        /* Wait until all threads are started and objects are initialised (ring buffers) */
730        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
731        t = &trace->hasher_thread;
732        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
733        if (trace->state == STATE_ERROR) {
734                thread_change_state(trace, t, THREAD_FINISHED, false);
735                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
736                pthread_exit(NULL);
737        }
738        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
739
740        /* We are reading but it is not the parallel API */
741        if (trace->format->pregister_thread) {
742                trace->format->pregister_thread(trace, t, true);
743        }
744
745        /* Read all packets in then hash and queue against the correct thread */
746        while (1) {
747                int thread;
748                if (!pkt_skipped)
749                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
750                assert(packet);
751
752                if (libtrace_halt) {
753                        packet->error = 0;
754                        break;
755                }
756
757                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
758                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
759                        switch(message.code) {
760                                case MESSAGE_DO_PAUSE:
761                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
762                                        thread_change_state(trace, t, THREAD_PAUSED, false);
763                                        pthread_cond_broadcast(&trace->perpkt_cond);
764                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
765                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
766                                        }
767                                        thread_change_state(trace, t, THREAD_RUNNING, false);
768                                        pthread_cond_broadcast(&trace->perpkt_cond);
769                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
770                                        break;
771                                case MESSAGE_DO_STOP:
772                                        assert(trace->started == false);
773                                        assert(trace->state == STATE_FINISHED);
774                                        /* Mark the current packet as EOF */
775                                        packet->error = 0;
776                                        break;
777                                default:
778                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
779                        }
780                        pkt_skipped = 1;
781                        continue;
782                }
783
784                if ((packet->error = trace_read_packet(trace, packet)) <1) {
785                        break; /* We are EOF or error'd either way we stop  */
786                }
787
788                /* We are guaranteed to have a hash function i.e. != NULL */
789                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
790                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
791                /* Blocking write to the correct queue - I'm the only writer */
792                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
793                        uint64_t order = trace_packet_get_order(packet);
794                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
795                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
796                                // Write ticks to everyone else
797                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
798                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
799                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
800                                for (i = 0; i < trace->perpkt_thread_count; i++) {
801                                        pkts[i]->error = READ_TICK;
802                                        trace_packet_set_order(pkts[i], order);
803                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
804                                }
805                        }
806                        pkt_skipped = 0;
807                } else {
808                        assert(!"Dropping a packet!!");
809                        pkt_skipped = 1; // Reuse that packet no one read it
810                }
811        }
812
813        /* Broadcast our last failed read to all threads */
814        for (i = 0; i < trace->perpkt_thread_count; i++) {
815                libtrace_packet_t * bcast;
816                if (i == trace->perpkt_thread_count - 1) {
817                        bcast = packet;
818                } else {
819                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
820                        bcast->error = packet->error;
821                }
822                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
823                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
824                        // Unlock early otherwise we could deadlock
825                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
826                }
827                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
828        }
829
830        // We don't need to free the packet
831        thread_change_state(trace, t, THREAD_FINISHED, true);
832
833        libtrace_ocache_unregister_thread(&trace->packet_freelist);
834        if (trace->format->punregister_thread) {
835                trace->format->punregister_thread(trace, t);
836        }
837        print_memory_stats();
838
839        // TODO remove from TTABLE t sometime
840        pthread_exit(NULL);
841}
842
843/* Our simplest case when a thread becomes ready it can obtain an exclusive
844 * lock to read packets from the underlying trace.
845 */
846static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
847                                                    libtrace_thread_t *t,
848                                                    libtrace_packet_t *packets[],
849                                                    size_t nb_packets) {
850        size_t i = 0;
851        //bool tick_hit = false;
852
853        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
854        /* Read nb_packets */
855        for (i = 0; i < nb_packets; ++i) {
856                if (libtrace_halt) {
857                        break;
858                }
859                packets[i]->error = trace_read_packet(libtrace, packets[i]);
860
861                if (packets[i]->error <= 0) {
862                        /* We'll catch this next time if we have already got packets */
863                        if ( i==0 ) {
864                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
865                                return packets[i]->error;
866                        } else {
867                                break;
868                        }
869                }
870                /*
871                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
872                        tick_hit = true;
873                }*/
874        }
875        // Doing this inside the lock ensures the first packet is always
876        // recorded first
877        if (packets[0]->error > 0) {
878                store_first_packet(libtrace, packets[0], t);
879        }
880        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
881        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
882        if (tick_hit) {
883                libtrace_message_t tick;
884                tick.additional.uint64 = trace_packet_get_order(packets[i]);
885                tick.code = MESSAGE_TICK;
886                trace_send_message_to_perpkts(libtrace, &tick);
887        } */
888        return i;
889}
890
891/**
892 * For the case that we have a dedicated hasher thread
893 * 1. We read a packet from our buffer
894 * 2. Move that into the packet provided (packet)
895 */
896inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
897                                                   libtrace_thread_t *t,
898                                                   libtrace_packet_t *packets[],
899                                                   size_t nb_packets) {
900        size_t i;
901
902        /* We store the last error message here */
903        if (t->format_data) {
904                return ((libtrace_packet_t *)t->format_data)->error;
905        }
906
907        // Always grab at least one
908        if (packets[0]) // Recycle the old get the new
909                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
910        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
911
912        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
913                return packets[0]->error;
914        }
915
916        for (i = 1; i < nb_packets; i++) {
917                if (packets[i]) // Recycle the old get the new
918                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
919                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
920                        packets[i] = NULL;
921                        break;
922                }
923
924                /* We will return an error or EOF the next time around */
925                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
926                        /* The message case will be checked automatically -
927                           However other cases like EOF and error will only be
928                           sent once*/
929                        if (packets[i]->error != READ_MESSAGE) {
930                                assert(t->format_data == NULL);
931                                t->format_data = packets[i];
932                        }
933                        break;
934                }
935        }
936
937        return i;
938}
939
940/**
941 * For the first packet of each queue we keep a copy and note the system
942 * time it was received at.
943 *
944 * This is used for finding the first packet when playing back a trace
945 * in trace time. And can be used by real time applications to print
946 * results out every XXX seconds.
947 */
948void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
949{
950        if (!t->recorded_first) {
951                libtrace_message_t mesg = {0};
952                struct timeval tv;
953                libtrace_packet_t * dup;
954
955                /* We mark system time against a copy of the packet */
956                gettimeofday(&tv, NULL);
957                dup = trace_copy_packet(packet);
958
959                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
960                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
961                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
962                libtrace->first_packets.count++;
963
964                /* Now update the first */
965                if (libtrace->first_packets.count == 1) {
966                        /* We the first entry hence also the first known packet */
967                        libtrace->first_packets.first = t->perpkt_num;
968                } else {
969                        /* Check if we are newer than the previous 'first' packet */
970                        size_t first = libtrace->first_packets.first;
971                        if (trace_get_seconds(dup) <
972                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
973                                libtrace->first_packets.first = t->perpkt_num;
974                }
975                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
976
977                mesg.code = MESSAGE_FIRST_PACKET;
978                trace_message_reporter(libtrace, &mesg);
979                trace_message_perpkts(libtrace, &mesg);
980                t->recorded_first = true;
981        }
982}
983
984DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
985                                     libtrace_thread_t *t,
986                                     const libtrace_packet_t **packet,
987                                     const struct timeval **tv)
988{
989        void * tmp;
990        int ret = 0;
991
992        if (t) {
993                if (t->type != THREAD_PERPKT || t->trace != libtrace)
994                        return -1;
995        }
996
997        /* Throw away these which we don't use */
998        if (!packet)
999                packet = (const libtrace_packet_t **) &tmp;
1000        if (!tv)
1001                tv = (const struct timeval **) &tmp;
1002
1003        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1004        if (t) {
1005                /* Get the requested thread */
1006                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1007                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1008        } else if (libtrace->first_packets.count) {
1009                /* Get the first packet across all threads */
1010                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1011                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1012                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1013                        ret = 1;
1014                } else {
1015                        struct timeval curr_tv;
1016                        // If a second has passed since the first entry we will assume this is the very first packet
1017                        gettimeofday(&curr_tv, NULL);
1018                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1019                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1020                                        ret = 1;
1021                                }
1022                        }
1023                }
1024        } else {
1025                *packet = NULL;
1026                *tv = NULL;
1027        }
1028        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1029        return ret;
1030}
1031
1032
1033DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1034{
1035        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1036}
1037
1038inline static struct timeval usec_to_tv(uint64_t usec)
1039{
1040        struct timeval tv;
1041        tv.tv_sec = usec / 1000000;
1042        tv.tv_usec = usec % 1000000;
1043        return tv;
1044}
1045
1046/** Similar to delay_tracetime but send messages to all threads periodically */
1047static void* reporter_entry(void *data) {
1048        libtrace_message_t message = {0};
1049        libtrace_t *trace = (libtrace_t *)data;
1050        libtrace_thread_t *t = &trace->reporter_thread;
1051
1052        /* Wait until all threads are started */
1053        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1054        if (trace->state == STATE_ERROR) {
1055                thread_change_state(trace, t, THREAD_FINISHED, false);
1056                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1057                pthread_exit(NULL);
1058        }
1059        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1060
1061        if (trace->format->pregister_thread) {
1062                trace->format->pregister_thread(trace, t, false);
1063        }
1064
1065        (*trace->reporter)(trace, MESSAGE_STARTING, (libtrace_generic_t) {0}, t);
1066        (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
1067
1068        while (!trace_has_finished(trace)) {
1069                if (trace->config.reporter_polling) {
1070                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1071                                message.code = MESSAGE_POST_REPORTER;
1072                } else {
1073                        libtrace_message_queue_get(&t->messages, &message);
1074                }
1075                switch (message.code) {
1076                        // Check for results
1077                        case MESSAGE_POST_REPORTER:
1078                                trace->combiner.read(trace, &trace->combiner);
1079                                break;
1080                        case MESSAGE_DO_PAUSE:
1081                                assert(trace->combiner.pause);
1082                                trace->combiner.pause(trace, &trace->combiner);
1083                                (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
1084                                trace_thread_pause(trace, t);
1085                                (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
1086                                break;
1087                default:
1088                        (*trace->reporter)(trace, message.code, message.data, message.sender);
1089                }
1090        }
1091
1092        // Flush out whats left now all our threads have finished
1093        trace->combiner.read_final(trace, &trace->combiner);
1094
1095        // GOODBYE
1096        (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
1097        (*trace->reporter)(trace, MESSAGE_STOPPING, (libtrace_generic_t) {0}, t);
1098
1099        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1100        print_memory_stats();
1101        return NULL;
1102}
1103
1104/** Similar to delay_tracetime but send messages to all threads periodically */
1105static void* keepalive_entry(void *data) {
1106        struct timeval prev, next;
1107        libtrace_message_t message = {0};
1108        libtrace_t *trace = (libtrace_t *)data;
1109        uint64_t next_release;
1110        libtrace_thread_t *t = &trace->keepalive_thread;
1111
1112        /* Wait until all threads are started */
1113        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1114        if (trace->state == STATE_ERROR) {
1115                thread_change_state(trace, t, THREAD_FINISHED, false);
1116                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1117                pthread_exit(NULL);
1118        }
1119        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1120
1121        gettimeofday(&prev, NULL);
1122        message.code = MESSAGE_TICK_INTERVAL;
1123
1124        while (trace->state != STATE_FINISHED) {
1125                fd_set rfds;
1126                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1127                gettimeofday(&next, NULL);
1128                if (next_release > tv_to_usec(&next)) {
1129                        next = usec_to_tv(next_release - tv_to_usec(&next));
1130                        // Wait for timeout or a message
1131                        FD_ZERO(&rfds);
1132                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1133                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1134                                libtrace_message_t msg;
1135                                libtrace_message_queue_get(&t->messages, &msg);
1136                                assert(msg.code == MESSAGE_DO_STOP);
1137                                goto done;
1138                        }
1139                }
1140                prev = usec_to_tv(next_release);
1141                if (trace->state == STATE_RUNNING) {
1142                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1143                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1144                        trace_message_perpkts(trace, &message);
1145                }
1146        }
1147done:
1148
1149        thread_change_state(trace, t, THREAD_FINISHED, true);
1150        return NULL;
1151}
1152
1153/**
1154 * Delays a packets playback so the playback will be in trace time.
1155 * This may break early if a message becomes available.
1156 *
1157 * Requires the first packet for this thread to be received.
1158 * @param libtrace  The trace
1159 * @param packet    The packet to delay
1160 * @param t         The current thread
1161 * @return Either READ_MESSAGE(-2) or 0 is successful
1162 */
1163static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1164        struct timeval curr_tv, pkt_tv;
1165        uint64_t next_release = t->tracetime_offset_usec;
1166        uint64_t curr_usec;
1167
1168        if (!t->tracetime_offset_usec) {
1169                const libtrace_packet_t *first_pkt;
1170                const struct timeval *sys_tv;
1171                int64_t initial_offset;
1172                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1173                assert(first_pkt);
1174                pkt_tv = trace_get_timeval(first_pkt);
1175                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1176                /* In the unlikely case offset is 0, change it to 1 */
1177                if (stable)
1178                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1179                next_release = initial_offset;
1180        }
1181        /* next_release == offset */
1182        pkt_tv = trace_get_timeval(packet);
1183        next_release += tv_to_usec(&pkt_tv);
1184        gettimeofday(&curr_tv, NULL);
1185        curr_usec = tv_to_usec(&curr_tv);
1186        if (next_release > curr_usec) {
1187                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1188                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1189                fd_set rfds;
1190                FD_ZERO(&rfds);
1191                FD_SET(mesg_fd, &rfds);
1192                // We need to wait
1193                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1194                if (ret == 0) {
1195                        return 0;
1196                } else if (ret > 0) {
1197                        return READ_MESSAGE;
1198                } else {
1199                        assert(!"trace_delay_packet: Unexpected return from select");
1200                }
1201        }
1202        return 0;
1203}
1204
1205/* Discards packets that don't match the filter.
1206 * Discarded packets are emptied and then moved to the end of the packet list.
1207 *
1208 * @param trace       The trace format, containing the filter
1209 * @param packets     An array of packets
1210 * @param nb_packets  The number of valid items in packets
1211 *
1212 * @return The number of packets that passed the filter, which are moved to
1213 *          the start of the packets array
1214 */
1215static inline size_t filter_packets(libtrace_t *trace,
1216                                    libtrace_packet_t **packets,
1217                                    size_t nb_packets) {
1218        size_t offset = 0;
1219        size_t i;
1220
1221        for (i = 0; i < nb_packets; ++i) {
1222                // The filter needs the trace attached to receive the link type
1223                packets[i]->trace = trace;
1224                if (trace_apply_filter(trace->filter, packets[i])) {
1225                        libtrace_packet_t *tmp;
1226                        tmp = packets[offset];
1227                        packets[offset++] = packets[i];
1228                        packets[i] = tmp;
1229                } else {
1230                        trace_fin_packet(packets[i]);
1231                }
1232        }
1233
1234        return offset;
1235}
1236
1237/* Read a batch of packets from the trace into a buffer.
1238 * Note that this function will block until a packet is read (or EOF is reached)
1239 *
1240 * @param libtrace    The trace
1241 * @param t           The thread
1242 * @param packets     An array of packets
1243 * @param nb_packets  The number of empty packets in packets
1244 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1245 */
1246static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1247                                      libtrace_thread_t *t,
1248                                      libtrace_packet_t *packets[],
1249                                      size_t nb_packets) {
1250        int i;
1251        assert(nb_packets);
1252        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1253        if (trace_is_err(libtrace))
1254                return -1;
1255        if (!libtrace->started) {
1256                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1257                              "You must call libtrace_start() before trace_read_packet()\n");
1258                return -1;
1259        }
1260
1261        if (libtrace->format->pread_packets) {
1262                int ret;
1263                for (i = 0; i < (int) nb_packets; ++i) {
1264                        assert(i[packets]);
1265                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1266                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1267                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1268                                              "Packet passed to trace_read_packet() is invalid\n");
1269                                return -1;
1270                        }
1271                }
1272                do {
1273                        ret=libtrace->format->pread_packets(libtrace, t,
1274                                                            packets,
1275                                                            nb_packets);
1276                        /* Error, EOF or message? */
1277                        if (ret <= 0) {
1278                                return ret;
1279                        }
1280
1281                        if (libtrace->filter) {
1282                                int remaining;
1283                                remaining = filter_packets(libtrace,
1284                                                           packets, ret);
1285                                t->filtered_packets += ret - remaining;
1286                                ret = remaining;
1287                        }
1288                        for (i = 0; i < ret; ++i) {
1289                                /* We do not mark the packet against the trace,
1290                                 * before hand or after. After breaks DAG meta
1291                                 * packets and before is inefficient */
1292                                //packets[i]->trace = libtrace;
1293                                /* TODO IN FORMAT?? Like traditional libtrace */
1294                                if (libtrace->snaplen>0)
1295                                        trace_set_capture_length(packets[i],
1296                                                        libtrace->snaplen);
1297                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1298                        }
1299                } while(ret == 0);
1300                return ret;
1301        }
1302        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1303                      "This format does not support reading packets\n");
1304        return ~0U;
1305}
1306
1307/* Restarts a parallel trace, this is called from trace_pstart.
1308 * The libtrace lock is held upon calling this function.
1309 * Typically with a parallel trace the threads are not
1310 * killed rather.
1311 */
1312static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1313                          fn_cb_msg per_msg, fn_reporter reporter) {
1314        int i, err = 0;
1315        if (libtrace->state != STATE_PAUSED) {
1316                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1317                        "trace(%s) is not currently paused",
1318                              libtrace->uridata);
1319                return -1;
1320        }
1321
1322        assert(libtrace_parallel);
1323        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1324
1325        /* Reset first packets */
1326        pthread_spin_lock(&libtrace->first_packets.lock);
1327        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1328                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1329                if (libtrace->first_packets.packets[i].packet) {
1330                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1331                        libtrace->first_packets.packets[i].packet = NULL;
1332                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1333                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1334                        libtrace->first_packets.count--;
1335                        libtrace->perpkt_threads[i].recorded_first = false;
1336                }
1337        }
1338        assert(libtrace->first_packets.count == 0);
1339        libtrace->first_packets.first = 0;
1340        pthread_spin_unlock(&libtrace->first_packets.lock);
1341
1342        /* Reset delay */
1343        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1344                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1345        }
1346
1347        /* Reset statistics */
1348        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1349                libtrace->perpkt_threads[i].accepted_packets = 0;
1350                libtrace->perpkt_threads[i].filtered_packets = 0;
1351        }
1352        libtrace->accepted_packets = 0;
1353        libtrace->filtered_packets = 0;
1354
1355        /* Update functions if requested */
1356        if (per_msg)
1357                libtrace->per_msg = per_msg;
1358        assert(libtrace->per_msg);
1359        if (reporter)
1360                libtrace->reporter = reporter;
1361        if(global_blob)
1362                libtrace->global_blob = global_blob;
1363
1364        if (libtrace->perpkt_thread_count > 1 &&
1365            trace_supports_parallel(libtrace) &&
1366            !trace_has_dedicated_hasher(libtrace)) {
1367                err = libtrace->format->pstart_input(libtrace);
1368        } else {
1369                if (libtrace->format->start_input) {
1370                        err = libtrace->format->start_input(libtrace);
1371                }
1372        }
1373
1374        if (err == 0) {
1375                libtrace->started = true;
1376                libtrace_change_state(libtrace, STATE_RUNNING, false);
1377        }
1378        return err;
1379}
1380
1381/**
1382 * @return the number of CPU cores on the machine. -1 if unknown.
1383 */
1384SIMPLE_FUNCTION static int get_nb_cores() {
1385        int numCPU;
1386#ifdef _SC_NPROCESSORS_ONLN
1387        /* Most systems do this now */
1388        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1389
1390#else
1391        int mib[] = {CTL_HW, HW_AVAILCPU};
1392        size_t len = sizeof(numCPU);
1393
1394        /* get the number of CPUs from the system */
1395        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1396#endif
1397        return numCPU <= 0 ? 1 : numCPU;
1398}
1399
1400/**
1401 * Verifies the configuration and sets default values for any values not
1402 * specified by the user.
1403 */
1404static void verify_configuration(libtrace_t *libtrace) {
1405
1406        if (libtrace->config.hasher_queue_size <= 0)
1407                libtrace->config.hasher_queue_size = 1000;
1408
1409        if (libtrace->config.perpkt_threads <= 0) {
1410                libtrace->perpkt_thread_count = get_nb_cores();
1411                if (libtrace->perpkt_thread_count <= 0)
1412                        // Lets just use one
1413                        libtrace->perpkt_thread_count = 1;
1414        } else {
1415                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1416        }
1417
1418        if (libtrace->config.reporter_thold <= 0)
1419                libtrace->config.reporter_thold = 100;
1420        if (libtrace->config.burst_size <= 0)
1421                libtrace->config.burst_size = 10;
1422        if (libtrace->config.thread_cache_size <= 0)
1423                libtrace->config.thread_cache_size = 20;
1424        if (libtrace->config.cache_size <= 0)
1425                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1426
1427        if (libtrace->config.cache_size <
1428                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1429                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1430
1431        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1432                libtrace->combiner = combiner_unordered;
1433
1434        /* Figure out if we are using a dedicated hasher thread? */
1435        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1436                libtrace->hasher_thread.type = THREAD_HASHER;
1437        }
1438}
1439
1440/**
1441 * Starts a libtrace_thread, including allocating memory for messaging.
1442 * Threads are expected to wait until the libtrace look is released.
1443 * Hence why we don't init structures until later.
1444 *
1445 * @param trace The trace the thread is associated with
1446 * @param t The thread that is filled when the thread is started
1447 * @param type The type of thread
1448 * @param start_routine The entry location of the thread
1449 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1450 * @param name For debugging purposes set the threads name (Optional)
1451 *
1452 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1453 *         In this situation the thread structure is zeroed.
1454 */
1455static int trace_start_thread(libtrace_t *trace,
1456                       libtrace_thread_t *t,
1457                       enum thread_types type,
1458                       void *(*start_routine) (void *),
1459                       int perpkt_num,
1460                       const char *name) {
1461#ifdef __linux__
1462        pthread_attr_t attrib;
1463        cpu_set_t cpus;
1464#endif
1465        int ret, i;
1466        assert(t->type == THREAD_EMPTY);
1467        t->trace = trace;
1468        t->ret = NULL;
1469        t->user_data = NULL;
1470        t->type = type;
1471        t->state = THREAD_RUNNING;
1472
1473#ifdef __linux__
1474        CPU_ZERO(&cpus);
1475        for (i = 0; i < get_nb_cores(); i++)
1476                CPU_SET(i, &cpus);
1477        pthread_attr_init(&attrib);
1478        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1479        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1480        pthread_attr_destroy(&attrib);
1481#else
1482        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1483#endif
1484        if (ret != 0) {
1485                libtrace_zero_thread(t);
1486                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1487                return -1;
1488        }
1489        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1490        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1491                libtrace_ringbuffer_init(&t->rbuffer,
1492                                         trace->config.hasher_queue_size,
1493                                         trace->config.hasher_polling?
1494                                                 LIBTRACE_RINGBUFFER_POLLING:
1495                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1496        }
1497#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1498        if(name)
1499                pthread_setname_np(t->tid, name);
1500#endif
1501        t->perpkt_num = perpkt_num;
1502        return 0;
1503}
1504
1505/** Parses the environment variable LIBTRACE_CONF into the supplied
1506 * configuration structure.
1507 *
1508 * @param[in,out] libtrace The trace from which we determine the URI and set
1509 * the configuration.
1510 *
1511 * We search for 3 environment variables and apply them to the config in the
1512 * following order. Such that the first has the lowest priority.
1513 *
1514 * 1. LIBTRACE_CONF, The global environment configuration
1515 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1516 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1517 *
1518 * E.g.
1519 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1520 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1521 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1522 *
1523 * @note All environment variables names MUST only contian
1524 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1525 * outside of this range should be captilised if possible or replaced with an
1526 * underscore.
1527 */
1528static void parse_env_config (libtrace_t *libtrace) {
1529        char env_name[1024] = "LIBTRACE_CONF_";
1530        size_t len = strlen(env_name);
1531        size_t mark = 0;
1532        size_t i;
1533        char * env;
1534
1535        /* Make our compound string */
1536        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1537        len += strlen(libtrace->format->name);
1538        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1539        len += 1;
1540        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1541
1542        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1543        for (i = 0; env_name[i] != 0; ++i) {
1544                env_name[i] = toupper(env_name[i]);
1545                if(env_name[i] == ':') {
1546                        mark = i;
1547                }
1548                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1549                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1550                        env_name[i] = '_';
1551                }
1552        }
1553
1554        /* First apply global env settings LIBTRACE_CONF */
1555        env = getenv("LIBTRACE_CONF");
1556        if (env)
1557        {
1558                printf("Got env %s", env);
1559                trace_set_configuration(libtrace, env);
1560        }
1561
1562        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1563        if (mark != 0) {
1564                env_name[mark] = 0;
1565                env = getenv(env_name);
1566                if (env) {
1567                        trace_set_configuration(libtrace, env);
1568                }
1569                env_name[mark] = '_';
1570        }
1571
1572        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1573        env = getenv(env_name);
1574        if (env) {
1575                trace_set_configuration(libtrace, env);
1576        }
1577}
1578
1579DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1580                           fn_cb_msg per_msg, fn_reporter reporter) {
1581        int i;
1582        int ret = -1;
1583        char name[16];
1584        sigset_t sig_before, sig_block_all;
1585        assert(libtrace);
1586
1587        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1588        if (trace_is_err(libtrace)) {
1589                goto cleanup_none;
1590        }
1591
1592        if (libtrace->state == STATE_PAUSED) {
1593                ret = trace_prestart(libtrace, global_blob, per_msg, reporter);
1594                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1595                return ret;
1596        }
1597
1598        if (libtrace->state != STATE_NEW) {
1599                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1600                              "should be called on a NEW or PAUSED trace but "
1601                              "instead was called from %s",
1602                              get_trace_state_name(libtrace->state));
1603                goto cleanup_none;
1604        }
1605
1606        /* Store the user defined things against the trace */
1607        libtrace->global_blob = global_blob;
1608        libtrace->per_msg = per_msg;
1609        libtrace->reporter = reporter;
1610        /* And zero other fields */
1611        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1612                libtrace->perpkt_thread_states[i] = 0;
1613        }
1614        libtrace->first_packets.first = 0;
1615        libtrace->first_packets.count = 0;
1616        libtrace->first_packets.packets = NULL;
1617        libtrace->perpkt_threads = NULL;
1618        /* Set a global which says we are using a parallel trace. This is
1619         * for backwards compatability due to changes when destroying packets */
1620        libtrace_parallel = 1;
1621
1622        /* Parses configuration passed through environment variables */
1623        parse_env_config(libtrace);
1624        verify_configuration(libtrace);
1625
1626        /* Try start the format - we prefer parallel over single threaded, as
1627         * these formats should support messages better */
1628        if (trace_supports_parallel(libtrace) &&
1629            !trace_has_dedicated_hasher(libtrace)) {
1630                ret = libtrace->format->pstart_input(libtrace);
1631                libtrace->pread = trace_pread_packet_wrapper;
1632        } else {
1633                if (libtrace->format->start_input) {
1634                        ret = libtrace->format->start_input(libtrace);
1635                }
1636                if (libtrace->perpkt_thread_count > 1)
1637                        libtrace->pread = trace_pread_packet_first_in_first_served;
1638                else
1639                        /* Use standard read_packet */
1640                        libtrace->pread = NULL;
1641        }
1642
1643        if (ret != 0) {
1644                goto cleanup_none;
1645        }
1646
1647        /* --- Start all the threads we need --- */
1648        /* Disable signals because it is inherited by the threads we start */
1649        sigemptyset(&sig_block_all);
1650        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1651
1652        /* If we need a hasher thread start it
1653         * Special Case: If single threaded we don't need a hasher
1654         */
1655        if (trace_has_dedicated_hasher(libtrace)) {
1656                libtrace->hasher_thread.type = THREAD_EMPTY;
1657                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1658                                   THREAD_HASHER, hasher_entry, -1,
1659                                   "hasher-thread");
1660                if (ret != 0)
1661                        goto cleanup_started;
1662                libtrace->pread = trace_pread_packet_hasher_thread;
1663        } else {
1664                libtrace->hasher_thread.type = THREAD_EMPTY;
1665        }
1666
1667        /* Start up our perpkt threads */
1668        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1669                                          libtrace->perpkt_thread_count);
1670        if (!libtrace->perpkt_threads) {
1671                trace_set_err(libtrace, errno, "trace_pstart "
1672                              "failed to allocate memory.");
1673                goto cleanup_threads;
1674        }
1675        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1676                snprintf(name, sizeof(name), "perpkt-%d", i);
1677                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1678                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1679                                   THREAD_PERPKT, perpkt_threads_entry, i,
1680                                   name);
1681                if (ret != 0)
1682                        goto cleanup_threads;
1683        }
1684
1685        /* Start the reporter thread */
1686        if (reporter) {
1687                if (libtrace->combiner.initialise)
1688                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1689                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1690                                   THREAD_REPORTER, reporter_entry, -1,
1691                                   "reporter_thread");
1692                if (ret != 0)
1693                        goto cleanup_threads;
1694        }
1695
1696        /* Start the keepalive thread */
1697        if (libtrace->config.tick_interval > 0) {
1698                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1699                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1700                                   "keepalive_thread");
1701                if (ret != 0)
1702                        goto cleanup_threads;
1703        }
1704
1705        /* Init other data structures */
1706        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1707        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1708        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1709                                                 sizeof(*libtrace->first_packets.packets));
1710        if (libtrace->first_packets.packets == NULL) {
1711                trace_set_err(libtrace, errno, "trace_pstart "
1712                              "failed to allocate memory.");
1713                goto cleanup_threads;
1714        }
1715
1716        if (libtrace_ocache_init(&libtrace->packet_freelist,
1717                             (void* (*)()) trace_create_packet,
1718                             (void (*)(void *))trace_destroy_packet,
1719                             libtrace->config.thread_cache_size,
1720                             libtrace->config.cache_size * 4,
1721                             libtrace->config.fixed_count) != 0) {
1722                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1723                              "failed to allocate ocache.");
1724                goto cleanup_threads;
1725        }
1726
1727        /* Threads don't start */
1728        libtrace->started = true;
1729        libtrace_change_state(libtrace, STATE_RUNNING, false);
1730
1731        ret = 0;
1732        goto success;
1733cleanup_threads:
1734        if (libtrace->first_packets.packets) {
1735                free(libtrace->first_packets.packets);
1736                libtrace->first_packets.packets = NULL;
1737        }
1738        libtrace_change_state(libtrace, STATE_ERROR, false);
1739        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1740        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1741                pthread_join(libtrace->hasher_thread.tid, NULL);
1742                libtrace_zero_thread(&libtrace->hasher_thread);
1743        }
1744
1745        if (libtrace->perpkt_threads) {
1746                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1747                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1748                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1749                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1750                        } else break;
1751                }
1752                free(libtrace->perpkt_threads);
1753                libtrace->perpkt_threads = NULL;
1754        }
1755
1756        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1757                pthread_join(libtrace->reporter_thread.tid, NULL);
1758                libtrace_zero_thread(&libtrace->reporter_thread);
1759        }
1760
1761        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1762                pthread_join(libtrace->keepalive_thread.tid, NULL);
1763                libtrace_zero_thread(&libtrace->keepalive_thread);
1764        }
1765        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1766        libtrace_change_state(libtrace, STATE_NEW, false);
1767        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1768        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1769cleanup_started:
1770        if (trace_supports_parallel(libtrace) &&
1771            !trace_has_dedicated_hasher(libtrace)
1772            && libtrace->perpkt_thread_count > 1) {
1773                if (libtrace->format->ppause_input)
1774                        libtrace->format->ppause_input(libtrace);
1775        } else {
1776                if (libtrace->format->pause_input)
1777                        libtrace->format->pause_input(libtrace);
1778        }
1779        ret = -1;
1780success:
1781        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1782cleanup_none:
1783        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1784        return ret;
1785}
1786
1787DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) {
1788        libtrace->callbacks.message_starting = handler;
1789        return 0;
1790}
1791
1792DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) {
1793        libtrace->callbacks.message_pausing = handler;
1794        return 0;
1795}
1796
1797DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) {
1798        libtrace->callbacks.message_resuming = handler;
1799        return 0;
1800}
1801
1802DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) {
1803        libtrace->callbacks.message_stopping = handler;
1804        return 0;
1805}
1806
1807DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) {
1808        libtrace->callbacks.message_packet = handler;
1809        return 0;
1810}
1811
1812DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) {
1813        libtrace->callbacks.message_first_packet = handler;
1814        return 0;
1815}
1816
1817DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) {
1818        libtrace->callbacks.message_tick_count = handler;
1819        return 0;
1820}
1821
1822DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) {
1823        libtrace->callbacks.message_tick_interval = handler;
1824        return 0;
1825}
1826
1827/*
1828 * Pauses a trace, this should only be called by the main thread
1829 * 1. Set started = false
1830 * 2. All perpkt threads are paused waiting on a condition var
1831 * 3. Then call ppause on the underlying format if found
1832 * 4. The traces state is paused
1833 *
1834 * Once done you should be able to modify the trace setup and call pstart again
1835 * TODO add support to change the number of threads.
1836 */
1837DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1838{
1839        libtrace_thread_t *t;
1840        int i;
1841        assert(libtrace);
1842
1843        t = get_thread_table(libtrace);
1844        // Check state from within the lock if we are going to change it
1845        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1846        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1847                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1848                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1849                return -1;
1850        }
1851
1852        libtrace_change_state(libtrace, STATE_PAUSING, false);
1853        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1854
1855        // Special case handle the hasher thread case
1856        if (trace_has_dedicated_hasher(libtrace)) {
1857                if (libtrace->config.debug_state)
1858                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1859                libtrace_message_t message = {0};
1860                message.code = MESSAGE_DO_PAUSE;
1861                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1862                // Wait for it to pause
1863                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1864                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1865                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1866                }
1867                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1868                if (libtrace->config.debug_state)
1869                        fprintf(stderr, " DONE\n");
1870        }
1871
1872        if (libtrace->config.debug_state)
1873                fprintf(stderr, "Asking perpkt threads to pause ...");
1874        // Stop threads, skip this one if it's a perpkt
1875        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1876                if (&libtrace->perpkt_threads[i] != t) {
1877                        libtrace_message_t message = {0};
1878                        message.code = MESSAGE_DO_PAUSE;
1879                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
1880                        if(trace_has_dedicated_hasher(libtrace)) {
1881                                // The hasher has stopped and other threads have messages waiting therefore
1882                                // If the queues are empty the other threads would have no data
1883                                // So send some message packets to simply ask the threads to check
1884                                // We are the only writer since hasher has paused
1885                                libtrace_packet_t *pkt;
1886                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1887                                pkt->error = READ_MESSAGE;
1888                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1889                        }
1890                } else {
1891                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1892                }
1893        }
1894
1895        if (t) {
1896                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1897                // We rely on the user to *not* return before starting the trace again
1898                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1899        }
1900
1901        // Wait for all threads to pause
1902        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1903        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1904                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1905        }
1906        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1907
1908        if (libtrace->config.debug_state)
1909                fprintf(stderr, " DONE\n");
1910
1911        // Deal with the reporter
1912        if (trace_has_reporter(libtrace)) {
1913                if (libtrace->config.debug_state)
1914                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1915                libtrace_message_t message = {0};
1916                message.code = MESSAGE_DO_PAUSE;
1917                trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
1918                // Wait for it to pause
1919                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1920                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1921                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1922                }
1923                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1924                if (libtrace->config.debug_state)
1925                        fprintf(stderr, " DONE\n");
1926        }
1927
1928        /* Cache values before we pause */
1929        if (libtrace->stats == NULL)
1930                libtrace->stats = trace_create_statistics();
1931        // Save the statistics against the trace
1932        trace_get_statistics(libtrace, NULL);
1933        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
1934                libtrace->started = false;
1935                if (libtrace->format->ppause_input)
1936                        libtrace->format->ppause_input(libtrace);
1937                // TODO What happens if we don't have pause input??
1938        } else {
1939                int err;
1940                err = trace_pause(libtrace);
1941                // We should handle this a bit better
1942                if (err)
1943                        return err;
1944        }
1945
1946        // Only set as paused after the pause has been called on the trace
1947        libtrace_change_state(libtrace, STATE_PAUSED, true);
1948        return 0;
1949}
1950
1951/**
1952 * Stop trace finish prematurely as though it meet an EOF
1953 * This should only be called by the main thread
1954 * 1. Calls ppause
1955 * 2. Sends a message asking for threads to finish
1956 * 3. Releases threads which will pause
1957 */
1958DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1959{
1960        int i, err;
1961        libtrace_message_t message = {0};
1962        assert(libtrace);
1963
1964        // Ensure all threads have paused and the underlying trace format has
1965        // been closed and all packets associated are cleaned up
1966        // Pause will do any state checks for us
1967        err = trace_ppause(libtrace);
1968        if (err)
1969                return err;
1970
1971        // Now send a message asking the threads to stop
1972        // This will be retrieved before trying to read another packet
1973
1974        message.code = MESSAGE_DO_STOP;
1975        trace_message_perpkts(libtrace, &message);
1976        if (trace_has_dedicated_hasher(libtrace))
1977                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1978
1979        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1980                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1981        }
1982
1983        /* Now release the threads and let them stop - when the threads finish
1984         * the state will be set to finished */
1985        libtrace_change_state(libtrace, STATE_FINISHING, true);
1986        return 0;
1987}
1988
1989DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1990        int ret = -1;
1991        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1992                return -1;
1993        }
1994
1995        // Save the requirements
1996        trace->hasher_type = type;
1997        if (hasher) {
1998                trace->hasher = hasher;
1999                trace->hasher_data = data;
2000        } else {
2001                trace->hasher = NULL;
2002                trace->hasher_data = NULL;
2003        }
2004
2005        // Try push this to hardware - NOTE hardware could do custom if
2006        // there is a more efficient way to apply it, in this case
2007        // it will simply grab the function out of libtrace_t
2008        if (trace_supports_parallel(trace) && trace->format->config_input)
2009                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2010
2011        if (ret == -1) {
2012                /* We have to deal with this ourself */
2013                if (!hasher) {
2014                        switch (type)
2015                        {
2016                                case HASHER_CUSTOM:
2017                                case HASHER_BALANCE:
2018                                        return 0;
2019                                case HASHER_BIDIRECTIONAL:
2020                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2021                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2022                                        toeplitz_init_config(trace->hasher_data, 1);
2023                                        return 0;
2024                                case HASHER_UNIDIRECTIONAL:
2025                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2026                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2027                                        toeplitz_init_config(trace->hasher_data, 0);
2028                                        return 0;
2029                        }
2030                        return -1;
2031                }
2032        } else {
2033                /* If the hasher is hardware we zero out the hasher and hasher
2034                 * data fields - only if we need a hasher do we do this */
2035                trace->hasher = NULL;
2036                trace->hasher_data = NULL;
2037        }
2038
2039        return 0;
2040}
2041
2042// Waits for all threads to finish
2043DLLEXPORT void trace_join(libtrace_t *libtrace) {
2044        int i;
2045
2046        /* Firstly wait for the perpkt threads to finish, since these are
2047         * user controlled */
2048        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2049                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2050                // So we must do our best effort to empty the queue - so
2051                // the producer (or any other threads) don't block.
2052                libtrace_packet_t * packet;
2053                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2054                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2055                        if (packet) // This could be NULL iff the perpkt finishes early
2056                                trace_destroy_packet(packet);
2057        }
2058
2059        /* Now the hasher */
2060        if (trace_has_dedicated_hasher(libtrace)) {
2061                pthread_join(libtrace->hasher_thread.tid, NULL);
2062                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2063        }
2064
2065        // Now that everything is finished nothing can be touching our
2066        // buffers so clean them up
2067        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2068                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2069                // if they lost timeslice before-during a write
2070                libtrace_packet_t * packet;
2071                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2072                        trace_destroy_packet(packet);
2073                if (trace_has_dedicated_hasher(libtrace)) {
2074                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2075                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2076                }
2077                // Cannot destroy vector yet, this happens with trace_destroy
2078        }
2079
2080        if (trace_has_reporter(libtrace)) {
2081                pthread_join(libtrace->reporter_thread.tid, NULL);
2082                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2083        }
2084
2085        // Wait for the tick (keepalive) thread if it has been started
2086        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2087                libtrace_message_t msg = {0};
2088                msg.code = MESSAGE_DO_STOP;
2089                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2090                pthread_join(libtrace->keepalive_thread.tid, NULL);
2091        }
2092
2093        libtrace_change_state(libtrace, STATE_JOINED, true);
2094        print_memory_stats();
2095}
2096
2097DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2098                                                libtrace_thread_t *t)
2099{
2100        int ret;
2101        if (t == NULL)
2102                t = get_thread_descriptor(libtrace);
2103        if (t == NULL)
2104                return -1;
2105        ret = libtrace_message_queue_count(&t->messages);
2106        return ret < 0 ? 0 : ret;
2107}
2108
2109DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2110                                          libtrace_thread_t *t,
2111                                          libtrace_message_t * message)
2112{
2113        int ret;
2114        if (t == NULL)
2115                t = get_thread_descriptor(libtrace);
2116        if (t == NULL)
2117                return -1;
2118        ret = libtrace_message_queue_get(&t->messages, message);
2119        return ret < 0 ? 0 : ret;
2120}
2121
2122DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2123                                              libtrace_thread_t *t,
2124                                              libtrace_message_t * message)
2125{
2126        if (t == NULL)
2127                t = get_thread_descriptor(libtrace);
2128        if (t == NULL)
2129                return -1;
2130        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2131                return 0;
2132        else
2133                return -1;
2134}
2135
2136DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2137{
2138        int ret;
2139        if (!message->sender)
2140                message->sender = get_thread_descriptor(libtrace);
2141
2142        ret = libtrace_message_queue_put(&t->messages, message);
2143        return ret < 0 ? 0 : ret;
2144}
2145
2146DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2147{
2148        if (!trace_has_reporter(libtrace) ||
2149            !(libtrace->reporter_thread.state == THREAD_RUNNING
2150              || libtrace->reporter_thread.state == THREAD_PAUSED))
2151                return -1;
2152
2153        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2154}
2155
2156DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2157{
2158        libtrace_message_t message = {0};
2159        message.code = MESSAGE_POST_REPORTER;
2160        return trace_message_reporter(libtrace, (void *) &message);
2161}
2162
2163DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2164{
2165        int i;
2166        int missed = 0;
2167        if (message->sender == NULL)
2168                message->sender = get_thread_descriptor(libtrace);
2169        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2170                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2171                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2172                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2173                } else {
2174                        missed += 1;
2175                }
2176        }
2177        return -missed;
2178}
2179
2180DLLEXPORT void * trace_get_local(libtrace_t *trace)
2181{
2182        return trace->global_blob;
2183}
2184
2185DLLEXPORT void * trace_set_local(libtrace_t *trace, void * data)
2186{
2187        void *ret;
2188        pthread_mutex_lock(&trace->libtrace_lock);
2189        ret = trace->global_blob;
2190        trace->global_blob = data;
2191        pthread_mutex_unlock(&trace->libtrace_lock);
2192        return ret;
2193}
2194
2195DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2196{
2197        return t->user_data;
2198}
2199
2200DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2201{
2202        void *ret = t->user_data;
2203        t->user_data = data;
2204        return ret;
2205}
2206
2207/**
2208 * Publishes a result to the reduce queue
2209 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2210 */
2211DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2212        libtrace_result_t res;
2213        res.type = type;
2214        res.key = key;
2215        res.value = value;
2216        assert(libtrace->combiner.publish);
2217        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2218        return;
2219}
2220
2221DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2222        if (combiner) {
2223                trace->combiner = *combiner;
2224                trace->combiner.configuration = config;
2225        } else {
2226                // No combiner, so don't try use it
2227                memset(&trace->combiner, 0, sizeof(trace->combiner));
2228        }
2229}
2230
2231DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2232        return packet->order;
2233}
2234
2235DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2236        return packet->hash;
2237}
2238
2239DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2240        packet->order = order;
2241}
2242
2243DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2244        packet->hash = hash;
2245}
2246
2247DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2248        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2249}
2250
2251/**
2252 * @return True if the trace is not running such that it can be configured
2253 */
2254static inline bool trace_is_configurable(libtrace_t *trace) {
2255        return trace->state == STATE_NEW ||
2256                        trace->state == STATE_PAUSED;
2257}
2258
2259DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2260        if (!trace_is_configurable(trace)) return -1;
2261
2262        /* TODO consider allowing an offset from the total number of cores i.e.
2263         * -1 reserve 1 core */
2264        if (nb >= 0) {
2265                trace->config.perpkt_threads = nb;
2266                return 0;
2267        } else {
2268                return -1;
2269        }
2270}
2271
2272DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2273        if (!trace_is_configurable(trace)) return -1;
2274
2275        trace->config.tick_interval = millisec;
2276        return 0;
2277}
2278
2279DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2280        if (!trace_is_configurable(trace)) return -1;
2281
2282        trace->config.tick_count = count;
2283        return 0;
2284}
2285
2286DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2287        if (!trace_is_configurable(trace)) return -1;
2288
2289        trace->tracetime = tracetime;
2290        return 0;
2291}
2292
2293DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2294        if (!trace_is_configurable(trace)) return -1;
2295
2296        trace->config.cache_size = size;
2297        return 0;
2298}
2299
2300DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2301        if (!trace_is_configurable(trace)) return -1;
2302
2303        trace->config.thread_cache_size = size;
2304        return 0;
2305}
2306
2307DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2308        if (!trace_is_configurable(trace)) return -1;
2309
2310        trace->config.fixed_count = fixed;
2311        return 0;
2312}
2313
2314DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2315        if (!trace_is_configurable(trace)) return -1;
2316
2317        trace->config.burst_size = size;
2318        return 0;
2319}
2320
2321DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2322        if (!trace_is_configurable(trace)) return -1;
2323
2324        trace->config.hasher_queue_size = size;
2325        return 0;
2326}
2327
2328DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2329        if (!trace_is_configurable(trace)) return -1;
2330
2331        trace->config.hasher_polling = polling;
2332        return 0;
2333}
2334
2335DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2336        if (!trace_is_configurable(trace)) return -1;
2337
2338        trace->config.reporter_polling = polling;
2339        return 0;
2340}
2341
2342DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2343        if (!trace_is_configurable(trace)) return -1;
2344
2345        trace->config.reporter_thold = thold;
2346        return 0;
2347}
2348
2349DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2350        if (!trace_is_configurable(trace)) return -1;
2351
2352        trace->config.debug_state = debug_state;
2353        return 0;
2354}
2355
2356
2357
2358static bool config_bool_parse(char *value, size_t nvalue) {
2359        if (strncmp(value, "true", nvalue) == 0)
2360                return true;
2361        else if (strncmp(value, "false", nvalue) == 0)
2362                return false;
2363        else
2364                return strtoll(value, NULL, 10) != 0;
2365}
2366
2367/* Note update documentation on trace_set_configuration */
2368static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2369        assert(key);
2370        assert(value);
2371        assert(uc);
2372        if (strncmp(key, "cache_size", nkey) == 0
2373            || strncmp(key, "cs", nkey) == 0) {
2374                uc->cache_size = strtoll(value, NULL, 10);
2375        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2376                   || strncmp(key, "tcs", nkey) == 0) {
2377                uc->thread_cache_size = strtoll(value, NULL, 10);
2378        } else if (strncmp(key, "fixed_count", nkey) == 0
2379                   || strncmp(key, "fc", nkey) == 0) {
2380                uc->fixed_count = config_bool_parse(value, nvalue);
2381        } else if (strncmp(key, "burst_size", nkey) == 0
2382                   || strncmp(key, "bs", nkey) == 0) {
2383                uc->burst_size = strtoll(value, NULL, 10);
2384        } else if (strncmp(key, "tick_interval", nkey) == 0
2385                   || strncmp(key, "ti", nkey) == 0) {
2386                uc->tick_interval = strtoll(value, NULL, 10);
2387        } else if (strncmp(key, "tick_count", nkey) == 0
2388                   || strncmp(key, "tc", nkey) == 0) {
2389                uc->tick_count = strtoll(value, NULL, 10);
2390        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2391                   || strncmp(key, "pt", nkey) == 0) {
2392                uc->perpkt_threads = strtoll(value, NULL, 10);
2393        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2394                   || strncmp(key, "hqs", nkey) == 0) {
2395                uc->hasher_queue_size = strtoll(value, NULL, 10);
2396        } else if (strncmp(key, "hasher_polling", nkey) == 0
2397                   || strncmp(key, "hp", nkey) == 0) {
2398                uc->hasher_polling = config_bool_parse(value, nvalue);
2399        } else if (strncmp(key, "reporter_polling", nkey) == 0
2400                   || strncmp(key, "rp", nkey) == 0) {
2401                uc->reporter_polling = config_bool_parse(value, nvalue);
2402        } else if (strncmp(key, "reporter_thold", nkey) == 0
2403                   || strncmp(key, "rt", nkey) == 0) {
2404                uc->reporter_thold = strtoll(value, NULL, 10);
2405        } else if (strncmp(key, "debug_state", nkey) == 0
2406                   || strncmp(key, "ds", nkey) == 0) {
2407                uc->debug_state = config_bool_parse(value, nvalue);
2408        } else {
2409                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2410        }
2411}
2412
2413DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2414        char *pch;
2415        char key[100];
2416        char value[100];
2417        char *dup;
2418        assert(str);
2419        assert(trace);
2420
2421        if (!trace_is_configurable(trace)) return -1;
2422
2423        dup = strdup(str);
2424        pch = strtok (dup," ,.-");
2425        while (pch != NULL)
2426        {
2427                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2428                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2429                } else {
2430                        fprintf(stderr, "Error parsing option %s\n", pch);
2431                }
2432                pch = strtok (NULL," ,.-");
2433        }
2434        free(dup);
2435
2436        return 0;
2437}
2438
2439DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2440        char line[1024];
2441        if (!trace_is_configurable(trace)) return -1;
2442
2443        while (fgets(line, sizeof(line), file) != NULL)
2444        {
2445                trace_set_configuration(trace, line);
2446        }
2447
2448        if(ferror(file))
2449                return -1;
2450        else
2451                return 0;
2452}
2453
2454DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2455        assert(packet);
2456        /* Always release any resources this might be holding */
2457        trace_fin_packet(packet);
2458        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2459}
2460
2461DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2462        if (libtrace->format)
2463                return &libtrace->format->info;
2464        else
2465                return NULL;
2466}
Note: See TracBrowser for help on using the repository browser.