source: lib/trace_parallel.c @ b24d186

libtrace4pfring
Last change on this file since b24d186 was b24d186, checked in by Shane Alcock <salcock@…>, 6 years ago

Couple of optimisations introduced after looking at callgrind output

Only reset trace->last_packet to NULL in situations where we aren't
about to immediately read a new packet. The overhead of grabbing
and releasing the mutex to do this is massive for an operation we
are about to immediately overwrite.

Removed loop that checks the integrity of the packet array before
filling it with new data. This consumes a lot of CPU cycles for a
condition that really shouldn't happen anyway -- the fact that we
have asserts here suggest that this is more of a sanity check during
development than a condition we might have to deal with.

  • Property mode set to 100644
File size: 82.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_parallel.h"
83
84#ifdef HAVE_NET_BPF_H
85#  include <net/bpf.h>
86#else
87#  ifdef HAVE_PCAP_BPF_H
88#    include <pcap-bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101#include <ctype.h>
102
103static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
104extern int libtrace_parallel;
105
106struct mem_stats {
107        struct memfail {
108           uint64_t cache_hit;
109           uint64_t ring_hit;
110           uint64_t miss;
111           uint64_t recycled;
112        } readbulk, read, write, writebulk;
113};
114
115
116#ifdef ENABLE_MEM_STATS
117// Grrr gcc wants this spelt out
118__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
119
120
121static void print_memory_stats() {
122        uint64_t total;
123#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
124        char t_name[50];
125        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
126
127        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
128#else
129        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
130#endif
131
132        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
133        if (total) {
134                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
135                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
136                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
137                                total, (double) mem_hits.read.miss / (double) total * 100.0);
138        }
139
140        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
141        if (total) {
142                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
143                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
144
145
146                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
147                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
148        }
149
150        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
151        if (total) {
152                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
153                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
154
155                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
156                                total, (double) mem_hits.write.miss / (double) total * 100.0);
157        }
158
159        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
160        if (total) {
161                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
162                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
163
164                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
166        }
167}
168#else
169static void print_memory_stats() {}
170#endif
171
172static const libtrace_generic_t gen_zero = {0};
173
174/* This should optimise away the switch to nothing in the explict cases */
175inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
176                const enum libtrace_messages type,
177                libtrace_generic_t data, libtrace_thread_t *sender) {
178
179        fn_cb_dataless fn = NULL;
180        enum libtrace_messages switchtype;
181        libtrace_callback_set_t *cbs = NULL;
182
183        if (thread == &trace->reporter_thread) {
184                cbs = trace->reporter_cbs;
185        } else {
186                cbs = trace->perpkt_cbs;
187        }
188
189        if (cbs == NULL)
190                return;
191
192        if (type >= MESSAGE_USER)
193                switchtype = MESSAGE_USER;
194        else
195                switchtype = (enum libtrace_messages) type;
196
197        switch (switchtype) {
198        case MESSAGE_STARTING:
199                if (cbs->message_starting)
200                        thread->user_data = (*cbs->message_starting)(trace,
201                                        thread, trace->global_blob);
202                return;
203        case MESSAGE_FIRST_PACKET:
204                if (cbs->message_first_packet)
205                                (*cbs->message_first_packet)(trace, thread,
206                                trace->global_blob, thread->user_data,
207                                sender);
208                return;
209        case MESSAGE_TICK_COUNT:
210                if (cbs->message_tick_count)
211                        (*cbs->message_tick_count)(trace, thread,
212                                        trace->global_blob, thread->user_data,
213                                        data.uint64);
214                return;
215        case MESSAGE_TICK_INTERVAL:
216                if (cbs->message_tick_interval)
217                        (*cbs->message_tick_interval)(trace, thread,
218                                        trace->global_blob, thread->user_data,
219                                        data.uint64);
220                return;
221        case MESSAGE_STOPPING:
222                fn = cbs->message_stopping;
223                break;
224        case MESSAGE_RESUMING:
225                fn = cbs->message_resuming;
226                break;
227        case MESSAGE_PAUSING:
228                fn = cbs->message_pausing;
229                break;
230        case MESSAGE_USER:
231                if (cbs->message_user)
232                        (*cbs->message_user)(trace, thread, trace->global_blob,
233                                        thread->user_data, type, data, sender);
234                return;
235        case MESSAGE_RESULT:
236                if (cbs->message_result)
237                        (*cbs->message_result)(trace, thread,
238                                        trace->global_blob, thread->user_data,
239                                        data.res);
240                return;
241
242        /* These should be unused */
243        case MESSAGE_DO_PAUSE:
244        case MESSAGE_DO_STOP:
245        case MESSAGE_POST_REPORTER:
246        case MESSAGE_PACKET:
247                return;
248        }
249
250        if (fn)
251                (*fn)(trace, thread, trace->global_blob, thread->user_data);
252}
253
254DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
255        free(cbset);
256}
257
258DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
259        libtrace_callback_set_t *cbset;
260
261        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
262        memset(cbset, 0, sizeof(libtrace_callback_set_t));
263        return cbset;
264}
265
266/*
267 * This can be used once the hasher thread has been started and internally after
268 * verify_configuration.
269 */
270DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
271{
272        return libtrace->hasher_thread.type == THREAD_HASHER;
273}
274
275DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
276{
277        assert(libtrace->state != STATE_NEW);
278        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
279}
280
281/**
282 * When running the number of perpkt threads in use.
283 * TODO what if the trace is not running yet, or has finished??
284 *
285 * @brief libtrace_perpkt_thread_nb
286 * @param t The trace
287 * @return
288 */
289DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
290        return t->perpkt_thread_count;
291}
292
293/**
294 * Changes the overall traces state and signals the condition.
295 *
296 * @param trace A pointer to the trace
297 * @param new_state The new state of the trace
298 * @param need_lock Set to true if libtrace_lock is not held, otherwise
299 *        false in the case the lock is currently held by this thread.
300 */
301static inline void libtrace_change_state(libtrace_t *trace,
302        const enum trace_state new_state, const bool need_lock)
303{
304        UNUSED enum trace_state prev_state;
305        if (need_lock)
306                pthread_mutex_lock(&trace->libtrace_lock);
307        prev_state = trace->state;
308        trace->state = new_state;
309
310        if (trace->config.debug_state)
311                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
312                        trace->uridata, get_trace_state_name(prev_state),
313                        get_trace_state_name(trace->state));
314
315        pthread_cond_broadcast(&trace->perpkt_cond);
316        if (need_lock)
317                pthread_mutex_unlock(&trace->libtrace_lock);
318}
319
320/**
321 * Changes a thread's state and broadcasts the condition variable. This
322 * should always be done when the lock is held.
323 *
324 * Additionally for perpkt threads the state counts are updated.
325 *
326 * @param trace A pointer to the trace
327 * @param t A pointer to the thread to modify
328 * @param new_state The new state of the thread
329 * @param need_lock Set to true if libtrace_lock is not held, otherwise
330 *        false in the case the lock is currently held by this thread.
331 */
332static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
333        const enum thread_states new_state, const bool need_lock)
334{
335        enum thread_states prev_state;
336        if (need_lock)
337                pthread_mutex_lock(&trace->libtrace_lock);
338        prev_state = t->state;
339        t->state = new_state;
340        if (t->type == THREAD_PERPKT) {
341                --trace->perpkt_thread_states[prev_state];
342                ++trace->perpkt_thread_states[new_state];
343        }
344
345        if (trace->config.debug_state)
346                fprintf(stderr, "Thread %d state changed from %d to %d\n",
347                        (int) t->tid, prev_state, t->state);
348
349        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
350                libtrace_change_state(trace, STATE_FINISHED, false);
351
352        pthread_cond_broadcast(&trace->perpkt_cond);
353        if (need_lock)
354                pthread_mutex_unlock(&trace->libtrace_lock);
355}
356
357/**
358 * This is valid once a trace is initialised
359 *
360 * @return True if the format supports parallel threads.
361 */
362static inline bool trace_supports_parallel(libtrace_t *trace)
363{
364        assert(trace);
365        assert(trace->format);
366        if (trace->format->pstart_input)
367                return true;
368        else
369                return false;
370}
371
372void libtrace_zero_thread(libtrace_thread_t * t) {
373        t->accepted_packets = 0;
374        t->filtered_packets = 0;
375        t->recorded_first = false;
376        t->tracetime_offset_usec = 0;
377        t->user_data = 0;
378        t->format_data = 0;
379        libtrace_zero_ringbuffer(&t->rbuffer);
380        t->trace = NULL;
381        t->ret = NULL;
382        t->type = THREAD_EMPTY;
383        t->perpkt_num = -1;
384}
385
386// Ints are aligned int is atomic so safe to read and write at same time
387// However write must be locked, read doesn't (We never try read before written to table)
388libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
389        int i = 0;
390        pthread_t tid = pthread_self();
391
392        for (;i<libtrace->perpkt_thread_count ;++i) {
393                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
394                        return &libtrace->perpkt_threads[i];
395        }
396        return NULL;
397}
398
399static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
400        libtrace_thread_t *ret;
401        if (!(ret = get_thread_table(libtrace))) {
402                pthread_t tid = pthread_self();
403                // Check if we are reporter or something else
404                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
405                                pthread_equal(tid, libtrace->reporter_thread.tid))
406                        ret = &libtrace->reporter_thread;
407                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
408                         pthread_equal(tid, libtrace->hasher_thread.tid))
409                        ret = &libtrace->hasher_thread;
410                else
411                        ret = NULL;
412        }
413        return ret;
414}
415
416DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
417        // Duplicate the packet in standard malloc'd memory and free the
418        // original, This is a 1:1 exchange so the ocache count remains unchanged.
419        if (pkt->buf_control != TRACE_CTRL_PACKET) {
420                libtrace_packet_t *dup;
421                dup = trace_copy_packet(pkt);
422                /* Release the external buffer */
423                trace_fin_packet(pkt, 1);
424                /* Copy the duplicated packet over the existing */
425                memcpy(pkt, dup, sizeof(libtrace_packet_t));
426                /* Free the packet structure */
427                free(dup);
428        }
429}
430
431/**
432 * Makes a libtrace_result_t safe, used when pausing a trace.
433 * This will call libtrace_make_packet_safe if the result is
434 * a packet.
435 */
436DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
437        if (res->type == RESULT_PACKET) {
438                libtrace_make_packet_safe(res->value.pkt);
439        }
440}
441
442/**
443 * Holds threads in a paused state, until released by broadcasting
444 * the condition mutex.
445 */
446static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
447        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
448        thread_change_state(trace, t, THREAD_PAUSED, false);
449        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
450                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
451        }
452        thread_change_state(trace, t, THREAD_RUNNING, false);
453        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
454}
455
456/**
457 * Sends a packet to the user, expects either a valid packet or a TICK packet.
458 *
459 * @param trace The trace
460 * @param t The current thread
461 * @param packet A pointer to the packet storage, which may be set to null upon
462 *               return, or a packet to be finished.
463 * @param tracetime If true packets are delayed to match with tracetime
464 * @return 0 is successful, otherwise if playing back in tracetime
465 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
466 *
467 * @note READ_MESSAGE will only be returned if tracetime is true.
468 */
469static inline int dispatch_packet(libtrace_t *trace,
470                                  libtrace_thread_t *t,
471                                  libtrace_packet_t **packet,
472                                  bool tracetime) {
473
474        if ((*packet)->error > 0) {
475                if (tracetime) {
476                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
477                                return READ_MESSAGE;
478                }
479                t->accepted_packets++;
480                if (trace->perpkt_cbs->message_packet)
481                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
482                trace_fin_packet(*packet, 0);
483        } else {
484                assert((*packet)->error == READ_TICK);
485                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
486                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
487        }
488        return 0;
489}
490
491/**
492 * Sends a batch of packets to the user, expects either a valid packet or a
493 * TICK packet.
494 *
495 * @param trace The trace
496 * @param t The current thread
497 * @param packets [in,out] An array of packets, these may be null upon return
498 * @param nb_packets The total number of packets in the list
499 * @param empty [in,out] A pointer to an integer storing the first empty slot,
500 * upon return this is updated
501 * @param offset [in,out] The offset into the array, upon return this is updated
502 * @param tracetime If true packets are delayed to match with tracetime
503 * @return 0 is successful, otherwise if playing back in tracetime
504 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
505 *
506 * @note READ_MESSAGE will only be returned if tracetime is true.
507 */
508static inline int dispatch_packets(libtrace_t *trace,
509                                  libtrace_thread_t *t,
510                                  libtrace_packet_t *packets[],
511                                  int nb_packets, int *empty, int *offset,
512                                  bool tracetime) {
513        for (;*offset < nb_packets; ++*offset) {
514                int ret;
515                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
516                if (ret == 0) {
517                        /* Move full slots to front as we go */
518                        if (packets[*offset]) {
519                                if (*empty != *offset) {
520                                        packets[*empty] = packets[*offset];
521                                        packets[*offset] = NULL;
522                                }
523                                ++*empty;
524                        }
525                } else {
526                        /* Break early */
527                        assert(ret == READ_MESSAGE);
528                        return READ_MESSAGE;
529                }
530        }
531
532        return 0;
533}
534
535/**
536 * Pauses a per packet thread, messages will not be processed when the thread
537 * is paused.
538 *
539 * This process involves reading packets if a hasher thread is used. As such
540 * this function can fail to pause due to errors when reading in which case
541 * the thread should be stopped instead.
542 *
543 *
544 * @brief trace_perpkt_thread_pause
545 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
546 */
547static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
548                                     libtrace_packet_t *packets[],
549                                     int nb_packets, int *empty, int *offset) {
550        libtrace_packet_t * packet = NULL;
551
552        /* Let the user thread know we are going to pause */
553        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
554
555        /* Send through any remaining packets (or messages) without delay */
556
557        /* First send those packets already read, as fast as possible
558         * This should never fail or check for messages etc. */
559        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
560                                    offset, false), == 0);
561
562        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
563        /* If a hasher thread is running, empty input queues so we don't lose data */
564        if (trace_has_dedicated_hasher(trace)) {
565                // The hasher has stopped by this point, so the queue shouldn't be filling
566                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
567                        int ret = trace->pread(trace, t, &packet, 1);
568                        if (ret == 1) {
569                                if (packet->error > 0) {
570                                        store_first_packet(trace, packet, t);
571                                }
572                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
573                                if (packet == NULL)
574                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
575                        } else if (ret != READ_MESSAGE) {
576                                /* Ignore messages we pick these up next loop */
577                                assert (ret == READ_EOF || ret == READ_ERROR);
578                                /* Verify no packets are remaining */
579                                /* TODO refactor this sanity check out!! */
580                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
581                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
582                                        // No packets after this should have any data in them
583                                        assert(packet->error <= 0);
584                                }
585                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
586                                return -1;
587                        }
588                }
589        }
590        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
591
592        /* Now we do the actual pause, this returns when we resumed */
593        trace_thread_pause(trace, t);
594        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
595        return 1;
596}
597
598/**
599 * The is the entry point for our packet processing threads.
600 */
601static void* perpkt_threads_entry(void *data) {
602        libtrace_t *trace = (libtrace_t *)data;
603        libtrace_thread_t *t;
604        libtrace_message_t message = {0, {.uint64=0}, NULL};
605        libtrace_packet_t *packets[trace->config.burst_size];
606        size_t i;
607        //int ret;
608        /* The current reading position into the packets */
609        int offset = 0;
610        /* The number of packets last read */
611        int nb_packets = 0;
612        /* The offset to the first NULL packet upto offset */
613        int empty = 0;
614
615        /* Wait until trace_pstart has been completed */
616        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
617        t = get_thread_table(trace);
618        assert(t);
619        if (trace->state == STATE_ERROR) {
620                thread_change_state(trace, t, THREAD_FINISHED, false);
621                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
622                pthread_exit(NULL);
623        }
624        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
625
626        if (trace->format->pregister_thread) {
627                if (trace->format->pregister_thread(trace, t, 
628                                trace_is_parallel(trace)) < 0) {
629                        thread_change_state(trace, t, THREAD_FINISHED, false);
630                        pthread_exit(NULL);
631                }
632        }
633
634        /* Fill our buffer with empty packets */
635        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
636        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
637                              trace->config.burst_size,
638                              trace->config.burst_size);
639
640        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
641
642        /* Let the per_packet function know we have started */
643        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
644        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
645
646        for (;;) {
647
648                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
649                        int ret;
650                        switch (message.code) {
651                                case MESSAGE_DO_PAUSE: // This is internal
652                                        ret = trace_perpkt_thread_pause(trace, t,
653                                              packets, nb_packets, &empty, &offset);
654                                        if (ret == READ_EOF) {
655                                                goto eof;
656                                        } else if (ret == READ_ERROR) {
657                                                goto error;
658                                        }
659                                        assert(ret == 1);
660                                        continue;
661                                case MESSAGE_DO_STOP: // This is internal
662                                        goto eof;
663                        }
664                        send_message(trace, t, message.code, message.data, 
665                                        message.sender);
666                        /* Continue and the empty messages out before packets */
667                        continue;
668                }
669
670
671                /* Do we need to read a new set of packets MOST LIKELY we do */
672                if (offset == nb_packets) {
673                        /* Refill the packet buffer */
674                        if (empty != nb_packets) {
675                                // Refill the empty packets
676                                libtrace_ocache_alloc(&trace->packet_freelist,
677                                                      (void **) &packets[empty],
678                                                      nb_packets - empty,
679                                                      nb_packets - empty);
680                        }
681                        if (!trace->pread) {
682                                assert(packets[0]);
683                                nb_packets = trace_read_packet(trace, packets[0]);
684                                packets[0]->error = nb_packets;
685                                if (nb_packets > 0)
686                                        nb_packets = 1;
687                        } else {
688                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
689                        }
690                        offset = 0;
691                        empty = 0;
692                }
693
694                /* Handle error/message cases */
695                if (nb_packets > 0) {
696                        /* Store the first packet */
697                        if (packets[0]->error > 0) {
698                                store_first_packet(trace, packets[0], t);
699                        }
700                        dispatch_packets(trace, t, packets, nb_packets, &empty,
701                                         &offset, trace->tracetime);
702                } else {
703                        switch (nb_packets) {
704                        case READ_EOF:
705                                goto eof;
706                        case READ_ERROR:
707                                goto error;
708                        case READ_MESSAGE:
709                                nb_packets = 0;
710                                continue;
711                        default:
712                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
713                                goto error;
714                        }
715                }
716
717        }
718
719error:
720        message.code = MESSAGE_DO_STOP;
721        message.sender = t;
722        message.data.uint64 = 0;
723        trace_message_perpkts(trace, &message);
724eof:
725        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
726
727        // Let the per_packet function know we have stopped
728        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
729        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
730
731        // Free any remaining packets
732        for (i = 0; i < trace->config.burst_size; i++) {
733                if (packets[i]) {
734                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
735                        packets[i] = NULL;
736                }
737        }
738
739        thread_change_state(trace, t, THREAD_FINISHED, true);
740
741        /* Make sure the reporter sees we have finished */
742        if (trace_has_reporter(trace))
743                trace_post_reporter(trace);
744
745        // Release all ocache memory before unregistering with the format
746        // because this might(it does in DPDK) unlink the formats mempool
747        // causing destroy/finish packet to fail.
748        libtrace_ocache_unregister_thread(&trace->packet_freelist);
749        if (trace->format->punregister_thread) {
750                trace->format->punregister_thread(trace, t);
751        }
752        print_memory_stats();
753
754        pthread_exit(NULL);
755}
756
757/**
758 * The start point for our single threaded hasher thread, this will read
759 * and hash a packet from a data source and queue it against the correct
760 * core to process it.
761 */
762static void* hasher_entry(void *data) {
763        libtrace_t *trace = (libtrace_t *)data;
764        libtrace_thread_t * t;
765        int i;
766        libtrace_packet_t * packet;
767        libtrace_message_t message = {0, {.uint64=0}, NULL};
768        int pkt_skipped = 0;
769
770        assert(trace_has_dedicated_hasher(trace));
771        /* Wait until all threads are started and objects are initialised (ring buffers) */
772        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
773        t = &trace->hasher_thread;
774        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
775        if (trace->state == STATE_ERROR) {
776                thread_change_state(trace, t, THREAD_FINISHED, false);
777                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
778                pthread_exit(NULL);
779        }
780        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
781
782        /* We are reading but it is not the parallel API */
783        if (trace->format->pregister_thread) {
784                trace->format->pregister_thread(trace, t, true);
785        }
786
787        /* Read all packets in then hash and queue against the correct thread */
788        while (1) {
789                int thread;
790                if (!pkt_skipped)
791                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
792                assert(packet);
793
794                if (libtrace_halt) {
795                        packet->error = 0;
796                        break;
797                }
798
799                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
800                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
801                        switch(message.code) {
802                                case MESSAGE_DO_PAUSE:
803                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
804                                        thread_change_state(trace, t, THREAD_PAUSED, false);
805                                        pthread_cond_broadcast(&trace->perpkt_cond);
806                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
807                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
808                                        }
809                                        thread_change_state(trace, t, THREAD_RUNNING, false);
810                                        pthread_cond_broadcast(&trace->perpkt_cond);
811                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
812                                        break;
813                                case MESSAGE_DO_STOP:
814                                        assert(trace->started == false);
815                                        assert(trace->state == STATE_FINISHED);
816                                        /* Mark the current packet as EOF */
817                                        packet->error = 0;
818                                        break;
819                                default:
820                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
821                        }
822                        pkt_skipped = 1;
823                        continue;
824                }
825
826                if ((packet->error = trace_read_packet(trace, packet)) <1) {
827                        break; /* We are EOF or error'd either way we stop  */
828                }
829
830                /* We are guaranteed to have a hash function i.e. != NULL */
831                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
832                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
833                /* Blocking write to the correct queue - I'm the only writer */
834                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
835                        uint64_t order = trace_packet_get_order(packet);
836                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
837                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
838                                // Write ticks to everyone else
839                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
840                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
841                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
842                                for (i = 0; i < trace->perpkt_thread_count; i++) {
843                                        pkts[i]->error = READ_TICK;
844                                        trace_packet_set_order(pkts[i], order);
845                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
846                                }
847                        }
848                        pkt_skipped = 0;
849                } else {
850                        assert(!"Dropping a packet!!");
851                        pkt_skipped = 1; // Reuse that packet no one read it
852                }
853        }
854
855        /* Broadcast our last failed read to all threads */
856        for (i = 0; i < trace->perpkt_thread_count; i++) {
857                libtrace_packet_t * bcast;
858                if (i == trace->perpkt_thread_count - 1) {
859                        bcast = packet;
860                } else {
861                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
862                        bcast->error = packet->error;
863                }
864                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
865                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
866                        // Unlock early otherwise we could deadlock
867                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
868                }
869                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
870        }
871
872        // We don't need to free the packet
873        thread_change_state(trace, t, THREAD_FINISHED, true);
874
875        libtrace_ocache_unregister_thread(&trace->packet_freelist);
876        if (trace->format->punregister_thread) {
877                trace->format->punregister_thread(trace, t);
878        }
879        print_memory_stats();
880
881        // TODO remove from TTABLE t sometime
882        pthread_exit(NULL);
883}
884
885/* Our simplest case when a thread becomes ready it can obtain an exclusive
886 * lock to read packets from the underlying trace.
887 */
888static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
889                                                    libtrace_thread_t *t,
890                                                    libtrace_packet_t *packets[],
891                                                    size_t nb_packets) {
892        size_t i = 0;
893        //bool tick_hit = false;
894
895        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
896        /* Read nb_packets */
897        for (i = 0; i < nb_packets; ++i) {
898                if (libtrace_halt) {
899                        break;
900                }
901                packets[i]->error = trace_read_packet(libtrace, packets[i]);
902
903                if (packets[i]->error <= 0) {
904                        /* We'll catch this next time if we have already got packets */
905                        if ( i==0 ) {
906                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
907                                return packets[i]->error;
908                        } else {
909                                break;
910                        }
911                }
912                /*
913                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
914                        tick_hit = true;
915                }*/
916        }
917        // Doing this inside the lock ensures the first packet is always
918        // recorded first
919        if (packets[0]->error > 0) {
920                store_first_packet(libtrace, packets[0], t);
921        }
922        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
923        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
924        if (tick_hit) {
925                libtrace_message_t tick;
926                tick.additional.uint64 = trace_packet_get_order(packets[i]);
927                tick.code = MESSAGE_TICK;
928                trace_send_message_to_perpkts(libtrace, &tick);
929        } */
930        return i;
931}
932
933/**
934 * For the case that we have a dedicated hasher thread
935 * 1. We read a packet from our buffer
936 * 2. Move that into the packet provided (packet)
937 */
938inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
939                                                   libtrace_thread_t *t,
940                                                   libtrace_packet_t *packets[],
941                                                   size_t nb_packets) {
942        size_t i;
943
944        /* We store the last error message here */
945        if (t->format_data) {
946                return ((libtrace_packet_t *)t->format_data)->error;
947        }
948
949        // Always grab at least one
950        if (packets[0]) // Recycle the old get the new
951                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
952        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
953
954        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
955                return packets[0]->error;
956        }
957
958        for (i = 1; i < nb_packets; i++) {
959                if (packets[i]) // Recycle the old get the new
960                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
961                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
962                        packets[i] = NULL;
963                        break;
964                }
965
966                /* We will return an error or EOF the next time around */
967                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
968                        /* The message case will be checked automatically -
969                           However other cases like EOF and error will only be
970                           sent once*/
971                        if (packets[i]->error != READ_MESSAGE) {
972                                assert(t->format_data == NULL);
973                                t->format_data = packets[i];
974                        }
975                        break;
976                }
977        }
978
979        return i;
980}
981
982/**
983 * For the first packet of each queue we keep a copy and note the system
984 * time it was received at.
985 *
986 * This is used for finding the first packet when playing back a trace
987 * in trace time. And can be used by real time applications to print
988 * results out every XXX seconds.
989 */
990void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
991{
992        if (!t->recorded_first) {
993                libtrace_message_t mesg = {0, {.uint64=0}, NULL};
994                struct timeval tv;
995                libtrace_packet_t * dup;
996
997                /* We mark system time against a copy of the packet */
998                gettimeofday(&tv, NULL);
999                dup = trace_copy_packet(packet);
1000
1001                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1002                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1003                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1004                libtrace->first_packets.count++;
1005
1006                /* Now update the first */
1007                if (libtrace->first_packets.count == 1) {
1008                        /* We the first entry hence also the first known packet */
1009                        libtrace->first_packets.first = t->perpkt_num;
1010                } else {
1011                        /* Check if we are newer than the previous 'first' packet */
1012                        size_t first = libtrace->first_packets.first;
1013                        if (trace_get_seconds(dup) <
1014                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1015                                libtrace->first_packets.first = t->perpkt_num;
1016                }
1017                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1018
1019                mesg.code = MESSAGE_FIRST_PACKET;
1020                trace_message_reporter(libtrace, &mesg);
1021                trace_message_perpkts(libtrace, &mesg);
1022                t->recorded_first = true;
1023        }
1024}
1025
1026DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1027                                     libtrace_thread_t *t,
1028                                     const libtrace_packet_t **packet,
1029                                     const struct timeval **tv)
1030{
1031        void * tmp;
1032        int ret = 0;
1033
1034        if (t) {
1035                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1036                        return -1;
1037        }
1038
1039        /* Throw away these which we don't use */
1040        if (!packet)
1041                packet = (const libtrace_packet_t **) &tmp;
1042        if (!tv)
1043                tv = (const struct timeval **) &tmp;
1044
1045        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1046        if (t) {
1047                /* Get the requested thread */
1048                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1049                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1050        } else if (libtrace->first_packets.count) {
1051                /* Get the first packet across all threads */
1052                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1053                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1054                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1055                        ret = 1;
1056                } else {
1057                        struct timeval curr_tv;
1058                        // If a second has passed since the first entry we will assume this is the very first packet
1059                        gettimeofday(&curr_tv, NULL);
1060                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1061                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1062                                        ret = 1;
1063                                }
1064                        }
1065                }
1066        } else {
1067                *packet = NULL;
1068                *tv = NULL;
1069        }
1070        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1071        return ret;
1072}
1073
1074
1075DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1076{
1077        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1078}
1079
1080inline static struct timeval usec_to_tv(uint64_t usec)
1081{
1082        struct timeval tv;
1083        tv.tv_sec = usec / 1000000;
1084        tv.tv_usec = usec % 1000000;
1085        return tv;
1086}
1087
1088/** Similar to delay_tracetime but send messages to all threads periodically */
1089static void* reporter_entry(void *data) {
1090        libtrace_message_t message = {0, {.uint64=0}, NULL};
1091        libtrace_t *trace = (libtrace_t *)data;
1092        libtrace_thread_t *t = &trace->reporter_thread;
1093
1094        /* Wait until all threads are started */
1095        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1096        if (trace->state == STATE_ERROR) {
1097                thread_change_state(trace, t, THREAD_FINISHED, false);
1098                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1099                pthread_exit(NULL);
1100        }
1101        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1102
1103        if (trace->format->pregister_thread) {
1104                trace->format->pregister_thread(trace, t, false);
1105        }
1106
1107        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1108        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1109
1110        while (!trace_has_finished(trace)) {
1111                if (trace->config.reporter_polling) {
1112                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1113                                message.code = MESSAGE_POST_REPORTER;
1114                } else {
1115                        libtrace_message_queue_get(&t->messages, &message);
1116                }
1117                switch (message.code) {
1118                        // Check for results
1119                        case MESSAGE_POST_REPORTER:
1120                                trace->combiner.read(trace, &trace->combiner);
1121                                break;
1122                        case MESSAGE_DO_PAUSE:
1123                                assert(trace->combiner.pause);
1124                                trace->combiner.pause(trace, &trace->combiner);
1125                                send_message(trace, t, MESSAGE_PAUSING,
1126                                                (libtrace_generic_t) {0}, t);
1127                                trace_thread_pause(trace, t);
1128                                send_message(trace, t, MESSAGE_RESUMING,
1129                                                (libtrace_generic_t) {0}, t);
1130                                break;
1131                default:
1132                        send_message(trace, t, message.code, message.data,
1133                                        message.sender);
1134                }
1135        }
1136
1137        // Flush out whats left now all our threads have finished
1138        trace->combiner.read_final(trace, &trace->combiner);
1139
1140        // GOODBYE
1141        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1142        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1143
1144        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1145        print_memory_stats();
1146        return NULL;
1147}
1148
1149/** Similar to delay_tracetime but send messages to all threads periodically */
1150static void* keepalive_entry(void *data) {
1151        struct timeval prev, next;
1152        libtrace_message_t message = {0, {.uint64=0}, NULL};
1153        libtrace_t *trace = (libtrace_t *)data;
1154        uint64_t next_release;
1155        libtrace_thread_t *t = &trace->keepalive_thread;
1156
1157        /* Wait until all threads are started */
1158        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1159        if (trace->state == STATE_ERROR) {
1160                thread_change_state(trace, t, THREAD_FINISHED, false);
1161                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1162                pthread_exit(NULL);
1163        }
1164        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1165
1166        gettimeofday(&prev, NULL);
1167        message.code = MESSAGE_TICK_INTERVAL;
1168
1169        while (trace->state != STATE_FINISHED) {
1170                fd_set rfds;
1171                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1172                gettimeofday(&next, NULL);
1173                if (next_release > tv_to_usec(&next)) {
1174                        next = usec_to_tv(next_release - tv_to_usec(&next));
1175                        // Wait for timeout or a message
1176                        FD_ZERO(&rfds);
1177                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1178                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1179                                libtrace_message_t msg;
1180                                libtrace_message_queue_get(&t->messages, &msg);
1181                                assert(msg.code == MESSAGE_DO_STOP);
1182                                goto done;
1183                        }
1184                }
1185                prev = usec_to_tv(next_release);
1186                if (trace->state == STATE_RUNNING) {
1187                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1188                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1189                        trace_message_perpkts(trace, &message);
1190                }
1191        }
1192done:
1193
1194        thread_change_state(trace, t, THREAD_FINISHED, true);
1195        return NULL;
1196}
1197
1198/**
1199 * Delays a packets playback so the playback will be in trace time.
1200 * This may break early if a message becomes available.
1201 *
1202 * Requires the first packet for this thread to be received.
1203 * @param libtrace  The trace
1204 * @param packet    The packet to delay
1205 * @param t         The current thread
1206 * @return Either READ_MESSAGE(-2) or 0 is successful
1207 */
1208static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1209        struct timeval curr_tv, pkt_tv;
1210        uint64_t next_release = t->tracetime_offset_usec;
1211        uint64_t curr_usec;
1212
1213        if (!t->tracetime_offset_usec) {
1214                const libtrace_packet_t *first_pkt;
1215                const struct timeval *sys_tv;
1216                int64_t initial_offset;
1217                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1218                assert(first_pkt);
1219                pkt_tv = trace_get_timeval(first_pkt);
1220                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1221                /* In the unlikely case offset is 0, change it to 1 */
1222                if (stable)
1223                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1224                next_release = initial_offset;
1225        }
1226        /* next_release == offset */
1227        pkt_tv = trace_get_timeval(packet);
1228        next_release += tv_to_usec(&pkt_tv);
1229        gettimeofday(&curr_tv, NULL);
1230        curr_usec = tv_to_usec(&curr_tv);
1231        if (next_release > curr_usec) {
1232                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1233                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1234                fd_set rfds;
1235                FD_ZERO(&rfds);
1236                FD_SET(mesg_fd, &rfds);
1237                // We need to wait
1238                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1239                if (ret == 0) {
1240                        return 0;
1241                } else if (ret > 0) {
1242                        return READ_MESSAGE;
1243                } else {
1244                        assert(!"trace_delay_packet: Unexpected return from select");
1245                }
1246        }
1247        return 0;
1248}
1249
1250/* Discards packets that don't match the filter.
1251 * Discarded packets are emptied and then moved to the end of the packet list.
1252 *
1253 * @param trace       The trace format, containing the filter
1254 * @param packets     An array of packets
1255 * @param nb_packets  The number of valid items in packets
1256 *
1257 * @return The number of packets that passed the filter, which are moved to
1258 *          the start of the packets array
1259 */
1260static inline size_t filter_packets(libtrace_t *trace,
1261                                    libtrace_packet_t **packets,
1262                                    size_t nb_packets) {
1263        size_t offset = 0;
1264        size_t i;
1265
1266        for (i = 0; i < nb_packets; ++i) {
1267                // The filter needs the trace attached to receive the link type
1268                packets[i]->trace = trace;
1269                if (trace_apply_filter(trace->filter, packets[i])) {
1270                        libtrace_packet_t *tmp;
1271                        tmp = packets[offset];
1272                        packets[offset++] = packets[i];
1273                        packets[i] = tmp;
1274                } else {
1275                        trace_fin_packet(packets[i], 0);
1276                }
1277        }
1278
1279        return offset;
1280}
1281
1282/* Read a batch of packets from the trace into a buffer.
1283 * Note that this function will block until a packet is read (or EOF is reached)
1284 *
1285 * @param libtrace    The trace
1286 * @param t           The thread
1287 * @param packets     An array of packets
1288 * @param nb_packets  The number of empty packets in packets
1289 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1290 */
1291static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1292                                      libtrace_thread_t *t,
1293                                      libtrace_packet_t *packets[],
1294                                      size_t nb_packets) {
1295        int i;
1296        assert(nb_packets);
1297        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1298        if (trace_is_err(libtrace))
1299                return -1;
1300        if (!libtrace->started) {
1301                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1302                              "You must call libtrace_start() before trace_read_packet()\n");
1303                return -1;
1304        }
1305
1306        if (libtrace->format->pread_packets) {
1307                int ret;
1308                /*
1309                for (i = 0; i < (int) nb_packets; ++i) {
1310                        assert(i[packets]);
1311                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1312                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1313                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1314                                              "Packet passed to trace_read_packet() is invalid\n");
1315                                return -1;
1316                        }
1317                }
1318                */
1319                do {
1320                        ret=libtrace->format->pread_packets(libtrace, t,
1321                                                            packets,
1322                                                            nb_packets);
1323                        /* Error, EOF or message? */
1324                        if (ret <= 0) {
1325                                return ret;
1326                        }
1327
1328                        if (libtrace->filter) {
1329                                int remaining;
1330                                remaining = filter_packets(libtrace,
1331                                                           packets, ret);
1332                                t->filtered_packets += ret - remaining;
1333                                ret = remaining;
1334                        }
1335                        for (i = 0; i < ret; ++i) {
1336                                /* We do not mark the packet against the trace,
1337                                 * before hand or after. After breaks DAG meta
1338                                 * packets and before is inefficient */
1339                                //packets[i]->trace = libtrace;
1340                                /* TODO IN FORMAT?? Like traditional libtrace */
1341                                if (libtrace->snaplen>0)
1342                                        trace_set_capture_length(packets[i],
1343                                                        libtrace->snaplen);
1344                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1345                        }
1346                } while(ret == 0);
1347                return ret;
1348        }
1349        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1350                      "This format does not support reading packets\n");
1351        return ~0U;
1352}
1353
1354/* Restarts a parallel trace, this is called from trace_pstart.
1355 * The libtrace lock is held upon calling this function.
1356 * Typically with a parallel trace the threads are not
1357 * killed rather.
1358 */
1359static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1360                          libtrace_callback_set_t *per_packet_cbs, 
1361                          libtrace_callback_set_t *reporter_cbs) {
1362        int i, err = 0;
1363        if (libtrace->state != STATE_PAUSED) {
1364                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1365                        "trace(%s) is not currently paused",
1366                              libtrace->uridata);
1367                return -1;
1368        }
1369
1370        assert(libtrace_parallel);
1371        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1372
1373        /* Reset first packets */
1374        pthread_spin_lock(&libtrace->first_packets.lock);
1375        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1376                assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);
1377                if (libtrace->first_packets.packets[i].packet) {
1378                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1379                        libtrace->first_packets.packets[i].packet = NULL;
1380                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1381                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1382                        libtrace->first_packets.count--;
1383                        libtrace->perpkt_threads[i].recorded_first = false;
1384                }
1385        }
1386        assert(libtrace->first_packets.count == 0);
1387        libtrace->first_packets.first = 0;
1388        pthread_spin_unlock(&libtrace->first_packets.lock);
1389
1390        /* Reset delay */
1391        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1392                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1393        }
1394
1395        /* Reset statistics */
1396        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1397                libtrace->perpkt_threads[i].accepted_packets = 0;
1398                libtrace->perpkt_threads[i].filtered_packets = 0;
1399        }
1400        libtrace->accepted_packets = 0;
1401        libtrace->filtered_packets = 0;
1402
1403        /* Update functions if requested */
1404        if(global_blob)
1405                libtrace->global_blob = global_blob;
1406
1407        if (per_packet_cbs) {
1408                if (libtrace->perpkt_cbs)
1409                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1410                libtrace->perpkt_cbs = trace_create_callback_set();
1411                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1412                                sizeof(libtrace_callback_set_t));
1413        }
1414
1415        if (reporter_cbs) {
1416                if (libtrace->reporter_cbs)
1417                        trace_destroy_callback_set(libtrace->reporter_cbs);
1418
1419                libtrace->reporter_cbs = trace_create_callback_set();
1420                memcpy(libtrace->reporter_cbs, reporter_cbs, 
1421                                sizeof(libtrace_callback_set_t));
1422        }
1423
1424        if (trace_is_parallel(libtrace)) {
1425                err = libtrace->format->pstart_input(libtrace);
1426        } else {
1427                if (libtrace->format->start_input) {
1428                        err = libtrace->format->start_input(libtrace);
1429                }
1430        }
1431
1432        if (err == 0) {
1433                libtrace->started = true;
1434                libtrace_change_state(libtrace, STATE_RUNNING, false);
1435        }
1436        return err;
1437}
1438
1439/**
1440 * @return the number of CPU cores on the machine. -1 if unknown.
1441 */
1442SIMPLE_FUNCTION static int get_nb_cores() {
1443        int numCPU;
1444#ifdef _SC_NPROCESSORS_ONLN
1445        /* Most systems do this now */
1446        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1447
1448#else
1449        int mib[] = {CTL_HW, HW_AVAILCPU};
1450        size_t len = sizeof(numCPU);
1451
1452        /* get the number of CPUs from the system */
1453        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1454#endif
1455        return numCPU <= 0 ? 1 : numCPU;
1456}
1457
1458/**
1459 * Verifies the configuration and sets default values for any values not
1460 * specified by the user.
1461 */
1462static void verify_configuration(libtrace_t *libtrace) {
1463
1464        if (libtrace->config.hasher_queue_size <= 0)
1465                libtrace->config.hasher_queue_size = 1000;
1466
1467        if (libtrace->config.perpkt_threads <= 0) {
1468                libtrace->perpkt_thread_count = get_nb_cores();
1469                if (libtrace->perpkt_thread_count <= 0)
1470                        // Lets just use one
1471                        libtrace->perpkt_thread_count = 1;
1472        } else {
1473                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1474        }
1475
1476        if (libtrace->config.reporter_thold <= 0)
1477                libtrace->config.reporter_thold = 100;
1478        if (libtrace->config.burst_size <= 0)
1479                libtrace->config.burst_size = 10;
1480        if (libtrace->config.thread_cache_size <= 0)
1481                libtrace->config.thread_cache_size = 20;
1482        if (libtrace->config.cache_size <= 0)
1483                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1484
1485        if (libtrace->config.cache_size <
1486                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1487                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1488
1489        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1490                libtrace->combiner = combiner_unordered;
1491
1492        /* Figure out if we are using a dedicated hasher thread? */
1493        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1494                libtrace->hasher_thread.type = THREAD_HASHER;
1495        }
1496}
1497
1498/**
1499 * Starts a libtrace_thread, including allocating memory for messaging.
1500 * Threads are expected to wait until the libtrace look is released.
1501 * Hence why we don't init structures until later.
1502 *
1503 * @param trace The trace the thread is associated with
1504 * @param t The thread that is filled when the thread is started
1505 * @param type The type of thread
1506 * @param start_routine The entry location of the thread
1507 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1508 * @param name For debugging purposes set the threads name (Optional)
1509 *
1510 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1511 *         In this situation the thread structure is zeroed.
1512 */
1513static int trace_start_thread(libtrace_t *trace,
1514                       libtrace_thread_t *t,
1515                       enum thread_types type,
1516                       void *(*start_routine) (void *),
1517                       int perpkt_num,
1518                       const char *name) {
1519#ifdef __linux__
1520        pthread_attr_t attrib;
1521        cpu_set_t cpus;
1522        int i;
1523#endif
1524        int ret;
1525        assert(t->type == THREAD_EMPTY);
1526        t->trace = trace;
1527        t->ret = NULL;
1528        t->user_data = NULL;
1529        t->type = type;
1530        t->state = THREAD_RUNNING;
1531
1532        assert(name);
1533
1534#ifdef __linux__
1535        CPU_ZERO(&cpus);
1536        for (i = 0; i < get_nb_cores(); i++)
1537                CPU_SET(i, &cpus);
1538        pthread_attr_init(&attrib);
1539        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1540        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1541        pthread_attr_destroy(&attrib);
1542#else
1543        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1544#endif
1545        if (ret != 0) {
1546                libtrace_zero_thread(t);
1547                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1548                return -1;
1549        }
1550        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1551        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1552                libtrace_ringbuffer_init(&t->rbuffer,
1553                                         trace->config.hasher_queue_size,
1554                                         trace->config.hasher_polling?
1555                                                 LIBTRACE_RINGBUFFER_POLLING:
1556                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1557        }
1558#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1559        if(name)
1560                pthread_setname_np(t->tid, name);
1561#endif
1562        t->perpkt_num = perpkt_num;
1563        return 0;
1564}
1565
1566/** Parses the environment variable LIBTRACE_CONF into the supplied
1567 * configuration structure.
1568 *
1569 * @param[in,out] libtrace The trace from which we determine the URI and set
1570 * the configuration.
1571 *
1572 * We search for 3 environment variables and apply them to the config in the
1573 * following order. Such that the first has the lowest priority.
1574 *
1575 * 1. LIBTRACE_CONF, The global environment configuration
1576 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1577 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1578 *
1579 * E.g.
1580 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1581 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1582 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1583 *
1584 * @note All environment variables names MUST only contian
1585 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1586 * outside of this range should be captilised if possible or replaced with an
1587 * underscore.
1588 */
1589static void parse_env_config (libtrace_t *libtrace) {
1590        char env_name[1024] = "LIBTRACE_CONF_";
1591        size_t len = strlen(env_name);
1592        size_t mark = 0;
1593        size_t i;
1594        char * env;
1595
1596        /* Make our compound string */
1597        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1598        len += strlen(libtrace->format->name);
1599        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1600        len += 1;
1601        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1602
1603        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1604        for (i = 0; env_name[i] != 0; ++i) {
1605                env_name[i] = toupper(env_name[i]);
1606                if(env_name[i] == ':') {
1607                        mark = i;
1608                }
1609                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1610                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1611                        env_name[i] = '_';
1612                }
1613        }
1614
1615        /* First apply global env settings LIBTRACE_CONF */
1616        env = getenv("LIBTRACE_CONF");
1617        if (env)
1618        {
1619                printf("Got env %s", env);
1620                trace_set_configuration(libtrace, env);
1621        }
1622
1623        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1624        if (mark != 0) {
1625                env_name[mark] = 0;
1626                env = getenv(env_name);
1627                if (env) {
1628                        trace_set_configuration(libtrace, env);
1629                }
1630                env_name[mark] = '_';
1631        }
1632
1633        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1634        env = getenv(env_name);
1635        if (env) {
1636                trace_set_configuration(libtrace, env);
1637        }
1638}
1639
1640DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1641        if (libtrace->state == STATE_NEW)
1642                return trace_supports_parallel(libtrace);
1643        return libtrace->pread == trace_pread_packet_wrapper;
1644}
1645
1646DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1647                           libtrace_callback_set_t *per_packet_cbs,
1648                           libtrace_callback_set_t *reporter_cbs) {
1649        int i;
1650        int ret = -1;
1651        char name[16];
1652        sigset_t sig_before, sig_block_all;
1653        assert(libtrace);
1654
1655        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1656        if (trace_is_err(libtrace)) {
1657                goto cleanup_none;
1658        }
1659
1660        if (libtrace->state == STATE_PAUSED) {
1661                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1662                                reporter_cbs);
1663                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1664                return ret;
1665        }
1666
1667        if (libtrace->state != STATE_NEW) {
1668                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1669                              "should be called on a NEW or PAUSED trace but "
1670                              "instead was called from %s",
1671                              get_trace_state_name(libtrace->state));
1672                goto cleanup_none;
1673        }
1674
1675        /* Store the user defined things against the trace */
1676        libtrace->global_blob = global_blob;
1677
1678        /* Save a copy of the callbacks in case the user tries to change them
1679         * on us later */
1680        if (!per_packet_cbs) {
1681                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1682                                "requires a non-NULL set of per packet "
1683                                "callbacks.");
1684                goto cleanup_none;
1685        }
1686
1687        if (per_packet_cbs->message_packet == NULL) {
1688                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1689                                "packet callbacks must include a handler "
1690                                "for a packet. Please set this using "
1691                                "trace_set_packet_cb().");
1692                goto cleanup_none;
1693        }
1694
1695        libtrace->perpkt_cbs = trace_create_callback_set();
1696        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1697       
1698        if (reporter_cbs) {
1699                libtrace->reporter_cbs = trace_create_callback_set();
1700                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1701        }
1702
1703       
1704
1705
1706        /* And zero other fields */
1707        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1708                libtrace->perpkt_thread_states[i] = 0;
1709        }
1710        libtrace->first_packets.first = 0;
1711        libtrace->first_packets.count = 0;
1712        libtrace->first_packets.packets = NULL;
1713        libtrace->perpkt_threads = NULL;
1714        /* Set a global which says we are using a parallel trace. This is
1715         * for backwards compatability due to changes when destroying packets */
1716        libtrace_parallel = 1;
1717
1718        /* Parses configuration passed through environment variables */
1719        parse_env_config(libtrace);
1720        verify_configuration(libtrace);
1721
1722        ret = -1;
1723        /* Try start the format - we prefer parallel over single threaded, as
1724         * these formats should support messages better */
1725        if (trace_supports_parallel(libtrace) &&
1726            !trace_has_dedicated_hasher(libtrace)) {
1727                ret = libtrace->format->pstart_input(libtrace);
1728                libtrace->pread = trace_pread_packet_wrapper;
1729        }
1730        if (ret != 0) {
1731                if (libtrace->format->start_input) {
1732                        ret = libtrace->format->start_input(libtrace);
1733                }
1734                if (libtrace->perpkt_thread_count > 1)
1735                        libtrace->pread = trace_pread_packet_first_in_first_served;
1736                else
1737                        /* Use standard read_packet */
1738                        libtrace->pread = NULL;
1739        }
1740
1741        if (ret != 0) {
1742                goto cleanup_none;
1743        }
1744
1745        /* --- Start all the threads we need --- */
1746        /* Disable signals because it is inherited by the threads we start */
1747        sigemptyset(&sig_block_all);
1748        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1749
1750        /* If we need a hasher thread start it
1751         * Special Case: If single threaded we don't need a hasher
1752         */
1753        if (trace_has_dedicated_hasher(libtrace)) {
1754                libtrace->hasher_thread.type = THREAD_EMPTY;
1755                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1756                                   THREAD_HASHER, hasher_entry, -1,
1757                                   "hasher-thread");
1758                if (ret != 0)
1759                        goto cleanup_started;
1760                libtrace->pread = trace_pread_packet_hasher_thread;
1761        } else {
1762                libtrace->hasher_thread.type = THREAD_EMPTY;
1763        }
1764
1765        /* Start up our perpkt threads */
1766        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1767                                          libtrace->perpkt_thread_count);
1768        if (!libtrace->perpkt_threads) {
1769                trace_set_err(libtrace, errno, "trace_pstart "
1770                              "failed to allocate memory.");
1771                goto cleanup_threads;
1772        }
1773        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1774                snprintf(name, sizeof(name), "perpkt-%d", i);
1775                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1776                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1777                                   THREAD_PERPKT, perpkt_threads_entry, i,
1778                                   name);
1779                if (ret != 0)
1780                        goto cleanup_threads;
1781        }
1782
1783        /* Start the reporter thread */
1784        if (reporter_cbs) {
1785                if (libtrace->combiner.initialise)
1786                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1787                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1788                                   THREAD_REPORTER, reporter_entry, -1,
1789                                   "reporter_thread");
1790                if (ret != 0)
1791                        goto cleanup_threads;
1792        }
1793
1794        /* Start the keepalive thread */
1795        if (libtrace->config.tick_interval > 0) {
1796                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1797                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1798                                   "keepalive_thread");
1799                if (ret != 0)
1800                        goto cleanup_threads;
1801        }
1802
1803        /* Init other data structures */
1804        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1805        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1806        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1807                                                 sizeof(*libtrace->first_packets.packets));
1808        if (libtrace->first_packets.packets == NULL) {
1809                trace_set_err(libtrace, errno, "trace_pstart "
1810                              "failed to allocate memory.");
1811                goto cleanup_threads;
1812        }
1813
1814        if (libtrace_ocache_init(&libtrace->packet_freelist,
1815                             (void* (*)()) trace_create_packet,
1816                             (void (*)(void *))trace_destroy_packet,
1817                             libtrace->config.thread_cache_size,
1818                             libtrace->config.cache_size * 4,
1819                             libtrace->config.fixed_count) != 0) {
1820                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1821                              "failed to allocate ocache.");
1822                goto cleanup_threads;
1823        }
1824
1825        /* Threads don't start */
1826        libtrace->started = true;
1827        libtrace_change_state(libtrace, STATE_RUNNING, false);
1828
1829        ret = 0;
1830        goto success;
1831cleanup_threads:
1832        if (libtrace->first_packets.packets) {
1833                free(libtrace->first_packets.packets);
1834                libtrace->first_packets.packets = NULL;
1835        }
1836        libtrace_change_state(libtrace, STATE_ERROR, false);
1837        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1838        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1839                pthread_join(libtrace->hasher_thread.tid, NULL);
1840                libtrace_zero_thread(&libtrace->hasher_thread);
1841        }
1842
1843        if (libtrace->perpkt_threads) {
1844                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1845                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1846                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1847                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1848                        } else break;
1849                }
1850                free(libtrace->perpkt_threads);
1851                libtrace->perpkt_threads = NULL;
1852        }
1853
1854        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1855                pthread_join(libtrace->reporter_thread.tid, NULL);
1856                libtrace_zero_thread(&libtrace->reporter_thread);
1857        }
1858
1859        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1860                pthread_join(libtrace->keepalive_thread.tid, NULL);
1861                libtrace_zero_thread(&libtrace->keepalive_thread);
1862        }
1863        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1864        libtrace_change_state(libtrace, STATE_NEW, false);
1865        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1866        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1867cleanup_started:
1868        if (libtrace->pread == trace_pread_packet_wrapper) {
1869                if (libtrace->format->ppause_input)
1870                        libtrace->format->ppause_input(libtrace);
1871        } else {
1872                if (libtrace->format->pause_input)
1873                        libtrace->format->pause_input(libtrace);
1874        }
1875        ret = -1;
1876success:
1877        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1878cleanup_none:
1879        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1880        return ret;
1881}
1882
1883DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
1884                fn_cb_starting handler) {
1885        cbset->message_starting = handler;
1886        return 0;
1887}
1888
1889DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
1890                fn_cb_dataless handler) {
1891        cbset->message_pausing = handler;
1892        return 0;
1893}
1894
1895DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
1896                fn_cb_dataless handler) {
1897        cbset->message_resuming = handler;
1898        return 0;
1899}
1900
1901DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
1902                fn_cb_dataless handler) {
1903        cbset->message_stopping = handler;
1904        return 0;
1905}
1906
1907DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
1908                fn_cb_packet handler) {
1909        cbset->message_packet = handler;
1910        return 0;
1911}
1912
1913DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
1914                fn_cb_first_packet handler) {
1915        cbset->message_first_packet = handler;
1916        return 0;
1917}
1918
1919DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
1920                fn_cb_tick handler) {
1921        cbset->message_tick_count = handler;
1922        return 0;
1923}
1924
1925DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
1926                fn_cb_tick handler) {
1927        cbset->message_tick_interval = handler;
1928        return 0;
1929}
1930
1931DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
1932                fn_cb_result handler) {
1933        cbset->message_result = handler;
1934        return 0;
1935}
1936
1937DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
1938                fn_cb_usermessage handler) {
1939        cbset->message_user = handler;
1940        return 0;
1941}
1942
1943/*
1944 * Pauses a trace, this should only be called by the main thread
1945 * 1. Set started = false
1946 * 2. All perpkt threads are paused waiting on a condition var
1947 * 3. Then call ppause on the underlying format if found
1948 * 4. The traces state is paused
1949 *
1950 * Once done you should be able to modify the trace setup and call pstart again
1951 * TODO add support to change the number of threads.
1952 */
1953DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1954{
1955        libtrace_thread_t *t;
1956        int i;
1957        assert(libtrace);
1958
1959        t = get_thread_table(libtrace);
1960        // Check state from within the lock if we are going to change it
1961        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1962
1963        /* If we are already paused, just treat this as a NOOP */
1964        if (libtrace->state == STATE_PAUSED) {
1965                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1966                return 0;
1967        }
1968        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1969                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1970                return -1;
1971        }
1972
1973        libtrace_change_state(libtrace, STATE_PAUSING, false);
1974        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1975
1976        // Special case handle the hasher thread case
1977        if (trace_has_dedicated_hasher(libtrace)) {
1978                if (libtrace->config.debug_state)
1979                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1980                libtrace_message_t message = {0, {.uint64=0}, NULL};
1981                message.code = MESSAGE_DO_PAUSE;
1982                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1983                // Wait for it to pause
1984                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1985                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1986                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1987                }
1988                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1989                if (libtrace->config.debug_state)
1990                        fprintf(stderr, " DONE\n");
1991        }
1992
1993        if (libtrace->config.debug_state)
1994                fprintf(stderr, "Asking perpkt threads to pause ...");
1995        // Stop threads, skip this one if it's a perpkt
1996        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1997                if (&libtrace->perpkt_threads[i] != t) {
1998                        libtrace_message_t message = {0, {.uint64=0}, NULL};
1999                        message.code = MESSAGE_DO_PAUSE;
2000                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2001                        if(trace_has_dedicated_hasher(libtrace)) {
2002                                // The hasher has stopped and other threads have messages waiting therefore
2003                                // If the queues are empty the other threads would have no data
2004                                // So send some message packets to simply ask the threads to check
2005                                // We are the only writer since hasher has paused
2006                                libtrace_packet_t *pkt;
2007                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2008                                pkt->error = READ_MESSAGE;
2009                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2010                        }
2011                } else {
2012                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2013                }
2014        }
2015
2016        if (t) {
2017                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2018                // We rely on the user to *not* return before starting the trace again
2019                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2020        }
2021
2022        // Wait for all threads to pause
2023        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2024        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2025                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2026        }
2027        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2028
2029        if (libtrace->config.debug_state)
2030                fprintf(stderr, " DONE\n");
2031
2032        // Deal with the reporter
2033        if (trace_has_reporter(libtrace)) {
2034                if (libtrace->config.debug_state)
2035                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2036                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2037                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2038                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2039               
2040                } else {
2041                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2042                        message.code = MESSAGE_DO_PAUSE;
2043                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2044                        // Wait for it to pause
2045                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2046                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2047                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2048                        }
2049                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2050                }
2051                if (libtrace->config.debug_state)
2052                        fprintf(stderr, " DONE\n");
2053        }
2054
2055        /* Cache values before we pause */
2056        if (libtrace->stats == NULL)
2057                libtrace->stats = trace_create_statistics();
2058        // Save the statistics against the trace
2059        trace_get_statistics(libtrace, NULL);
2060        if (trace_is_parallel(libtrace)) {
2061                libtrace->started = false;
2062                if (libtrace->format->ppause_input)
2063                        libtrace->format->ppause_input(libtrace);
2064                // TODO What happens if we don't have pause input??
2065        } else {
2066                int err;
2067                err = trace_pause(libtrace);
2068                // We should handle this a bit better
2069                if (err)
2070                        return err;
2071        }
2072
2073        // Only set as paused after the pause has been called on the trace
2074        libtrace_change_state(libtrace, STATE_PAUSED, true);
2075        return 0;
2076}
2077
2078/**
2079 * Stop trace finish prematurely as though it meet an EOF
2080 * This should only be called by the main thread
2081 * 1. Calls ppause
2082 * 2. Sends a message asking for threads to finish
2083 * 3. Releases threads which will pause
2084 */
2085DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2086{
2087        int i, err;
2088        libtrace_message_t message = {0, {.uint64=0}, NULL};
2089        assert(libtrace);
2090
2091        // Ensure all threads have paused and the underlying trace format has
2092        // been closed and all packets associated are cleaned up
2093        // Pause will do any state checks for us
2094        err = trace_ppause(libtrace);
2095        if (err)
2096                return err;
2097
2098        // Now send a message asking the threads to stop
2099        // This will be retrieved before trying to read another packet
2100
2101        message.code = MESSAGE_DO_STOP;
2102        trace_message_perpkts(libtrace, &message);
2103        if (trace_has_dedicated_hasher(libtrace))
2104                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2105
2106        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2107                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2108        }
2109
2110        /* Now release the threads and let them stop - when the threads finish
2111         * the state will be set to finished */
2112        libtrace_change_state(libtrace, STATE_FINISHING, true);
2113        return 0;
2114}
2115
2116DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2117        int ret = -1;
2118        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2119                return -1;
2120        }
2121
2122        // Save the requirements
2123        trace->hasher_type = type;
2124        if (hasher) {
2125                trace->hasher = hasher;
2126                trace->hasher_data = data;
2127        } else {
2128                trace->hasher = NULL;
2129                trace->hasher_data = NULL;
2130        }
2131
2132        // Try push this to hardware - NOTE hardware could do custom if
2133        // there is a more efficient way to apply it, in this case
2134        // it will simply grab the function out of libtrace_t
2135        if (trace_supports_parallel(trace) && trace->format->config_input)
2136                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2137
2138        if (ret == -1) {
2139                /* We have to deal with this ourself */
2140                if (!hasher) {
2141                        switch (type)
2142                        {
2143                                case HASHER_CUSTOM:
2144                                case HASHER_BALANCE:
2145                                        return 0;
2146                                case HASHER_BIDIRECTIONAL:
2147                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2148                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2149                                        toeplitz_init_config(trace->hasher_data, 1);
2150                                        return 0;
2151                                case HASHER_UNIDIRECTIONAL:
2152                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2153                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2154                                        toeplitz_init_config(trace->hasher_data, 0);
2155                                        return 0;
2156                        }
2157                        return -1;
2158                }
2159        } else {
2160                /* If the hasher is hardware we zero out the hasher and hasher
2161                 * data fields - only if we need a hasher do we do this */
2162                trace->hasher = NULL;
2163                trace->hasher_data = NULL;
2164        }
2165
2166        return 0;
2167}
2168
2169// Waits for all threads to finish
2170DLLEXPORT void trace_join(libtrace_t *libtrace) {
2171        int i;
2172
2173        /* Firstly wait for the perpkt threads to finish, since these are
2174         * user controlled */
2175        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2176                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2177                // So we must do our best effort to empty the queue - so
2178                // the producer (or any other threads) don't block.
2179                libtrace_packet_t * packet;
2180                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2181                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2182                        if (packet) // This could be NULL iff the perpkt finishes early
2183                                trace_destroy_packet(packet);
2184        }
2185
2186        /* Now the hasher */
2187        if (trace_has_dedicated_hasher(libtrace)) {
2188                pthread_join(libtrace->hasher_thread.tid, NULL);
2189                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2190        }
2191
2192        // Now that everything is finished nothing can be touching our
2193        // buffers so clean them up
2194        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2195                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2196                // if they lost timeslice before-during a write
2197                libtrace_packet_t * packet;
2198                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2199                        trace_destroy_packet(packet);
2200                if (trace_has_dedicated_hasher(libtrace)) {
2201                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2202                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2203                }
2204                // Cannot destroy vector yet, this happens with trace_destroy
2205        }
2206
2207        if (trace_has_reporter(libtrace)) {
2208                pthread_join(libtrace->reporter_thread.tid, NULL);
2209                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2210        }
2211
2212        // Wait for the tick (keepalive) thread if it has been started
2213        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2214                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2215                msg.code = MESSAGE_DO_STOP;
2216                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2217                pthread_join(libtrace->keepalive_thread.tid, NULL);
2218        }
2219
2220        libtrace_change_state(libtrace, STATE_JOINED, true);
2221        print_memory_stats();
2222}
2223
2224DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2225                                                libtrace_thread_t *t)
2226{
2227        int ret;
2228        if (t == NULL)
2229                t = get_thread_descriptor(libtrace);
2230        if (t == NULL)
2231                return -1;
2232        ret = libtrace_message_queue_count(&t->messages);
2233        return ret < 0 ? 0 : ret;
2234}
2235
2236DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2237                                          libtrace_thread_t *t,
2238                                          libtrace_message_t * message)
2239{
2240        int ret;
2241        if (t == NULL)
2242                t = get_thread_descriptor(libtrace);
2243        if (t == NULL)
2244                return -1;
2245        ret = libtrace_message_queue_get(&t->messages, message);
2246        return ret < 0 ? 0 : ret;
2247}
2248
2249DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2250                                              libtrace_thread_t *t,
2251                                              libtrace_message_t * message)
2252{
2253        if (t == NULL)
2254                t = get_thread_descriptor(libtrace);
2255        if (t == NULL)
2256                return -1;
2257        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2258                return 0;
2259        else
2260                return -1;
2261}
2262
2263DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2264{
2265        int ret;
2266        if (!message->sender)
2267                message->sender = get_thread_descriptor(libtrace);
2268
2269        ret = libtrace_message_queue_put(&t->messages, message);
2270        return ret < 0 ? 0 : ret;
2271}
2272
2273DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2274{
2275        if (!trace_has_reporter(libtrace) ||
2276            !(libtrace->reporter_thread.state == THREAD_RUNNING
2277              || libtrace->reporter_thread.state == THREAD_PAUSED))
2278                return -1;
2279
2280        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2281}
2282
2283DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2284{
2285        libtrace_message_t message = {0, {.uint64=0}, NULL};
2286        message.code = MESSAGE_POST_REPORTER;
2287        return trace_message_reporter(libtrace, (void *) &message);
2288}
2289
2290DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2291{
2292        int i;
2293        int missed = 0;
2294        if (message->sender == NULL)
2295                message->sender = get_thread_descriptor(libtrace);
2296        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2297                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2298                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2299                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2300                } else {
2301                        missed += 1;
2302                }
2303        }
2304        return -missed;
2305}
2306
2307/**
2308 * Publishes a result to the reduce queue
2309 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2310 */
2311DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2312        libtrace_result_t res;
2313        res.type = type;
2314        res.key = key;
2315        res.value = value;
2316        assert(libtrace->combiner.publish);
2317        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2318        return;
2319}
2320
2321DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2322        if (combiner) {
2323                trace->combiner = *combiner;
2324                trace->combiner.configuration = config;
2325        } else {
2326                // No combiner, so don't try use it
2327                memset(&trace->combiner, 0, sizeof(trace->combiner));
2328        }
2329}
2330
2331DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2332        return packet->order;
2333}
2334
2335DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2336        return packet->hash;
2337}
2338
2339DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2340        packet->order = order;
2341}
2342
2343DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2344        packet->hash = hash;
2345}
2346
2347DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2348        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2349}
2350
2351/**
2352 * @return True if the trace is not running such that it can be configured
2353 */
2354static inline bool trace_is_configurable(libtrace_t *trace) {
2355        return trace->state == STATE_NEW ||
2356                        trace->state == STATE_PAUSED;
2357}
2358
2359DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2360        if (!trace_is_configurable(trace)) return -1;
2361
2362        /* TODO consider allowing an offset from the total number of cores i.e.
2363         * -1 reserve 1 core */
2364        if (nb >= 0) {
2365                trace->config.perpkt_threads = nb;
2366                return 0;
2367        } else {
2368                return -1;
2369        }
2370}
2371
2372DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2373        if (!trace_is_configurable(trace)) return -1;
2374
2375        trace->config.tick_interval = millisec;
2376        return 0;
2377}
2378
2379DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2380        if (!trace_is_configurable(trace)) return -1;
2381
2382        trace->config.tick_count = count;
2383        return 0;
2384}
2385
2386DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2387        if (!trace_is_configurable(trace)) return -1;
2388
2389        trace->tracetime = tracetime;
2390        return 0;
2391}
2392
2393DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2394        if (!trace_is_configurable(trace)) return -1;
2395
2396        trace->config.cache_size = size;
2397        return 0;
2398}
2399
2400DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2401        if (!trace_is_configurable(trace)) return -1;
2402
2403        trace->config.thread_cache_size = size;
2404        return 0;
2405}
2406
2407DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2408        if (!trace_is_configurable(trace)) return -1;
2409
2410        trace->config.fixed_count = fixed;
2411        return 0;
2412}
2413
2414DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2415        if (!trace_is_configurable(trace)) return -1;
2416
2417        trace->config.burst_size = size;
2418        return 0;
2419}
2420
2421DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2422        if (!trace_is_configurable(trace)) return -1;
2423
2424        trace->config.hasher_queue_size = size;
2425        return 0;
2426}
2427
2428DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2429        if (!trace_is_configurable(trace)) return -1;
2430
2431        trace->config.hasher_polling = polling;
2432        return 0;
2433}
2434
2435DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2436        if (!trace_is_configurable(trace)) return -1;
2437
2438        trace->config.reporter_polling = polling;
2439        return 0;
2440}
2441
2442DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2443        if (!trace_is_configurable(trace)) return -1;
2444
2445        trace->config.reporter_thold = thold;
2446        return 0;
2447}
2448
2449DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2450        if (!trace_is_configurable(trace)) return -1;
2451
2452        trace->config.debug_state = debug_state;
2453        return 0;
2454}
2455
2456static bool config_bool_parse(char *value, size_t nvalue) {
2457        if (strncmp(value, "true", nvalue) == 0)
2458                return true;
2459        else if (strncmp(value, "false", nvalue) == 0)
2460                return false;
2461        else
2462                return strtoll(value, NULL, 10) != 0;
2463}
2464
2465/* Note update documentation on trace_set_configuration */
2466static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2467        assert(key);
2468        assert(value);
2469        assert(uc);
2470        if (strncmp(key, "cache_size", nkey) == 0
2471            || strncmp(key, "cs", nkey) == 0) {
2472                uc->cache_size = strtoll(value, NULL, 10);
2473        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2474                   || strncmp(key, "tcs", nkey) == 0) {
2475                uc->thread_cache_size = strtoll(value, NULL, 10);
2476        } else if (strncmp(key, "fixed_count", nkey) == 0
2477                   || strncmp(key, "fc", nkey) == 0) {
2478                uc->fixed_count = config_bool_parse(value, nvalue);
2479        } else if (strncmp(key, "burst_size", nkey) == 0
2480                   || strncmp(key, "bs", nkey) == 0) {
2481                uc->burst_size = strtoll(value, NULL, 10);
2482        } else if (strncmp(key, "tick_interval", nkey) == 0
2483                   || strncmp(key, "ti", nkey) == 0) {
2484                uc->tick_interval = strtoll(value, NULL, 10);
2485        } else if (strncmp(key, "tick_count", nkey) == 0
2486                   || strncmp(key, "tc", nkey) == 0) {
2487                uc->tick_count = strtoll(value, NULL, 10);
2488        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2489                   || strncmp(key, "pt", nkey) == 0) {
2490                uc->perpkt_threads = strtoll(value, NULL, 10);
2491        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2492                   || strncmp(key, "hqs", nkey) == 0) {
2493                uc->hasher_queue_size = strtoll(value, NULL, 10);
2494        } else if (strncmp(key, "hasher_polling", nkey) == 0
2495                   || strncmp(key, "hp", nkey) == 0) {
2496                uc->hasher_polling = config_bool_parse(value, nvalue);
2497        } else if (strncmp(key, "reporter_polling", nkey) == 0
2498                   || strncmp(key, "rp", nkey) == 0) {
2499                uc->reporter_polling = config_bool_parse(value, nvalue);
2500        } else if (strncmp(key, "reporter_thold", nkey) == 0
2501                   || strncmp(key, "rt", nkey) == 0) {
2502                uc->reporter_thold = strtoll(value, NULL, 10);
2503        } else if (strncmp(key, "debug_state", nkey) == 0
2504                   || strncmp(key, "ds", nkey) == 0) {
2505                uc->debug_state = config_bool_parse(value, nvalue);
2506        } else {
2507                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2508        }
2509}
2510
2511DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2512        char *pch;
2513        char key[100];
2514        char value[100];
2515        char *dup;
2516        assert(str);
2517        assert(trace);
2518
2519        if (!trace_is_configurable(trace)) return -1;
2520
2521        dup = strdup(str);
2522        pch = strtok (dup," ,.-");
2523        while (pch != NULL)
2524        {
2525                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2526                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2527                } else {
2528                        fprintf(stderr, "Error parsing option %s\n", pch);
2529                }
2530                pch = strtok (NULL," ,.-");
2531        }
2532        free(dup);
2533
2534        return 0;
2535}
2536
2537DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2538        char line[1024];
2539        if (!trace_is_configurable(trace)) return -1;
2540
2541        while (fgets(line, sizeof(line), file) != NULL)
2542        {
2543                trace_set_configuration(trace, line);
2544        }
2545
2546        if(ferror(file))
2547                return -1;
2548        else
2549                return 0;
2550}
2551
2552DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2553        assert(packet);
2554        /* Always release any resources this might be holding */
2555        trace_fin_packet(packet, 0);
2556        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2557}
2558
2559DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2560        if (libtrace->format)
2561                return &libtrace->format->info;
2562        else
2563                return NULL;
2564}
Note: See TracBrowser for help on using the repository browser.