source: lib/trace_parallel.c @ b148e3b

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since b148e3b was b148e3b, checked in by Richard Sanger <rsanger@…>, 4 years ago

Updates DPDK to latest release and improves performance

Thanks to Richard Cziva for supplying an intial patch for this.

We now recommend using the latest release of DPDK, ideally 16.04 or newer

To support newer releases

  • Fixes RSS hashing renames
  • Fixes deprecated rte_mempool_count
  • Fixes ETH_LINK_SPEED_X rename
  • Fixes TX minimum memory requirement
  • Fixes dropped vs errored counting in recent versions (for best results use 16.04 or newer)

Tuned to allow DPDK's SSE vector in supporting drivers mode for better performance.
Bumps default internal batch size up to 32 to matches DPDK in SSE vector mode.

  • Property mode set to 100644
File size: 82.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        }
245
246        if (fn)
247                (*fn)(trace, thread, trace->global_blob, thread->user_data);
248}
249
250DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
251        free(cbset);
252}
253
254DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
255        libtrace_callback_set_t *cbset;
256
257        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
258        memset(cbset, 0, sizeof(libtrace_callback_set_t));
259        return cbset;
260}
261
262/*
263 * This can be used once the hasher thread has been started and internally after
264 * verify_configuration.
265 */
266DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
267{
268        return libtrace->hasher_thread.type == THREAD_HASHER;
269}
270
271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
272{
273        assert(libtrace->state != STATE_NEW);
274        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
275}
276
277/**
278 * When running the number of perpkt threads in use.
279 * TODO what if the trace is not running yet, or has finished??
280 *
281 * @brief libtrace_perpkt_thread_nb
282 * @param t The trace
283 * @return
284 */
285DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
286        return t->perpkt_thread_count;
287}
288
289/**
290 * Changes the overall traces state and signals the condition.
291 *
292 * @param trace A pointer to the trace
293 * @param new_state The new state of the trace
294 * @param need_lock Set to true if libtrace_lock is not held, otherwise
295 *        false in the case the lock is currently held by this thread.
296 */
297static inline void libtrace_change_state(libtrace_t *trace,
298        const enum trace_state new_state, const bool need_lock)
299{
300        UNUSED enum trace_state prev_state;
301        if (need_lock)
302                pthread_mutex_lock(&trace->libtrace_lock);
303        prev_state = trace->state;
304        trace->state = new_state;
305
306        if (trace->config.debug_state)
307                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
308                        trace->uridata, get_trace_state_name(prev_state),
309                        get_trace_state_name(trace->state));
310
311        pthread_cond_broadcast(&trace->perpkt_cond);
312        if (need_lock)
313                pthread_mutex_unlock(&trace->libtrace_lock);
314}
315
316/**
317 * Changes a thread's state and broadcasts the condition variable. This
318 * should always be done when the lock is held.
319 *
320 * Additionally for perpkt threads the state counts are updated.
321 *
322 * @param trace A pointer to the trace
323 * @param t A pointer to the thread to modify
324 * @param new_state The new state of the thread
325 * @param need_lock Set to true if libtrace_lock is not held, otherwise
326 *        false in the case the lock is currently held by this thread.
327 */
328static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
329        const enum thread_states new_state, const bool need_lock)
330{
331        enum thread_states prev_state;
332        if (need_lock)
333                pthread_mutex_lock(&trace->libtrace_lock);
334        prev_state = t->state;
335        t->state = new_state;
336        if (t->type == THREAD_PERPKT) {
337                --trace->perpkt_thread_states[prev_state];
338                ++trace->perpkt_thread_states[new_state];
339        }
340
341        if (trace->config.debug_state)
342                fprintf(stderr, "Thread %d state changed from %d to %d\n",
343                        (int) t->tid, prev_state, t->state);
344
345        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
346                libtrace_change_state(trace, STATE_FINISHED, false);
347
348        pthread_cond_broadcast(&trace->perpkt_cond);
349        if (need_lock)
350                pthread_mutex_unlock(&trace->libtrace_lock);
351}
352
353/**
354 * This is valid once a trace is initialised
355 *
356 * @return True if the format supports parallel threads.
357 */
358static inline bool trace_supports_parallel(libtrace_t *trace)
359{
360        assert(trace);
361        assert(trace->format);
362        if (trace->format->pstart_input)
363                return true;
364        else
365                return false;
366}
367
368void libtrace_zero_thread(libtrace_thread_t * t) {
369        t->accepted_packets = 0;
370        t->filtered_packets = 0;
371        t->recorded_first = false;
372        t->tracetime_offset_usec = 0;
373        t->user_data = 0;
374        t->format_data = 0;
375        libtrace_zero_ringbuffer(&t->rbuffer);
376        t->trace = NULL;
377        t->ret = NULL;
378        t->type = THREAD_EMPTY;
379        t->perpkt_num = -1;
380}
381
382// Ints are aligned int is atomic so safe to read and write at same time
383// However write must be locked, read doesn't (We never try read before written to table)
384libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
385        int i = 0;
386        pthread_t tid = pthread_self();
387
388        for (;i<libtrace->perpkt_thread_count ;++i) {
389                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
390                        return &libtrace->perpkt_threads[i];
391        }
392        return NULL;
393}
394
395static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
396        libtrace_thread_t *ret;
397        if (!(ret = get_thread_table(libtrace))) {
398                pthread_t tid = pthread_self();
399                // Check if we are reporter or something else
400                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
401                                pthread_equal(tid, libtrace->reporter_thread.tid))
402                        ret = &libtrace->reporter_thread;
403                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
404                         pthread_equal(tid, libtrace->hasher_thread.tid))
405                        ret = &libtrace->hasher_thread;
406                else
407                        ret = NULL;
408        }
409        return ret;
410}
411
412DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
413        // Duplicate the packet in standard malloc'd memory and free the
414        // original, This is a 1:1 exchange so the ocache count remains unchanged.
415        if (pkt->buf_control != TRACE_CTRL_PACKET) {
416                libtrace_packet_t *dup;
417                dup = trace_copy_packet(pkt);
418                /* Release the external buffer */
419                trace_fin_packet(pkt);
420                /* Copy the duplicated packet over the existing */
421                memcpy(pkt, dup, sizeof(libtrace_packet_t));
422                /* Free the packet structure */
423                free(dup);
424        }
425}
426
427/**
428 * Makes a libtrace_result_t safe, used when pausing a trace.
429 * This will call libtrace_make_packet_safe if the result is
430 * a packet.
431 */
432DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
433        if (res->type == RESULT_PACKET) {
434                libtrace_make_packet_safe(res->value.pkt);
435        }
436}
437
438/**
439 * Holds threads in a paused state, until released by broadcasting
440 * the condition mutex.
441 */
442static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
443        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
444        thread_change_state(trace, t, THREAD_PAUSED, false);
445        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
446                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
447        }
448        thread_change_state(trace, t, THREAD_RUNNING, false);
449        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
450}
451
452/**
453 * Sends a packet to the user, expects either a valid packet or a TICK packet.
454 *
455 * @param trace The trace
456 * @param t The current thread
457 * @param packet A pointer to the packet storage, which may be set to null upon
458 *               return, or a packet to be finished.
459 * @param tracetime If true packets are delayed to match with tracetime
460 * @return 0 is successful, otherwise if playing back in tracetime
461 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
462 *
463 * @note READ_MESSAGE will only be returned if tracetime is true.
464 */
465static inline int dispatch_packet(libtrace_t *trace,
466                                  libtrace_thread_t *t,
467                                  libtrace_packet_t **packet,
468                                  bool tracetime) {
469
470        if ((*packet)->error > 0) {
471                if (tracetime) {
472                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
473                                return READ_MESSAGE;
474                }
475                t->accepted_packets++;
476                if (trace->perpkt_cbs->message_packet)
477                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
478                trace_fin_packet(*packet);
479        } else {
480                assert((*packet)->error == READ_TICK);
481                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
482                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
483        }
484        return 0;
485}
486
487/**
488 * Sends a batch of packets to the user, expects either a valid packet or a
489 * TICK packet.
490 *
491 * @param trace The trace
492 * @param t The current thread
493 * @param packets [in,out] An array of packets, these may be null upon return
494 * @param nb_packets The total number of packets in the list
495 * @param empty [in,out] A pointer to an integer storing the first empty slot,
496 * upon return this is updated
497 * @param offset [in,out] The offset into the array, upon return this is updated
498 * @param tracetime If true packets are delayed to match with tracetime
499 * @return 0 is successful, otherwise if playing back in tracetime
500 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
501 *
502 * @note READ_MESSAGE will only be returned if tracetime is true.
503 */
504static inline int dispatch_packets(libtrace_t *trace,
505                                  libtrace_thread_t *t,
506                                  libtrace_packet_t *packets[],
507                                  int nb_packets, int *empty, int *offset,
508                                  bool tracetime) {
509        for (;*offset < nb_packets; ++*offset) {
510                int ret;
511                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
512                if (ret == 0) {
513                        /* Move full slots to front as we go */
514                        if (packets[*offset]) {
515                                if (*empty != *offset) {
516                                        packets[*empty] = packets[*offset];
517                                        packets[*offset] = NULL;
518                                }
519                                ++*empty;
520                        }
521                } else {
522                        /* Break early */
523                        assert(ret == READ_MESSAGE);
524                        return READ_MESSAGE;
525                }
526        }
527
528        return 0;
529}
530
531/**
532 * Pauses a per packet thread, messages will not be processed when the thread
533 * is paused.
534 *
535 * This process involves reading packets if a hasher thread is used. As such
536 * this function can fail to pause due to errors when reading in which case
537 * the thread should be stopped instead.
538 *
539 *
540 * @brief trace_perpkt_thread_pause
541 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
542 */
543static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
544                                     libtrace_packet_t *packets[],
545                                     int nb_packets, int *empty, int *offset) {
546        libtrace_packet_t * packet = NULL;
547
548        /* Let the user thread know we are going to pause */
549        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
550
551        /* Send through any remaining packets (or messages) without delay */
552
553        /* First send those packets already read, as fast as possible
554         * This should never fail or check for messages etc. */
555        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
556                                    offset, false), == 0);
557
558        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
559        /* If a hasher thread is running, empty input queues so we don't lose data */
560        if (trace_has_dedicated_hasher(trace)) {
561                // The hasher has stopped by this point, so the queue shouldn't be filling
562                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
563                        int ret = trace->pread(trace, t, &packet, 1);
564                        if (ret == 1) {
565                                if (packet->error > 0) {
566                                        store_first_packet(trace, packet, t);
567                                }
568                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
569                                if (packet == NULL)
570                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
571                        } else if (ret != READ_MESSAGE) {
572                                /* Ignore messages we pick these up next loop */
573                                assert (ret == READ_EOF || ret == READ_ERROR);
574                                /* Verify no packets are remaining */
575                                /* TODO refactor this sanity check out!! */
576                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
577                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
578                                        // No packets after this should have any data in them
579                                        assert(packet->error <= 0);
580                                }
581                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
582                                return -1;
583                        }
584                }
585        }
586        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
587
588        /* Now we do the actual pause, this returns when we resumed */
589        trace_thread_pause(trace, t);
590        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
591        return 1;
592}
593
594/**
595 * The is the entry point for our packet processing threads.
596 */
597static void* perpkt_threads_entry(void *data) {
598        libtrace_t *trace = (libtrace_t *)data;
599        libtrace_thread_t *t;
600        libtrace_message_t message = {0, {.uint64=0}, NULL};
601        libtrace_packet_t *packets[trace->config.burst_size];
602        size_t i;
603        //int ret;
604        /* The current reading position into the packets */
605        int offset = 0;
606        /* The number of packets last read */
607        int nb_packets = 0;
608        /* The offset to the first NULL packet upto offset */
609        int empty = 0;
610
611        /* Wait until trace_pstart has been completed */
612        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
613        t = get_thread_table(trace);
614        assert(t);
615        if (trace->state == STATE_ERROR) {
616                thread_change_state(trace, t, THREAD_FINISHED, false);
617                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
618                pthread_exit(NULL);
619        }
620        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
621
622        if (trace->format->pregister_thread) {
623                if (trace->format->pregister_thread(trace, t, 
624                                trace_is_parallel(trace)) < 0) {
625                        thread_change_state(trace, t, THREAD_FINISHED, false);
626                        pthread_exit(NULL);
627                }
628        }
629
630        /* Fill our buffer with empty packets */
631        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
632        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
633                              trace->config.burst_size,
634                              trace->config.burst_size);
635
636        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
637
638        /* Let the per_packet function know we have started */
639        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
640        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
641
642        for (;;) {
643
644                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
645                        int ret;
646                        switch (message.code) {
647                                case MESSAGE_DO_PAUSE: // This is internal
648                                        ret = trace_perpkt_thread_pause(trace, t,
649                                              packets, nb_packets, &empty, &offset);
650                                        if (ret == READ_EOF) {
651                                                goto eof;
652                                        } else if (ret == READ_ERROR) {
653                                                goto error;
654                                        }
655                                        assert(ret == 1);
656                                        continue;
657                                case MESSAGE_DO_STOP: // This is internal
658                                        goto eof;
659                        }
660                        send_message(trace, t, message.code, message.data, 
661                                        message.sender);
662                        /* Continue and the empty messages out before packets */
663                        continue;
664                }
665
666
667                /* Do we need to read a new set of packets MOST LIKELY we do */
668                if (offset == nb_packets) {
669                        /* Refill the packet buffer */
670                        if (empty != nb_packets) {
671                                // Refill the empty packets
672                                libtrace_ocache_alloc(&trace->packet_freelist,
673                                                      (void **) &packets[empty],
674                                                      nb_packets - empty,
675                                                      nb_packets - empty);
676                        }
677                        if (!trace->pread) {
678                                assert(packets[0]);
679                                nb_packets = trace_read_packet(trace, packets[0]);
680                                packets[0]->error = nb_packets;
681                                if (nb_packets > 0)
682                                        nb_packets = 1;
683                        } else {
684                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
685                        }
686                        offset = 0;
687                        empty = 0;
688                }
689
690                /* Handle error/message cases */
691                if (nb_packets > 0) {
692                        /* Store the first packet */
693                        if (packets[0]->error > 0) {
694                                store_first_packet(trace, packets[0], t);
695                        }
696                        dispatch_packets(trace, t, packets, nb_packets, &empty,
697                                         &offset, trace->tracetime);
698                } else {
699                        switch (nb_packets) {
700                        case READ_EOF:
701                                goto eof;
702                        case READ_ERROR:
703                                goto error;
704                        case READ_MESSAGE:
705                                nb_packets = 0;
706                                continue;
707                        default:
708                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
709                                goto error;
710                        }
711                }
712
713        }
714
715error:
716        message.code = MESSAGE_DO_STOP;
717        message.sender = t;
718        message.data.uint64 = 0;
719        trace_message_perpkts(trace, &message);
720eof:
721        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
722
723        // Let the per_packet function know we have stopped
724        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
725        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
726
727        // Free any remaining packets
728        for (i = 0; i < trace->config.burst_size; i++) {
729                if (packets[i]) {
730                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
731                        packets[i] = NULL;
732                }
733        }
734
735        thread_change_state(trace, t, THREAD_FINISHED, true);
736
737        /* Make sure the reporter sees we have finished */
738        if (trace_has_reporter(trace))
739                trace_post_reporter(trace);
740
741        // Release all ocache memory before unregistering with the format
742        // because this might(it does in DPDK) unlink the formats mempool
743        // causing destroy/finish packet to fail.
744        libtrace_ocache_unregister_thread(&trace->packet_freelist);
745        if (trace->format->punregister_thread) {
746                trace->format->punregister_thread(trace, t);
747        }
748        print_memory_stats();
749
750        pthread_exit(NULL);
751}
752
753/**
754 * The start point for our single threaded hasher thread, this will read
755 * and hash a packet from a data source and queue it against the correct
756 * core to process it.
757 */
758static void* hasher_entry(void *data) {
759        libtrace_t *trace = (libtrace_t *)data;
760        libtrace_thread_t * t;
761        int i;
762        libtrace_packet_t * packet;
763        libtrace_message_t message = {0, {.uint64=0}, NULL};
764        int pkt_skipped = 0;
765
766        assert(trace_has_dedicated_hasher(trace));
767        /* Wait until all threads are started and objects are initialised (ring buffers) */
768        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
769        t = &trace->hasher_thread;
770        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
771        if (trace->state == STATE_ERROR) {
772                thread_change_state(trace, t, THREAD_FINISHED, false);
773                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
774                pthread_exit(NULL);
775        }
776        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
777
778        /* We are reading but it is not the parallel API */
779        if (trace->format->pregister_thread) {
780                trace->format->pregister_thread(trace, t, true);
781        }
782
783        /* Read all packets in then hash and queue against the correct thread */
784        while (1) {
785                int thread;
786                if (!pkt_skipped)
787                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
788                assert(packet);
789
790                if (libtrace_halt) {
791                        packet->error = 0;
792                        break;
793                }
794
795                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
796                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
797                        switch(message.code) {
798                                case MESSAGE_DO_PAUSE:
799                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
800                                        thread_change_state(trace, t, THREAD_PAUSED, false);
801                                        pthread_cond_broadcast(&trace->perpkt_cond);
802                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
803                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
804                                        }
805                                        thread_change_state(trace, t, THREAD_RUNNING, false);
806                                        pthread_cond_broadcast(&trace->perpkt_cond);
807                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
808                                        break;
809                                case MESSAGE_DO_STOP:
810                                        assert(trace->started == false);
811                                        assert(trace->state == STATE_FINISHED);
812                                        /* Mark the current packet as EOF */
813                                        packet->error = 0;
814                                        break;
815                                default:
816                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
817                        }
818                        pkt_skipped = 1;
819                        continue;
820                }
821
822                if ((packet->error = trace_read_packet(trace, packet)) <1) {
823                        break; /* We are EOF or error'd either way we stop  */
824                }
825
826                /* We are guaranteed to have a hash function i.e. != NULL */
827                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
828                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
829                /* Blocking write to the correct queue - I'm the only writer */
830                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
831                        uint64_t order = trace_packet_get_order(packet);
832                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
833                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
834                                // Write ticks to everyone else
835                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
836                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
837                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
838                                for (i = 0; i < trace->perpkt_thread_count; i++) {
839                                        pkts[i]->error = READ_TICK;
840                                        trace_packet_set_order(pkts[i], order);
841                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
842                                }
843                        }
844                        pkt_skipped = 0;
845                } else {
846                        assert(!"Dropping a packet!!");
847                        pkt_skipped = 1; // Reuse that packet no one read it
848                }
849        }
850
851        /* Broadcast our last failed read to all threads */
852        for (i = 0; i < trace->perpkt_thread_count; i++) {
853                libtrace_packet_t * bcast;
854                if (i == trace->perpkt_thread_count - 1) {
855                        bcast = packet;
856                } else {
857                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
858                        bcast->error = packet->error;
859                }
860                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
861                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
862                        // Unlock early otherwise we could deadlock
863                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
864                }
865                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
866        }
867
868        // We don't need to free the packet
869        thread_change_state(trace, t, THREAD_FINISHED, true);
870
871        libtrace_ocache_unregister_thread(&trace->packet_freelist);
872        if (trace->format->punregister_thread) {
873                trace->format->punregister_thread(trace, t);
874        }
875        print_memory_stats();
876
877        // TODO remove from TTABLE t sometime
878        pthread_exit(NULL);
879}
880
881/* Our simplest case when a thread becomes ready it can obtain an exclusive
882 * lock to read packets from the underlying trace.
883 */
884static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
885                                                    libtrace_thread_t *t,
886                                                    libtrace_packet_t *packets[],
887                                                    size_t nb_packets) {
888        size_t i = 0;
889        //bool tick_hit = false;
890
891        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
892        /* Read nb_packets */
893        for (i = 0; i < nb_packets; ++i) {
894                if (libtrace_halt) {
895                        break;
896                }
897                packets[i]->error = trace_read_packet(libtrace, packets[i]);
898
899                if (packets[i]->error <= 0) {
900                        /* We'll catch this next time if we have already got packets */
901                        if ( i==0 ) {
902                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
903                                return packets[i]->error;
904                        } else {
905                                break;
906                        }
907                }
908                /*
909                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
910                        tick_hit = true;
911                }*/
912        }
913        // Doing this inside the lock ensures the first packet is always
914        // recorded first
915        if (packets[0]->error > 0) {
916                store_first_packet(libtrace, packets[0], t);
917        }
918        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
919        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
920        if (tick_hit) {
921                libtrace_message_t tick;
922                tick.additional.uint64 = trace_packet_get_order(packets[i]);
923                tick.code = MESSAGE_TICK;
924                trace_send_message_to_perpkts(libtrace, &tick);
925        } */
926        return i;
927}
928
929/**
930 * For the case that we have a dedicated hasher thread
931 * 1. We read a packet from our buffer
932 * 2. Move that into the packet provided (packet)
933 */
934inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
935                                                   libtrace_thread_t *t,
936                                                   libtrace_packet_t *packets[],
937                                                   size_t nb_packets) {
938        size_t i;
939
940        /* We store the last error message here */
941        if (t->format_data) {
942                return ((libtrace_packet_t *)t->format_data)->error;
943        }
944
945        // Always grab at least one
946        if (packets[0]) // Recycle the old get the new
947                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
948        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
949
950        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
951                return packets[0]->error;
952        }
953
954        for (i = 1; i < nb_packets; i++) {
955                if (packets[i]) // Recycle the old get the new
956                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
957                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
958                        packets[i] = NULL;
959                        break;
960                }
961
962                /* We will return an error or EOF the next time around */
963                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
964                        /* The message case will be checked automatically -
965                           However other cases like EOF and error will only be
966                           sent once*/
967                        if (packets[i]->error != READ_MESSAGE) {
968                                assert(t->format_data == NULL);
969                                t->format_data = packets[i];
970                        }
971                        break;
972                }
973        }
974
975        return i;
976}
977
978/**
979 * For the first packet of each queue we keep a copy and note the system
980 * time it was received at.
981 *
982 * This is used for finding the first packet when playing back a trace
983 * in trace time. And can be used by real time applications to print
984 * results out every XXX seconds.
985 */
986void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
987{
988        if (!t->recorded_first) {
989                libtrace_message_t mesg = {0, {.uint64=0}, NULL};
990                struct timeval tv;
991                libtrace_packet_t * dup;
992
993                /* We mark system time against a copy of the packet */
994                gettimeofday(&tv, NULL);
995                dup = trace_copy_packet(packet);
996
997                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
998                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
999                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1000                libtrace->first_packets.count++;
1001
1002                /* Now update the first */
1003                if (libtrace->first_packets.count == 1) {
1004                        /* We the first entry hence also the first known packet */
1005                        libtrace->first_packets.first = t->perpkt_num;
1006                } else {
1007                        /* Check if we are newer than the previous 'first' packet */
1008                        size_t first = libtrace->first_packets.first;
1009                        if (trace_get_seconds(dup) <
1010                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1011                                libtrace->first_packets.first = t->perpkt_num;
1012                }
1013                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1014
1015                mesg.code = MESSAGE_FIRST_PACKET;
1016                trace_message_reporter(libtrace, &mesg);
1017                trace_message_perpkts(libtrace, &mesg);
1018                t->recorded_first = true;
1019        }
1020}
1021
1022DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1023                                     libtrace_thread_t *t,
1024                                     const libtrace_packet_t **packet,
1025                                     const struct timeval **tv)
1026{
1027        void * tmp;
1028        int ret = 0;
1029
1030        if (t) {
1031                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1032                        return -1;
1033        }
1034
1035        /* Throw away these which we don't use */
1036        if (!packet)
1037                packet = (const libtrace_packet_t **) &tmp;
1038        if (!tv)
1039                tv = (const struct timeval **) &tmp;
1040
1041        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1042        if (t) {
1043                /* Get the requested thread */
1044                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1045                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1046        } else if (libtrace->first_packets.count) {
1047                /* Get the first packet across all threads */
1048                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1049                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1050                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1051                        ret = 1;
1052                } else {
1053                        struct timeval curr_tv;
1054                        // If a second has passed since the first entry we will assume this is the very first packet
1055                        gettimeofday(&curr_tv, NULL);
1056                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1057                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1058                                        ret = 1;
1059                                }
1060                        }
1061                }
1062        } else {
1063                *packet = NULL;
1064                *tv = NULL;
1065        }
1066        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1067        return ret;
1068}
1069
1070
1071DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1072{
1073        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1074}
1075
1076inline static struct timeval usec_to_tv(uint64_t usec)
1077{
1078        struct timeval tv;
1079        tv.tv_sec = usec / 1000000;
1080        tv.tv_usec = usec % 1000000;
1081        return tv;
1082}
1083
1084/** Similar to delay_tracetime but send messages to all threads periodically */
1085static void* reporter_entry(void *data) {
1086        libtrace_message_t message = {0, {.uint64=0}, NULL};
1087        libtrace_t *trace = (libtrace_t *)data;
1088        libtrace_thread_t *t = &trace->reporter_thread;
1089
1090        /* Wait until all threads are started */
1091        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1092        if (trace->state == STATE_ERROR) {
1093                thread_change_state(trace, t, THREAD_FINISHED, false);
1094                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1095                pthread_exit(NULL);
1096        }
1097        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1098
1099        if (trace->format->pregister_thread) {
1100                trace->format->pregister_thread(trace, t, false);
1101        }
1102
1103        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1104        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1105
1106        while (!trace_has_finished(trace)) {
1107                if (trace->config.reporter_polling) {
1108                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1109                                message.code = MESSAGE_POST_REPORTER;
1110                } else {
1111                        libtrace_message_queue_get(&t->messages, &message);
1112                }
1113                switch (message.code) {
1114                        // Check for results
1115                        case MESSAGE_POST_REPORTER:
1116                                trace->combiner.read(trace, &trace->combiner);
1117                                break;
1118                        case MESSAGE_DO_PAUSE:
1119                                assert(trace->combiner.pause);
1120                                trace->combiner.pause(trace, &trace->combiner);
1121                                send_message(trace, t, MESSAGE_PAUSING,
1122                                                (libtrace_generic_t) {0}, t);
1123                                trace_thread_pause(trace, t);
1124                                send_message(trace, t, MESSAGE_RESUMING,
1125                                                (libtrace_generic_t) {0}, t);
1126                                break;
1127                default:
1128                        send_message(trace, t, message.code, message.data,
1129                                        message.sender);
1130                }
1131        }
1132
1133        // Flush out whats left now all our threads have finished
1134        trace->combiner.read_final(trace, &trace->combiner);
1135
1136        // GOODBYE
1137        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1138        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1139
1140        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1141        print_memory_stats();
1142        return NULL;
1143}
1144
1145/** Similar to delay_tracetime but send messages to all threads periodically */
1146static void* keepalive_entry(void *data) {
1147        struct timeval prev, next;
1148        libtrace_message_t message = {0, {.uint64=0}, NULL};
1149        libtrace_t *trace = (libtrace_t *)data;
1150        uint64_t next_release;
1151        libtrace_thread_t *t = &trace->keepalive_thread;
1152
1153        /* Wait until all threads are started */
1154        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1155        if (trace->state == STATE_ERROR) {
1156                thread_change_state(trace, t, THREAD_FINISHED, false);
1157                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1158                pthread_exit(NULL);
1159        }
1160        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1161
1162        gettimeofday(&prev, NULL);
1163        message.code = MESSAGE_TICK_INTERVAL;
1164
1165        while (trace->state != STATE_FINISHED) {
1166                fd_set rfds;
1167                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1168                gettimeofday(&next, NULL);
1169                if (next_release > tv_to_usec(&next)) {
1170                        next = usec_to_tv(next_release - tv_to_usec(&next));
1171                        // Wait for timeout or a message
1172                        FD_ZERO(&rfds);
1173                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1174                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1175                                libtrace_message_t msg;
1176                                libtrace_message_queue_get(&t->messages, &msg);
1177                                assert(msg.code == MESSAGE_DO_STOP);
1178                                goto done;
1179                        }
1180                }
1181                prev = usec_to_tv(next_release);
1182                if (trace->state == STATE_RUNNING) {
1183                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1184                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1185                        trace_message_perpkts(trace, &message);
1186                }
1187        }
1188done:
1189
1190        thread_change_state(trace, t, THREAD_FINISHED, true);
1191        return NULL;
1192}
1193
1194/**
1195 * Delays a packets playback so the playback will be in trace time.
1196 * This may break early if a message becomes available.
1197 *
1198 * Requires the first packet for this thread to be received.
1199 * @param libtrace  The trace
1200 * @param packet    The packet to delay
1201 * @param t         The current thread
1202 * @return Either READ_MESSAGE(-2) or 0 is successful
1203 */
1204static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1205        struct timeval curr_tv, pkt_tv;
1206        uint64_t next_release = t->tracetime_offset_usec;
1207        uint64_t curr_usec;
1208
1209        if (!t->tracetime_offset_usec) {
1210                const libtrace_packet_t *first_pkt;
1211                const struct timeval *sys_tv;
1212                int64_t initial_offset;
1213                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1214                assert(first_pkt);
1215                pkt_tv = trace_get_timeval(first_pkt);
1216                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1217                /* In the unlikely case offset is 0, change it to 1 */
1218                if (stable)
1219                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1220                next_release = initial_offset;
1221        }
1222        /* next_release == offset */
1223        pkt_tv = trace_get_timeval(packet);
1224        next_release += tv_to_usec(&pkt_tv);
1225        gettimeofday(&curr_tv, NULL);
1226        curr_usec = tv_to_usec(&curr_tv);
1227        if (next_release > curr_usec) {
1228                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1229                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1230                fd_set rfds;
1231                FD_ZERO(&rfds);
1232                FD_SET(mesg_fd, &rfds);
1233                // We need to wait
1234                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1235                if (ret == 0) {
1236                        return 0;
1237                } else if (ret > 0) {
1238                        return READ_MESSAGE;
1239                } else {
1240                        assert(!"trace_delay_packet: Unexpected return from select");
1241                }
1242        }
1243        return 0;
1244}
1245
1246/* Discards packets that don't match the filter.
1247 * Discarded packets are emptied and then moved to the end of the packet list.
1248 *
1249 * @param trace       The trace format, containing the filter
1250 * @param packets     An array of packets
1251 * @param nb_packets  The number of valid items in packets
1252 *
1253 * @return The number of packets that passed the filter, which are moved to
1254 *          the start of the packets array
1255 */
1256static inline size_t filter_packets(libtrace_t *trace,
1257                                    libtrace_packet_t **packets,
1258                                    size_t nb_packets) {
1259        size_t offset = 0;
1260        size_t i;
1261
1262        for (i = 0; i < nb_packets; ++i) {
1263                // The filter needs the trace attached to receive the link type
1264                packets[i]->trace = trace;
1265                if (trace_apply_filter(trace->filter, packets[i])) {
1266                        libtrace_packet_t *tmp;
1267                        tmp = packets[offset];
1268                        packets[offset++] = packets[i];
1269                        packets[i] = tmp;
1270                } else {
1271                        trace_fin_packet(packets[i]);
1272                }
1273        }
1274
1275        return offset;
1276}
1277
1278/* Read a batch of packets from the trace into a buffer.
1279 * Note that this function will block until a packet is read (or EOF is reached)
1280 *
1281 * @param libtrace    The trace
1282 * @param t           The thread
1283 * @param packets     An array of packets
1284 * @param nb_packets  The number of empty packets in packets
1285 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1286 */
1287static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1288                                      libtrace_thread_t *t,
1289                                      libtrace_packet_t *packets[],
1290                                      size_t nb_packets) {
1291        int i;
1292        assert(nb_packets);
1293        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1294        if (trace_is_err(libtrace))
1295                return -1;
1296        if (!libtrace->started) {
1297                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1298                              "You must call libtrace_start() before trace_read_packet()\n");
1299                return -1;
1300        }
1301
1302        if (libtrace->format->pread_packets) {
1303                int ret;
1304                for (i = 0; i < (int) nb_packets; ++i) {
1305                        assert(i[packets]);
1306                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1307                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1308                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1309                                              "Packet passed to trace_read_packet() is invalid\n");
1310                                return -1;
1311                        }
1312                }
1313                do {
1314                        ret=libtrace->format->pread_packets(libtrace, t,
1315                                                            packets,
1316                                                            nb_packets);
1317                        /* Error, EOF or message? */
1318                        if (ret <= 0) {
1319                                return ret;
1320                        }
1321
1322                        if (libtrace->filter) {
1323                                int remaining;
1324                                remaining = filter_packets(libtrace,
1325                                                           packets, ret);
1326                                t->filtered_packets += ret - remaining;
1327                                ret = remaining;
1328                        }
1329                        for (i = 0; i < ret; ++i) {
1330                                /* We do not mark the packet against the trace,
1331                                 * before hand or after. After breaks DAG meta
1332                                 * packets and before is inefficient */
1333                                //packets[i]->trace = libtrace;
1334                                /* TODO IN FORMAT?? Like traditional libtrace */
1335                                if (libtrace->snaplen>0)
1336                                        trace_set_capture_length(packets[i],
1337                                                        libtrace->snaplen);
1338                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1339                        }
1340                } while(ret == 0);
1341                return ret;
1342        }
1343        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1344                      "This format does not support reading packets\n");
1345        return ~0U;
1346}
1347
1348/* Restarts a parallel trace, this is called from trace_pstart.
1349 * The libtrace lock is held upon calling this function.
1350 * Typically with a parallel trace the threads are not
1351 * killed rather.
1352 */
1353static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1354                          libtrace_callback_set_t *per_packet_cbs, 
1355                          libtrace_callback_set_t *reporter_cbs) {
1356        int i, err = 0;
1357        if (libtrace->state != STATE_PAUSED) {
1358                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1359                        "trace(%s) is not currently paused",
1360                              libtrace->uridata);
1361                return -1;
1362        }
1363
1364        assert(libtrace_parallel);
1365        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1366
1367        /* Reset first packets */
1368        pthread_spin_lock(&libtrace->first_packets.lock);
1369        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1370                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1371                if (libtrace->first_packets.packets[i].packet) {
1372                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1373                        libtrace->first_packets.packets[i].packet = NULL;
1374                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1375                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1376                        libtrace->first_packets.count--;
1377                        libtrace->perpkt_threads[i].recorded_first = false;
1378                }
1379        }
1380        assert(libtrace->first_packets.count == 0);
1381        libtrace->first_packets.first = 0;
1382        pthread_spin_unlock(&libtrace->first_packets.lock);
1383
1384        /* Reset delay */
1385        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1386                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1387        }
1388
1389        /* Reset statistics */
1390        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1391                libtrace->perpkt_threads[i].accepted_packets = 0;
1392                libtrace->perpkt_threads[i].filtered_packets = 0;
1393        }
1394        libtrace->accepted_packets = 0;
1395        libtrace->filtered_packets = 0;
1396
1397        /* Update functions if requested */
1398        if(global_blob)
1399                libtrace->global_blob = global_blob;
1400
1401        if (per_packet_cbs) {
1402                if (libtrace->perpkt_cbs)
1403                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1404                libtrace->perpkt_cbs = trace_create_callback_set();
1405                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1406                                sizeof(libtrace_callback_set_t));
1407        }
1408
1409        if (reporter_cbs) {
1410                if (libtrace->reporter_cbs)
1411                        trace_destroy_callback_set(libtrace->reporter_cbs);
1412
1413                libtrace->reporter_cbs = trace_create_callback_set();
1414                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1415                                sizeof(libtrace_callback_set_t));
1416        }
1417
1418        if (trace_is_parallel(libtrace)) {
1419                err = libtrace->format->pstart_input(libtrace);
1420        } else {
1421                if (libtrace->format->start_input) {
1422                        err = libtrace->format->start_input(libtrace);
1423                }
1424        }
1425
1426        if (err == 0) {
1427                libtrace->started = true;
1428                libtrace_change_state(libtrace, STATE_RUNNING, false);
1429        }
1430        return err;
1431}
1432
1433/**
1434 * @return the number of CPU cores on the machine. -1 if unknown.
1435 */
1436SIMPLE_FUNCTION static int get_nb_cores() {
1437        int numCPU;
1438#ifdef _SC_NPROCESSORS_ONLN
1439        /* Most systems do this now */
1440        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1441
1442#else
1443        int mib[] = {CTL_HW, HW_AVAILCPU};
1444        size_t len = sizeof(numCPU);
1445
1446        /* get the number of CPUs from the system */
1447        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1448#endif
1449        return numCPU <= 0 ? 1 : numCPU;
1450}
1451
1452/**
1453 * Verifies the configuration and sets default values for any values not
1454 * specified by the user.
1455 */
1456static void verify_configuration(libtrace_t *libtrace) {
1457
1458        if (libtrace->config.hasher_queue_size <= 0)
1459                libtrace->config.hasher_queue_size = 1000;
1460
1461        if (libtrace->config.perpkt_threads <= 0) {
1462                libtrace->perpkt_thread_count = get_nb_cores();
1463                if (libtrace->perpkt_thread_count <= 0)
1464                        // Lets just use one
1465                        libtrace->perpkt_thread_count = 1;
1466        } else {
1467                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1468        }
1469
1470        if (libtrace->config.reporter_thold <= 0)
1471                libtrace->config.reporter_thold = 100;
1472        if (libtrace->config.burst_size <= 0)
1473                libtrace->config.burst_size = 32;
1474        if (libtrace->config.thread_cache_size <= 0)
1475                libtrace->config.thread_cache_size = 64;
1476        if (libtrace->config.cache_size <= 0)
1477                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1478
1479        if (libtrace->config.cache_size <
1480                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1481                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1482
1483        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1484                libtrace->combiner = combiner_unordered;
1485
1486        /* Figure out if we are using a dedicated hasher thread? */
1487        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1488                libtrace->hasher_thread.type = THREAD_HASHER;
1489        }
1490}
1491
1492/**
1493 * Starts a libtrace_thread, including allocating memory for messaging.
1494 * Threads are expected to wait until the libtrace look is released.
1495 * Hence why we don't init structures until later.
1496 *
1497 * @param trace The trace the thread is associated with
1498 * @param t The thread that is filled when the thread is started
1499 * @param type The type of thread
1500 * @param start_routine The entry location of the thread
1501 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1502 * @param name For debugging purposes set the threads name (Optional)
1503 *
1504 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1505 *         In this situation the thread structure is zeroed.
1506 */
1507static int trace_start_thread(libtrace_t *trace,
1508                       libtrace_thread_t *t,
1509                       enum thread_types type,
1510                       void *(*start_routine) (void *),
1511                       int perpkt_num,
1512                       const char *name) {
1513#ifdef __linux__
1514        pthread_attr_t attrib;
1515        cpu_set_t cpus;
1516        int i;
1517#endif
1518        int ret;
1519        assert(t->type == THREAD_EMPTY);
1520        t->trace = trace;
1521        t->ret = NULL;
1522        t->user_data = NULL;
1523        t->type = type;
1524        t->state = THREAD_RUNNING;
1525
1526        assert(name);
1527
1528#ifdef __linux__
1529        CPU_ZERO(&cpus);
1530        for (i = 0; i < get_nb_cores(); i++)
1531                CPU_SET(i, &cpus);
1532        pthread_attr_init(&attrib);
1533        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1534        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1535        pthread_attr_destroy(&attrib);
1536#else
1537        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1538#endif
1539        if (ret != 0) {
1540                libtrace_zero_thread(t);
1541                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1542                return -1;
1543        }
1544        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1545        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1546                libtrace_ringbuffer_init(&t->rbuffer,
1547                                         trace->config.hasher_queue_size,
1548                                         trace->config.hasher_polling?
1549                                                 LIBTRACE_RINGBUFFER_POLLING:
1550                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1551        }
1552#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1553        if(name)
1554                pthread_setname_np(t->tid, name);
1555#endif
1556        t->perpkt_num = perpkt_num;
1557        return 0;
1558}
1559
1560/** Parses the environment variable LIBTRACE_CONF into the supplied
1561 * configuration structure.
1562 *
1563 * @param[in,out] libtrace The trace from which we determine the URI and set
1564 * the configuration.
1565 *
1566 * We search for 3 environment variables and apply them to the config in the
1567 * following order. Such that the first has the lowest priority.
1568 *
1569 * 1. LIBTRACE_CONF, The global environment configuration
1570 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1571 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1572 *
1573 * E.g.
1574 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1575 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1576 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1577 *
1578 * @note All environment variables names MUST only contian
1579 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1580 * outside of this range should be captilised if possible or replaced with an
1581 * underscore.
1582 */
1583static void parse_env_config (libtrace_t *libtrace) {
1584        char env_name[1024] = "LIBTRACE_CONF_";
1585        size_t len = strlen(env_name);
1586        size_t mark = 0;
1587        size_t i;
1588        char * env;
1589
1590        /* Make our compound string */
1591        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1592        len += strlen(libtrace->format->name);
1593        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1594        len += 1;
1595        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1596
1597        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1598        for (i = 0; env_name[i] != 0; ++i) {
1599                env_name[i] = toupper(env_name[i]);
1600                if(env_name[i] == ':') {
1601                        mark = i;
1602                }
1603                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1604                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1605                        env_name[i] = '_';
1606                }
1607        }
1608
1609        /* First apply global env settings LIBTRACE_CONF */
1610        env = getenv("LIBTRACE_CONF");
1611        if (env)
1612        {
1613                printf("Got env %s", env);
1614                trace_set_configuration(libtrace, env);
1615        }
1616
1617        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1618        if (mark != 0) {
1619                env_name[mark] = 0;
1620                env = getenv(env_name);
1621                if (env) {
1622                        trace_set_configuration(libtrace, env);
1623                }
1624                env_name[mark] = '_';
1625        }
1626
1627        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1628        env = getenv(env_name);
1629        if (env) {
1630                trace_set_configuration(libtrace, env);
1631        }
1632}
1633
1634DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1635        if (libtrace->state == STATE_NEW)
1636                return trace_supports_parallel(libtrace);
1637        return libtrace->pread == trace_pread_packet_wrapper;
1638}
1639
1640DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1641                           libtrace_callback_set_t *per_packet_cbs,
1642                           libtrace_callback_set_t *reporter_cbs) {
1643        int i;
1644        int ret = -1;
1645        char name[16];
1646        sigset_t sig_before, sig_block_all;
1647        assert(libtrace);
1648
1649        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1650        if (trace_is_err(libtrace)) {
1651                goto cleanup_none;
1652        }
1653
1654        if (libtrace->state == STATE_PAUSED) {
1655                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1656                                reporter_cbs);
1657                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1658                return ret;
1659        }
1660
1661        if (libtrace->state != STATE_NEW) {
1662                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1663                              "should be called on a NEW or PAUSED trace but "
1664                              "instead was called from %s",
1665                              get_trace_state_name(libtrace->state));
1666                goto cleanup_none;
1667        }
1668
1669        /* Store the user defined things against the trace */
1670        libtrace->global_blob = global_blob;
1671
1672        /* Save a copy of the callbacks in case the user tries to change them
1673         * on us later */
1674        if (!per_packet_cbs) {
1675                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1676                                "requires a non-NULL set of per packet "
1677                                "callbacks.");
1678                goto cleanup_none;
1679        }
1680
1681        if (per_packet_cbs->message_packet == NULL) {
1682                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1683                                "packet callbacks must include a handler "
1684                                "for a packet. Please set this using "
1685                                "trace_set_packet_cb().");
1686                goto cleanup_none;
1687        }
1688
1689        libtrace->perpkt_cbs = trace_create_callback_set();
1690        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1691       
1692        if (reporter_cbs) {
1693                libtrace->reporter_cbs = trace_create_callback_set();
1694                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1695        }
1696
1697       
1698
1699
1700        /* And zero other fields */
1701        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1702                libtrace->perpkt_thread_states[i] = 0;
1703        }
1704        libtrace->first_packets.first = 0;
1705        libtrace->first_packets.count = 0;
1706        libtrace->first_packets.packets = NULL;
1707        libtrace->perpkt_threads = NULL;
1708        /* Set a global which says we are using a parallel trace. This is
1709         * for backwards compatability due to changes when destroying packets */
1710        libtrace_parallel = 1;
1711
1712        /* Parses configuration passed through environment variables */
1713        parse_env_config(libtrace);
1714        verify_configuration(libtrace);
1715
1716        ret = -1;
1717        /* Try start the format - we prefer parallel over single threaded, as
1718         * these formats should support messages better */
1719        if (trace_supports_parallel(libtrace) &&
1720            !trace_has_dedicated_hasher(libtrace)) {
1721                ret = libtrace->format->pstart_input(libtrace);
1722                libtrace->pread = trace_pread_packet_wrapper;
1723        }
1724        if (ret != 0) {
1725                if (libtrace->format->start_input) {
1726                        ret = libtrace->format->start_input(libtrace);
1727                }
1728                if (libtrace->perpkt_thread_count > 1)
1729                        libtrace->pread = trace_pread_packet_first_in_first_served;
1730                else
1731                        /* Use standard read_packet */
1732                        libtrace->pread = NULL;
1733        }
1734
1735        if (ret != 0) {
1736                goto cleanup_none;
1737        }
1738
1739        /* --- Start all the threads we need --- */
1740        /* Disable signals because it is inherited by the threads we start */
1741        sigemptyset(&sig_block_all);
1742        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1743
1744        /* If we need a hasher thread start it
1745         * Special Case: If single threaded we don't need a hasher
1746         */
1747        if (trace_has_dedicated_hasher(libtrace)) {
1748                libtrace->hasher_thread.type = THREAD_EMPTY;
1749                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1750                                   THREAD_HASHER, hasher_entry, -1,
1751                                   "hasher-thread");
1752                if (ret != 0)
1753                        goto cleanup_started;
1754                libtrace->pread = trace_pread_packet_hasher_thread;
1755        } else {
1756                libtrace->hasher_thread.type = THREAD_EMPTY;
1757        }
1758
1759        /* Start up our perpkt threads */
1760        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1761                                          libtrace->perpkt_thread_count);
1762        if (!libtrace->perpkt_threads) {
1763                trace_set_err(libtrace, errno, "trace_pstart "
1764                              "failed to allocate memory.");
1765                goto cleanup_threads;
1766        }
1767        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1768                snprintf(name, sizeof(name), "perpkt-%d", i);
1769                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1770                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1771                                   THREAD_PERPKT, perpkt_threads_entry, i,
1772                                   name);
1773                if (ret != 0)
1774                        goto cleanup_threads;
1775        }
1776
1777        /* Start the reporter thread */
1778        if (reporter_cbs) {
1779                if (libtrace->combiner.initialise)
1780                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1781                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1782                                   THREAD_REPORTER, reporter_entry, -1,
1783                                   "reporter_thread");
1784                if (ret != 0)
1785                        goto cleanup_threads;
1786        }
1787
1788        /* Start the keepalive thread */
1789        if (libtrace->config.tick_interval > 0) {
1790                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1791                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1792                                   "keepalive_thread");
1793                if (ret != 0)
1794                        goto cleanup_threads;
1795        }
1796
1797        /* Init other data structures */
1798        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1799        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1800        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1801                                                 sizeof(*libtrace->first_packets.packets));
1802        if (libtrace->first_packets.packets == NULL) {
1803                trace_set_err(libtrace, errno, "trace_pstart "
1804                              "failed to allocate memory.");
1805                goto cleanup_threads;
1806        }
1807
1808        if (libtrace_ocache_init(&libtrace->packet_freelist,
1809                             (void* (*)()) trace_create_packet,
1810                             (void (*)(void *))trace_destroy_packet,
1811                             libtrace->config.thread_cache_size,
1812                             libtrace->config.cache_size * 4,
1813                             libtrace->config.fixed_count) != 0) {
1814                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1815                              "failed to allocate ocache.");
1816                goto cleanup_threads;
1817        }
1818
1819        /* Threads don't start */
1820        libtrace->started = true;
1821        libtrace_change_state(libtrace, STATE_RUNNING, false);
1822
1823        ret = 0;
1824        goto success;
1825cleanup_threads:
1826        if (libtrace->first_packets.packets) {
1827                free(libtrace->first_packets.packets);
1828                libtrace->first_packets.packets = NULL;
1829        }
1830        libtrace_change_state(libtrace, STATE_ERROR, false);
1831        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1832        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1833                pthread_join(libtrace->hasher_thread.tid, NULL);
1834                libtrace_zero_thread(&libtrace->hasher_thread);
1835        }
1836
1837        if (libtrace->perpkt_threads) {
1838                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1839                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1840                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1841                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1842                        } else break;
1843                }
1844                free(libtrace->perpkt_threads);
1845                libtrace->perpkt_threads = NULL;
1846        }
1847
1848        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1849                pthread_join(libtrace->reporter_thread.tid, NULL);
1850                libtrace_zero_thread(&libtrace->reporter_thread);
1851        }
1852
1853        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1854                pthread_join(libtrace->keepalive_thread.tid, NULL);
1855                libtrace_zero_thread(&libtrace->keepalive_thread);
1856        }
1857        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1858        libtrace_change_state(libtrace, STATE_NEW, false);
1859        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1860        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1861cleanup_started:
1862        if (libtrace->pread == trace_pread_packet_wrapper) {
1863                if (libtrace->format->ppause_input)
1864                        libtrace->format->ppause_input(libtrace);
1865        } else {
1866                if (libtrace->format->pause_input)
1867                        libtrace->format->pause_input(libtrace);
1868        }
1869        ret = -1;
1870success:
1871        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1872cleanup_none:
1873        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1874        return ret;
1875}
1876
1877DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1878                fn_cb_starting handler) {
1879        cbset->message_starting = handler;
1880        return 0;
1881}
1882
1883DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1884                fn_cb_dataless handler) {
1885        cbset->message_pausing = handler;
1886        return 0;
1887}
1888
1889DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1890                fn_cb_dataless handler) {
1891        cbset->message_resuming = handler;
1892        return 0;
1893}
1894
1895DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1896                fn_cb_dataless handler) {
1897        cbset->message_stopping = handler;
1898        return 0;
1899}
1900
1901DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1902                fn_cb_packet handler) {
1903        cbset->message_packet = handler;
1904        return 0;
1905}
1906
1907DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1908                fn_cb_first_packet handler) {
1909        cbset->message_first_packet = handler;
1910        return 0;
1911}
1912
1913DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1914                fn_cb_tick handler) {
1915        cbset->message_tick_count = handler;
1916        return 0;
1917}
1918
1919DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1920                fn_cb_tick handler) {
1921        cbset->message_tick_interval = handler;
1922        return 0;
1923}
1924
1925DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1926                fn_cb_result handler) {
1927        cbset->message_result = handler;
1928        return 0;
1929}
1930
1931DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1932                fn_cb_usermessage handler) {
1933        cbset->message_user = handler;
1934        return 0;
1935}
1936
1937/*
1938 * Pauses a trace, this should only be called by the main thread
1939 * 1. Set started = false
1940 * 2. All perpkt threads are paused waiting on a condition var
1941 * 3. Then call ppause on the underlying format if found
1942 * 4. The traces state is paused
1943 *
1944 * Once done you should be able to modify the trace setup and call pstart again
1945 * TODO add support to change the number of threads.
1946 */
1947DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1948{
1949        libtrace_thread_t *t;
1950        int i;
1951        assert(libtrace);
1952
1953        t = get_thread_table(libtrace);
1954        // Check state from within the lock if we are going to change it
1955        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1956
1957        /* If we are already paused, just treat this as a NOOP */
1958        if (libtrace->state == STATE_PAUSED) {
1959                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1960                return 0;
1961        }
1962        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1963                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1964                return -1;
1965        }
1966
1967        libtrace_change_state(libtrace, STATE_PAUSING, false);
1968        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1969
1970        // Special case handle the hasher thread case
1971        if (trace_has_dedicated_hasher(libtrace)) {
1972                if (libtrace->config.debug_state)
1973                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1974                libtrace_message_t message = {0, {.uint64=0}, NULL};
1975                message.code = MESSAGE_DO_PAUSE;
1976                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1977                // Wait for it to pause
1978                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1979                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1980                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1981                }
1982                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1983                if (libtrace->config.debug_state)
1984                        fprintf(stderr, " DONE\n");
1985        }
1986
1987        if (libtrace->config.debug_state)
1988                fprintf(stderr, "Asking perpkt threads to pause ...");
1989        // Stop threads, skip this one if it's a perpkt
1990        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1991                if (&libtrace->perpkt_threads[i] != t) {
1992                        libtrace_message_t message = {0, {.uint64=0}, NULL};
1993                        message.code = MESSAGE_DO_PAUSE;
1994                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
1995                        if(trace_has_dedicated_hasher(libtrace)) {
1996                                // The hasher has stopped and other threads have messages waiting therefore
1997                                // If the queues are empty the other threads would have no data
1998                                // So send some message packets to simply ask the threads to check
1999                                // We are the only writer since hasher has paused
2000                                libtrace_packet_t *pkt;
2001                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2002                                pkt->error = READ_MESSAGE;
2003                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2004                        }
2005                } else {
2006                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2007                }
2008        }
2009
2010        if (t) {
2011                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2012                // We rely on the user to *not* return before starting the trace again
2013                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2014        }
2015
2016        // Wait for all threads to pause
2017        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2018        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2019                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2020        }
2021        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2022
2023        if (libtrace->config.debug_state)
2024                fprintf(stderr, " DONE\n");
2025
2026        // Deal with the reporter
2027        if (trace_has_reporter(libtrace)) {
2028                if (libtrace->config.debug_state)
2029                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2030                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2031                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2032                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2033               
2034                } else {
2035                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2036                        message.code = MESSAGE_DO_PAUSE;
2037                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2038                        // Wait for it to pause
2039                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2040                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2041                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2042                        }
2043                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2044                }
2045                if (libtrace->config.debug_state)
2046                        fprintf(stderr, " DONE\n");
2047        }
2048
2049        /* Cache values before we pause */
2050        if (libtrace->stats == NULL)
2051                libtrace->stats = trace_create_statistics();
2052        // Save the statistics against the trace
2053        trace_get_statistics(libtrace, NULL);
2054        if (trace_is_parallel(libtrace)) {
2055                libtrace->started = false;
2056                if (libtrace->format->ppause_input)
2057                        libtrace->format->ppause_input(libtrace);
2058                // TODO What happens if we don't have pause input??
2059        } else {
2060                int err;
2061                err = trace_pause(libtrace);
2062                // We should handle this a bit better
2063                if (err)
2064                        return err;
2065        }
2066
2067        // Only set as paused after the pause has been called on the trace
2068        libtrace_change_state(libtrace, STATE_PAUSED, true);
2069        return 0;
2070}
2071
2072/**
2073 * Stop trace finish prematurely as though it meet an EOF
2074 * This should only be called by the main thread
2075 * 1. Calls ppause
2076 * 2. Sends a message asking for threads to finish
2077 * 3. Releases threads which will pause
2078 */
2079DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2080{
2081        int i, err;
2082        libtrace_message_t message = {0, {.uint64=0}, NULL};
2083        assert(libtrace);
2084
2085        // Ensure all threads have paused and the underlying trace format has
2086        // been closed and all packets associated are cleaned up
2087        // Pause will do any state checks for us
2088        err = trace_ppause(libtrace);
2089        if (err)
2090                return err;
2091
2092        // Now send a message asking the threads to stop
2093        // This will be retrieved before trying to read another packet
2094
2095        message.code = MESSAGE_DO_STOP;
2096        trace_message_perpkts(libtrace, &message);
2097        if (trace_has_dedicated_hasher(libtrace))
2098                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2099
2100        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2101                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2102        }
2103
2104        /* Now release the threads and let them stop - when the threads finish
2105         * the state will be set to finished */
2106        libtrace_change_state(libtrace, STATE_FINISHING, true);
2107        return 0;
2108}
2109
2110DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2111        int ret = -1;
2112        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2113                return -1;
2114        }
2115
2116        // Save the requirements
2117        trace->hasher_type = type;
2118        if (hasher) {
2119                trace->hasher = hasher;
2120                trace->hasher_data = data;
2121        } else {
2122                trace->hasher = NULL;
2123                trace->hasher_data = NULL;
2124        }
2125
2126        // Try push this to hardware - NOTE hardware could do custom if
2127        // there is a more efficient way to apply it, in this case
2128        // it will simply grab the function out of libtrace_t
2129        if (trace_supports_parallel(trace) && trace->format->config_input)
2130                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2131
2132        if (ret == -1) {
2133                /* We have to deal with this ourself */
2134                if (!hasher) {
2135                        switch (type)
2136                        {
2137                                case HASHER_CUSTOM:
2138                                case HASHER_BALANCE:
2139                                        return 0;
2140                                case HASHER_BIDIRECTIONAL:
2141                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2142                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2143                                        toeplitz_init_config(trace->hasher_data, 1);
2144                                        return 0;
2145                                case HASHER_UNIDIRECTIONAL:
2146                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2147                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2148                                        toeplitz_init_config(trace->hasher_data, 0);
2149                                        return 0;
2150                        }
2151                        return -1;
2152                }
2153        } else {
2154                /* If the hasher is hardware we zero out the hasher and hasher
2155                 * data fields - only if we need a hasher do we do this */
2156                trace->hasher = NULL;
2157                trace->hasher_data = NULL;
2158        }
2159
2160        return 0;
2161}
2162
2163// Waits for all threads to finish
2164DLLEXPORT void trace_join(libtrace_t *libtrace) {
2165        int i;
2166
2167        /* Firstly wait for the perpkt threads to finish, since these are
2168         * user controlled */
2169        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2170                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2171                // So we must do our best effort to empty the queue - so
2172                // the producer (or any other threads) don't block.
2173                libtrace_packet_t * packet;
2174                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2175                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2176                        if (packet) // This could be NULL iff the perpkt finishes early
2177                                trace_destroy_packet(packet);
2178        }
2179
2180        /* Now the hasher */
2181        if (trace_has_dedicated_hasher(libtrace)) {
2182                pthread_join(libtrace->hasher_thread.tid, NULL);
2183                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2184        }
2185
2186        // Now that everything is finished nothing can be touching our
2187        // buffers so clean them up
2188        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2189                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2190                // if they lost timeslice before-during a write
2191                libtrace_packet_t * packet;
2192                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2193                        trace_destroy_packet(packet);
2194                if (trace_has_dedicated_hasher(libtrace)) {
2195                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2196                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2197                }
2198                // Cannot destroy vector yet, this happens with trace_destroy
2199        }
2200
2201        if (trace_has_reporter(libtrace)) {
2202                pthread_join(libtrace->reporter_thread.tid, NULL);
2203                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2204        }
2205
2206        // Wait for the tick (keepalive) thread if it has been started
2207        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2208                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2209                msg.code = MESSAGE_DO_STOP;
2210                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2211                pthread_join(libtrace->keepalive_thread.tid, NULL);
2212        }
2213
2214        libtrace_change_state(libtrace, STATE_JOINED, true);
2215        print_memory_stats();
2216}
2217
2218DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2219                                                libtrace_thread_t *t)
2220{
2221        int ret;
2222        if (t == NULL)
2223                t = get_thread_descriptor(libtrace);
2224        if (t == NULL)
2225                return -1;
2226        ret = libtrace_message_queue_count(&t->messages);
2227        return ret < 0 ? 0 : ret;
2228}
2229
2230DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2231                                          libtrace_thread_t *t,
2232                                          libtrace_message_t * message)
2233{
2234        int ret;
2235        if (t == NULL)
2236                t = get_thread_descriptor(libtrace);
2237        if (t == NULL)
2238                return -1;
2239        ret = libtrace_message_queue_get(&t->messages, message);
2240        return ret < 0 ? 0 : ret;
2241}
2242
2243DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2244                                              libtrace_thread_t *t,
2245                                              libtrace_message_t * message)
2246{
2247        if (t == NULL)
2248                t = get_thread_descriptor(libtrace);
2249        if (t == NULL)
2250                return -1;
2251        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2252                return 0;
2253        else
2254                return -1;
2255}
2256
2257DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2258{
2259        int ret;
2260        if (!message->sender)
2261                message->sender = get_thread_descriptor(libtrace);
2262
2263        ret = libtrace_message_queue_put(&t->messages, message);
2264        return ret < 0 ? 0 : ret;
2265}
2266
2267DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2268{
2269        if (!trace_has_reporter(libtrace) ||
2270            !(libtrace->reporter_thread.state == THREAD_RUNNING
2271              || libtrace->reporter_thread.state == THREAD_PAUSED))
2272                return -1;
2273
2274        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2275}
2276
2277DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2278{
2279        libtrace_message_t message = {0, {.uint64=0}, NULL};
2280        message.code = MESSAGE_POST_REPORTER;
2281        return trace_message_reporter(libtrace, (void *) &message);
2282}
2283
2284DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2285{
2286        int i;
2287        int missed = 0;
2288        if (message->sender == NULL)
2289                message->sender = get_thread_descriptor(libtrace);
2290        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2291                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2292                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2293                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2294                } else {
2295                        missed += 1;
2296                }
2297        }
2298        return -missed;
2299}
2300
2301/**
2302 * Publishes a result to the reduce queue
2303 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2304 */
2305DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2306        libtrace_result_t res;
2307        res.type = type;
2308        res.key = key;
2309        res.value = value;
2310        assert(libtrace->combiner.publish);
2311        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2312        return;
2313}
2314
2315DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2316        if (combiner) {
2317                trace->combiner = *combiner;
2318                trace->combiner.configuration = config;
2319        } else {
2320                // No combiner, so don't try use it
2321                memset(&trace->combiner, 0, sizeof(trace->combiner));
2322        }
2323}
2324
2325DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2326        return packet->order;
2327}
2328
2329DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2330        return packet->hash;
2331}
2332
2333DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2334        packet->order = order;
2335}
2336
2337DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2338        packet->hash = hash;
2339}
2340
2341DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2342        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2343}
2344
2345/**
2346 * @return True if the trace is not running such that it can be configured
2347 */
2348static inline bool trace_is_configurable(libtrace_t *trace) {
2349        return trace->state == STATE_NEW ||
2350                        trace->state == STATE_PAUSED;
2351}
2352
2353DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2354        if (!trace_is_configurable(trace)) return -1;
2355
2356        /* TODO consider allowing an offset from the total number of cores i.e.
2357         * -1 reserve 1 core */
2358        if (nb >= 0) {
2359                trace->config.perpkt_threads = nb;
2360                return 0;
2361        } else {
2362                return -1;
2363        }
2364}
2365
2366DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2367        if (!trace_is_configurable(trace)) return -1;
2368
2369        trace->config.tick_interval = millisec;
2370        return 0;
2371}
2372
2373DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2374        if (!trace_is_configurable(trace)) return -1;
2375
2376        trace->config.tick_count = count;
2377        return 0;
2378}
2379
2380DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2381        if (!trace_is_configurable(trace)) return -1;
2382
2383        trace->tracetime = tracetime;
2384        return 0;
2385}
2386
2387DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2388        if (!trace_is_configurable(trace)) return -1;
2389
2390        trace->config.cache_size = size;
2391        return 0;
2392}
2393
2394DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2395        if (!trace_is_configurable(trace)) return -1;
2396
2397        trace->config.thread_cache_size = size;
2398        return 0;
2399}
2400
2401DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2402        if (!trace_is_configurable(trace)) return -1;
2403
2404        trace->config.fixed_count = fixed;
2405        return 0;
2406}
2407
2408DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2409        if (!trace_is_configurable(trace)) return -1;
2410
2411        trace->config.burst_size = size;
2412        return 0;
2413}
2414
2415DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2416        if (!trace_is_configurable(trace)) return -1;
2417
2418        trace->config.hasher_queue_size = size;
2419        return 0;
2420}
2421
2422DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2423        if (!trace_is_configurable(trace)) return -1;
2424
2425        trace->config.hasher_polling = polling;
2426        return 0;
2427}
2428
2429DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2430        if (!trace_is_configurable(trace)) return -1;
2431
2432        trace->config.reporter_polling = polling;
2433        return 0;
2434}
2435
2436DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2437        if (!trace_is_configurable(trace)) return -1;
2438
2439        trace->config.reporter_thold = thold;
2440        return 0;
2441}
2442
2443DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2444        if (!trace_is_configurable(trace)) return -1;
2445
2446        trace->config.debug_state = debug_state;
2447        return 0;
2448}
2449
2450static bool config_bool_parse(char *value, size_t nvalue) {
2451        if (strncmp(value, "true", nvalue) == 0)
2452                return true;
2453        else if (strncmp(value, "false", nvalue) == 0)
2454                return false;
2455        else
2456                return strtoll(value, NULL, 10) != 0;
2457}
2458
2459/* Note update documentation on trace_set_configuration */
2460static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2461        assert(key);
2462        assert(value);
2463        assert(uc);
2464        if (strncmp(key, "cache_size", nkey) == 0
2465            || strncmp(key, "cs", nkey) == 0) {
2466                uc->cache_size = strtoll(value, NULL, 10);
2467        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2468                   || strncmp(key, "tcs", nkey) == 0) {
2469                uc->thread_cache_size = strtoll(value, NULL, 10);
2470        } else if (strncmp(key, "fixed_count", nkey) == 0
2471                   || strncmp(key, "fc", nkey) == 0) {
2472                uc->fixed_count = config_bool_parse(value, nvalue);
2473        } else if (strncmp(key, "burst_size", nkey) == 0
2474                   || strncmp(key, "bs", nkey) == 0) {
2475                uc->burst_size = strtoll(value, NULL, 10);
2476        } else if (strncmp(key, "tick_interval", nkey) == 0
2477                   || strncmp(key, "ti", nkey) == 0) {
2478                uc->tick_interval = strtoll(value, NULL, 10);
2479        } else if (strncmp(key, "tick_count", nkey) == 0
2480                   || strncmp(key, "tc", nkey) == 0) {
2481                uc->tick_count = strtoll(value, NULL, 10);
2482        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2483                   || strncmp(key, "pt", nkey) == 0) {
2484                uc->perpkt_threads = strtoll(value, NULL, 10);
2485        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2486                   || strncmp(key, "hqs", nkey) == 0) {
2487                uc->hasher_queue_size = strtoll(value, NULL, 10);
2488        } else if (strncmp(key, "hasher_polling", nkey) == 0
2489                   || strncmp(key, "hp", nkey) == 0) {
2490                uc->hasher_polling = config_bool_parse(value, nvalue);
2491        } else if (strncmp(key, "reporter_polling", nkey) == 0
2492                   || strncmp(key, "rp", nkey) == 0) {
2493                uc->reporter_polling = config_bool_parse(value, nvalue);
2494        } else if (strncmp(key, "reporter_thold", nkey) == 0
2495                   || strncmp(key, "rt", nkey) == 0) {
2496                uc->reporter_thold = strtoll(value, NULL, 10);
2497        } else if (strncmp(key, "debug_state", nkey) == 0
2498                   || strncmp(key, "ds", nkey) == 0) {
2499                uc->debug_state = config_bool_parse(value, nvalue);
2500        } else {
2501                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2502        }
2503}
2504
2505DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2506        char *pch;
2507        char key[100];
2508        char value[100];
2509        char *dup;
2510        assert(str);
2511        assert(trace);
2512
2513        if (!trace_is_configurable(trace)) return -1;
2514
2515        dup = strdup(str);
2516        pch = strtok (dup," ,.-");
2517        while (pch != NULL)
2518        {
2519                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2520                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2521                } else {
2522                        fprintf(stderr, "Error parsing option %s\n", pch);
2523                }
2524                pch = strtok (NULL," ,.-");
2525        }
2526        free(dup);
2527
2528        return 0;
2529}
2530
2531DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2532        char line[1024];
2533        if (!trace_is_configurable(trace)) return -1;
2534
2535        while (fgets(line, sizeof(line), file) != NULL)
2536        {
2537                trace_set_configuration(trace, line);
2538        }
2539
2540        if(ferror(file))
2541                return -1;
2542        else
2543                return 0;
2544}
2545
2546DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2547        assert(packet);
2548        /* Always release any resources this might be holding */
2549        trace_fin_packet(packet);
2550        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2551}
2552
2553DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2554        if (libtrace->format)
2555                return &libtrace->format->info;
2556        else
2557                return NULL;
2558}
Note: See TracBrowser for help on using the repository browser.