source: lib/trace_parallel.c

develop
Last change on this file was 243bfd1, checked in by Shane Alcock <salcock@…>, 3 years ago

Improve parallel performance by skipping needless sanity checks

Normally, the sanity checking that we do here would be a good idea,
just in case something has corrupted one of our packets.

BUT, these checks happen every single time we attempt to read a
batch of packets (even for members of the packet array that are
not filled in by the read operation) and even though the checks
themselves are pretty simple, the CPU cycles used to perform them
really start to add up at high packet rates.

Instead, we're going to trust that a) we don't invalidate the
packets ourselves with some broken internal code and b) the user
doesn't go around touching raw packet structure contents
because we have an API for all necessary interactions. Sanity
checks can also be added at the point where the packet array element
is about to be updated by the read, if necessary. At least at that
point, we're sanity-checking something that we're about to use
instead of the current case where we waste time checking something
that we may or may not use.

  • Property mode set to 100644
File size: 90.5 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        case MESSAGE_META_PACKET:
245                return;
246        }
247        if (fn)
248                (*fn)(trace, thread, trace->global_blob, thread->user_data);
249}
250
251DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
252        free(cbset);
253}
254
255DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
256        libtrace_callback_set_t *cbset;
257
258        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
259        memset(cbset, 0, sizeof(libtrace_callback_set_t));
260        return cbset;
261}
262
263/*
264 * This can be used once the hasher thread has been started and internally after
265 * verify_configuration.
266 */
267DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
268{
269        return libtrace->hasher_thread.type == THREAD_HASHER;
270}
271
272DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
273{
274        if (!(libtrace->state != STATE_NEW)) {
275                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "Cannot check reporter for the current state in trace_has_reporter()");
276                return false;
277        }
278        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
279}
280
281/**
282 * When running the number of perpkt threads in use.
283 * TODO what if the trace is not running yet, or has finished??
284 *
285 * @brief libtrace_perpkt_thread_nb
286 * @param t The trace
287 * @return
288 */
289DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
290        return t->perpkt_thread_count;
291}
292
293DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
294        return thread->perpkt_num;
295}
296
297/**
298 * Changes the overall traces state and signals the condition.
299 *
300 * @param trace A pointer to the trace
301 * @param new_state The new state of the trace
302 * @param need_lock Set to true if libtrace_lock is not held, otherwise
303 *        false in the case the lock is currently held by this thread.
304 */
305static inline void libtrace_change_state(libtrace_t *trace,
306        const enum trace_state new_state, const bool need_lock)
307{
308        UNUSED enum trace_state prev_state;
309        if (need_lock)
310                pthread_mutex_lock(&trace->libtrace_lock);
311        prev_state = trace->state;
312        trace->state = new_state;
313
314        if (trace->config.debug_state)
315                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
316                        trace->uridata, get_trace_state_name(prev_state),
317                        get_trace_state_name(trace->state));
318
319        pthread_cond_broadcast(&trace->perpkt_cond);
320        if (need_lock)
321                pthread_mutex_unlock(&trace->libtrace_lock);
322}
323
324/**
325 * Changes a thread's state and broadcasts the condition variable. This
326 * should always be done when the lock is held.
327 *
328 * Additionally for perpkt threads the state counts are updated.
329 *
330 * @param trace A pointer to the trace
331 * @param t A pointer to the thread to modify
332 * @param new_state The new state of the thread
333 * @param need_lock Set to true if libtrace_lock is not held, otherwise
334 *        false in the case the lock is currently held by this thread.
335 */
336static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
337        const enum thread_states new_state, const bool need_lock)
338{
339        enum thread_states prev_state;
340        if (need_lock)
341                pthread_mutex_lock(&trace->libtrace_lock);
342        prev_state = t->state;
343        t->state = new_state;
344        if (t->type == THREAD_PERPKT) {
345                --trace->perpkt_thread_states[prev_state];
346                ++trace->perpkt_thread_states[new_state];
347        }
348
349        if (trace->config.debug_state)
350                fprintf(stderr, "Thread %d state changed from %d to %d\n",
351                        (int) t->tid, prev_state, t->state);
352
353        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
354                /* Make sure we save our final stats in case someone wants
355                 * them at the end of their program.
356                 */
357
358                trace_get_statistics(trace, NULL);
359                libtrace_change_state(trace, STATE_FINISHED, false);
360        }
361
362        pthread_cond_broadcast(&trace->perpkt_cond);
363        if (need_lock)
364                pthread_mutex_unlock(&trace->libtrace_lock);
365}
366
367/**
368 * This is valid once a trace is initialised
369 *
370 * @return True if the format supports parallel threads.
371 */
372static inline bool trace_supports_parallel(libtrace_t *trace)
373{
374        if (!trace) {
375                fprintf(stderr, "NULL trace passed into trace_supports_parallel()\n");
376                return false;
377        }
378        if (!trace->format) {
379                trace_set_err(trace, TRACE_ERR_BAD_FORMAT,
380                        "NULL capture format associated with trace in trace_supports_parallel()");
381                return false;
382        }
383        if (trace->format->pstart_input)
384                return true;
385        else
386                return false;
387}
388
389void libtrace_zero_thread(libtrace_thread_t * t) {
390        t->accepted_packets = 0;
391        t->filtered_packets = 0;
392        t->recorded_first = false;
393        t->tracetime_offset_usec = 0;
394        t->user_data = 0;
395        t->format_data = 0;
396        libtrace_zero_ringbuffer(&t->rbuffer);
397        t->trace = NULL;
398        t->ret = NULL;
399        t->type = THREAD_EMPTY;
400        t->perpkt_num = -1;
401}
402
403// Ints are aligned int is atomic so safe to read and write at same time
404// However write must be locked, read doesn't (We never try read before written to table)
405libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
406        int i = 0;
407        pthread_t tid = pthread_self();
408
409        for (;i<libtrace->perpkt_thread_count ;++i) {
410                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
411                        return &libtrace->perpkt_threads[i];
412        }
413        return NULL;
414}
415
416static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
417        libtrace_thread_t *ret;
418        if (!(ret = get_thread_table(libtrace))) {
419                pthread_t tid = pthread_self();
420                // Check if we are reporter or something else
421                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
422                                pthread_equal(tid, libtrace->reporter_thread.tid))
423                        ret = &libtrace->reporter_thread;
424                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
425                         pthread_equal(tid, libtrace->hasher_thread.tid))
426                        ret = &libtrace->hasher_thread;
427                else
428                        ret = NULL;
429        }
430        return ret;
431}
432
433DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
434        // Duplicate the packet in standard malloc'd memory and free the
435        // original, This is a 1:1 exchange so the ocache count remains unchanged.
436        if (pkt->buf_control != TRACE_CTRL_PACKET) {
437                libtrace_packet_t *dup;
438                dup = trace_copy_packet(pkt);
439                /* Release the external buffer */
440                trace_fin_packet(pkt);
441                /* Copy the duplicated packet over the existing */
442                memcpy(pkt, dup, sizeof(libtrace_packet_t));
443                /* Free the packet structure */
444                free(dup);
445        }
446}
447
448/**
449 * Makes a libtrace_result_t safe, used when pausing a trace.
450 * This will call libtrace_make_packet_safe if the result is
451 * a packet.
452 */
453DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
454        if (res->type == RESULT_PACKET) {
455                libtrace_make_packet_safe(res->value.pkt);
456        }
457}
458
459/**
460 * Holds threads in a paused state, until released by broadcasting
461 * the condition mutex.
462 */
463static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
464        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
465        thread_change_state(trace, t, THREAD_PAUSED, false);
466        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
467                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
468        }
469        thread_change_state(trace, t, THREAD_RUNNING, false);
470        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
471}
472
473/**
474 * Sends a packet to the user, expects either a valid packet or a TICK packet.
475 *
476 * @param trace The trace
477 * @param t The current thread
478 * @param packet A pointer to the packet storage, which may be set to null upon
479 *               return, or a packet to be finished.
480 * @param tracetime If true packets are delayed to match with tracetime
481 * @return 0 is successful, otherwise if playing back in tracetime
482 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
483 *
484 * @note READ_MESSAGE will only be returned if tracetime is true.
485 */
486static inline int dispatch_packet(libtrace_t *trace,
487                                  libtrace_thread_t *t,
488                                  libtrace_packet_t **packet,
489                                  bool tracetime) {
490
491        if ((*packet)->error > 0) {
492                if (tracetime) {
493                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
494                                return READ_MESSAGE;
495                }
496                if (!IS_LIBTRACE_META_PACKET((*packet))) {
497                        t->accepted_packets++;
498                }
499
500                /* If packet is meta call the meta callback */
501                if (IS_LIBTRACE_META_PACKET((*packet))) {
502                        /* Pass to meta callback if defined else pass to packet callback */
503                        if (trace->perpkt_cbs->message_meta_packet) {
504                                *packet = (*trace->perpkt_cbs->message_meta_packet)(trace, t,
505                                        trace->global_blob, t->user_data, *packet);
506                        } else if (trace->perpkt_cbs->message_packet) {
507                                *packet = (*trace->perpkt_cbs->message_packet)(trace, t,
508                                        trace->global_blob, t->user_data, *packet);
509                        }
510                } else {
511                        if (trace->perpkt_cbs->message_packet) {
512                                *packet = (*trace->perpkt_cbs->message_packet)(trace, t,
513                                        trace->global_blob, t->user_data, *packet);
514                        }
515                }
516                trace_fin_packet(*packet);
517        } else {
518                if ((*packet)->error != READ_TICK) {
519                        trace_set_err(trace, TRACE_ERR_BAD_STATE,
520                                "dispatch_packet() called with invalid 'packet'");
521                        return -1;
522                }
523                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
524                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
525        }
526        return 0;
527}
528
529/**
530 * Sends a batch of packets to the user, expects either a valid packet or a
531 * TICK packet.
532 *
533 * @param trace The trace
534 * @param t The current thread
535 * @param packets [in,out] An array of packets, these may be null upon return
536 * @param nb_packets The total number of packets in the list
537 * @param empty [in,out] A pointer to an integer storing the first empty slot,
538 * upon return this is updated
539 * @param offset [in,out] The offset into the array, upon return this is updated
540 * @param tracetime If true packets are delayed to match with tracetime
541 * @return 0 is successful, otherwise if playing back in tracetime
542 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
543 *
544 * @note READ_MESSAGE will only be returned if tracetime is true.
545 */
546static inline int dispatch_packets(libtrace_t *trace,
547                                  libtrace_thread_t *t,
548                                  libtrace_packet_t *packets[],
549                                  int nb_packets, int *empty, int *offset,
550                                  bool tracetime) {
551        for (;*offset < nb_packets; ++*offset) {
552                int ret;
553                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
554                if (ret == 0) {
555                        /* Move full slots to front as we go */
556                        if (packets[*offset]) {
557                                if (*empty != *offset) {
558                                        packets[*empty] = packets[*offset];
559                                        packets[*offset] = NULL;
560                                }
561                                ++*empty;
562                        }
563                } else {
564                        /* Break early */
565                        if (ret != READ_MESSAGE) {
566                                trace_set_err(trace, TRACE_ERR_UNKNOWN_OPTION,
567                                        "dispatch_packets() called with at least one invalid packet");
568                                return -1;
569                        }
570                        return READ_MESSAGE;
571                }
572        }
573
574        return 0;
575}
576
577/**
578 * Pauses a per packet thread, messages will not be processed when the thread
579 * is paused.
580 *
581 * This process involves reading packets if a hasher thread is used. As such
582 * this function can fail to pause due to errors when reading in which case
583 * the thread should be stopped instead.
584 *
585 *
586 * @brief trace_perpkt_thread_pause
587 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
588 */
589static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
590                                     libtrace_packet_t *packets[],
591                                     int nb_packets, int *empty, int *offset) {
592        libtrace_packet_t * packet = NULL;
593
594        /* Let the user thread know we are going to pause */
595        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
596
597        /* Send through any remaining packets (or messages) without delay */
598
599        /* First send those packets already read, as fast as possible
600         * This should never fail or check for messages etc. */
601        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
602                                    offset, false), == 0);
603
604        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
605        /* If a hasher thread is running, empty input queues so we don't lose data */
606        if (trace_has_dedicated_hasher(trace)) {
607                // The hasher has stopped by this point, so the queue shouldn't be filling
608                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
609                        int ret = trace->pread(trace, t, &packet, 1);
610                        if (ret == 1) {
611                                if (packet->error > 0) {
612                                        store_first_packet(trace, packet, t);
613                                }
614                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
615                                if (packet == NULL)
616                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
617                        } else if (ret != READ_MESSAGE) {
618                                /* Ignore messages we pick these up next loop */
619                                if (!(ret == READ_EOF || ret == READ_ERROR)) {
620                                        trace_set_err(trace, TRACE_ERR_PAUSE_PTHREAD,
621                                                "Error pausing processing thread in trace_perpkt_thread_pause()");
622                                        return -1;
623                                }
624                                /* Verify no packets are remaining */
625                                /* TODO refactor this sanity check out!! */
626                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
627                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
628                                        // No packets after this should have any data in them
629                                        if (packet->error > 0) {
630                                                trace_set_err(trace, TRACE_ERR_BAD_PACKET, "Bogus data in "
631                                                        "libtrace ring buffer after pausing perpkt thread");
632                                                return -1;
633                                        }
634                                }
635                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
636                                return -1;
637                        }
638                }
639        }
640        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
641
642        /* Now we do the actual pause, this returns when we resumed */
643        trace_thread_pause(trace, t);
644        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
645        return 1;
646}
647
648/**
649 * The is the entry point for our packet processing threads.
650 */
651static void* perpkt_threads_entry(void *data) {
652        libtrace_t *trace = (libtrace_t *)data;
653        libtrace_thread_t *t;
654        libtrace_message_t message = {0, {.uint64=0}, NULL};
655        libtrace_packet_t *packets[trace->config.burst_size];
656        size_t i;
657        //int ret;
658        /* The current reading position into the packets */
659        int offset = 0;
660        /* The number of packets last read */
661        int nb_packets = 0;
662        /* The offset to the first NULL packet upto offset */
663        int empty = 0;
664        int j;
665
666        /* Wait until trace_pstart has been completed */
667        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
668        t = get_thread_table(trace);
669        if (!t) {
670                trace_set_err(trace, TRACE_ERR_THREAD, "Unable to get thread table in perpkt_threads_entry()");
671                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
672                pthread_exit(NULL);
673        }
674        if (trace->state == STATE_ERROR) {
675                thread_change_state(trace, t, THREAD_FINISHED, false);
676                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
677                pthread_exit(NULL);
678        }
679        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
680
681        if (trace->format->pregister_thread) {
682                if (trace->format->pregister_thread(trace, t, 
683                                trace_is_parallel(trace)) < 0) {
684                        thread_change_state(trace, t, THREAD_FINISHED, false);
685                        pthread_exit(NULL);
686                }
687        }
688
689        /* Fill our buffer with empty packets */
690        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
691        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
692                              trace->config.burst_size,
693                              trace->config.burst_size);
694
695        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
696
697        /* Let the per_packet function know we have started */
698        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
699        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
700
701        for (;;) {
702
703                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
704                        int ret;
705                        switch (message.code) {
706                                case MESSAGE_DO_PAUSE: // This is internal
707                                        ret = trace_perpkt_thread_pause(trace, t,
708                                              packets, nb_packets, &empty, &offset);
709                                        if (ret == READ_EOF) {
710                                                goto eof;
711                                        } else if (ret == READ_ERROR) {
712                                                goto error;
713                                        }
714                                        if (ret != 1) {
715                                                fprintf(stderr, "Unknown error pausing thread in perpkt_threads_entry()\n");
716                                                pthread_exit(NULL);
717                                        }
718                                        continue;
719                                case MESSAGE_DO_STOP: // This is internal
720                                        goto eof;
721                        }
722                        send_message(trace, t, message.code, message.data, 
723                                        message.sender);
724                        /* Continue and the empty messages out before packets */
725                        continue;
726                }
727
728
729                /* Do we need to read a new set of packets MOST LIKELY we do */
730                if (offset == nb_packets) {
731                        /* Refill the packet buffer */
732                        if (empty != nb_packets) {
733                                // Refill the empty packets
734                                libtrace_ocache_alloc(&trace->packet_freelist,
735                                                      (void **) &packets[empty],
736                                                      nb_packets - empty,
737                                                      nb_packets - empty);
738                        }
739                        if (!trace->pread) {
740                                if (!packets[0]) {
741                                        fprintf(stderr, "Unable to read into NULL packet structure\n");
742                                        pthread_exit(NULL);
743                                }
744                                nb_packets = trace_read_packet(trace, packets[0]);
745                                packets[0]->error = nb_packets;
746                                if (nb_packets > 0)
747                                        nb_packets = 1;
748                        } else {
749                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
750                        }
751                        offset = 0;
752                        empty = 0;
753                }
754
755                /* Handle error/message cases */
756                if (nb_packets > 0) {
757                        /* Store the first non-meta packet */
758                        for (j = 0; j < nb_packets; j++) {
759                                if (t->recorded_first)
760                                        break;
761                                if (packets[j]->error > 0) {
762                                        store_first_packet(trace, packets[j], t);
763                                }
764                        }
765                        dispatch_packets(trace, t, packets, nb_packets, &empty,
766                                         &offset, trace->tracetime);
767                } else {
768                        switch (nb_packets) {
769                        case READ_EOF:
770                                goto eof;
771                        case READ_ERROR:
772                                goto error;
773                        case READ_MESSAGE:
774                                nb_packets = 0;
775                                continue;
776                        default:
777                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
778                                goto error;
779                        }
780                }
781
782        }
783
784error:
785        message.code = MESSAGE_DO_STOP;
786        message.sender = t;
787        message.data.uint64 = 0;
788        trace_message_perpkts(trace, &message);
789eof:
790        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
791
792        // Let the per_packet function know we have stopped
793        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
794        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
795
796        // Free any remaining packets
797        for (i = 0; i < trace->config.burst_size; i++) {
798                if (packets[i]) {
799                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
800                        packets[i] = NULL;
801                }
802        }
803
804        thread_change_state(trace, t, THREAD_FINISHED, true);
805
806        /* Make sure the reporter sees we have finished */
807        if (trace_has_reporter(trace))
808                trace_post_reporter(trace);
809
810        // Release all ocache memory before unregistering with the format
811        // because this might(it does in DPDK) unlink the formats mempool
812        // causing destroy/finish packet to fail.
813        libtrace_ocache_unregister_thread(&trace->packet_freelist);
814        if (trace->format->punregister_thread) {
815                trace->format->punregister_thread(trace, t);
816        }
817        print_memory_stats();
818
819        pthread_exit(NULL);
820}
821
822/**
823 * The start point for our single threaded hasher thread, this will read
824 * and hash a packet from a data source and queue it against the correct
825 * core to process it.
826 */
827static void* hasher_entry(void *data) {
828        libtrace_t *trace = (libtrace_t *)data;
829        libtrace_thread_t * t;
830        int i;
831        libtrace_packet_t * packet;
832        libtrace_message_t message = {0, {.uint64=0}, NULL};
833        int pkt_skipped = 0;
834
835        if (!trace_has_dedicated_hasher(trace)) {
836                fprintf(stderr, "Trace does not have hasher associated with it in hasher_entry()\n");
837                pthread_exit(NULL);
838        }
839        /* Wait until all threads are started and objects are initialised (ring buffers) */
840        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
841        t = &trace->hasher_thread;
842        if (!(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid))) {
843                fprintf(stderr, "Incorrect thread type or non matching thread IDs in hasher_entry()\n");
844                pthread_exit(NULL);
845        }
846
847        if (trace->state == STATE_ERROR) {
848                thread_change_state(trace, t, THREAD_FINISHED, false);
849                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
850                pthread_exit(NULL);
851        }
852        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
853
854        /* We are reading but it is not the parallel API */
855        if (trace->format->pregister_thread) {
856                trace->format->pregister_thread(trace, t, true);
857        }
858
859        /* Read all packets in then hash and queue against the correct thread */
860        while (1) {
861                int thread;
862                if (!pkt_skipped)
863                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
864                if (!packet) {
865                        fprintf(stderr, "Hasher thread was unable to get a fresh packet from the "
866                                "object cache\n");
867                        pthread_exit(NULL);
868                }
869
870                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
871                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
872                        switch(message.code) {
873                                case MESSAGE_DO_PAUSE:
874                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
875                                        thread_change_state(trace, t, THREAD_PAUSED, false);
876                                        pthread_cond_broadcast(&trace->perpkt_cond);
877                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
878                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
879                                        }
880                                        thread_change_state(trace, t, THREAD_RUNNING, false);
881                                        pthread_cond_broadcast(&trace->perpkt_cond);
882                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
883                                        break;
884                                case MESSAGE_DO_STOP:
885                                        /* Either FINISHED or FINISHING */
886                                        if (!(trace->started == false)) {
887                                                fprintf(stderr, "STOP message received by hasher, but "
888                                                        "trace is still active\n");
889                                                pthread_exit(NULL);
890                                        }
891                                        /* Mark the current packet as EOF */
892                                        packet->error = 0;
893                                        goto hasher_eof;
894                                default:
895                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
896                        }
897                        pkt_skipped = 1;
898                        continue;
899                }
900
901                if ((packet->error = trace_read_packet(trace, packet)) <1) {
902                        if (packet->error == READ_MESSAGE) {
903                                pkt_skipped = 1;
904                                continue;
905                        } else {
906                                break; /* We are EOF or error'd either way we stop  */
907                        }
908                }
909
910                /* We are guaranteed to have a hash function i.e. != NULL */
911                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
912                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
913                /* Blocking write to the correct queue - I'm the only writer */
914                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
915                        uint64_t order = trace_packet_get_order(packet);
916                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
917                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
918                                // Write ticks to everyone else
919                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
920                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
921                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
922                                for (i = 0; i < trace->perpkt_thread_count; i++) {
923                                        pkts[i]->error = READ_TICK;
924                                        trace_packet_set_order(pkts[i], order);
925                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
926                                }
927                        }
928                        pkt_skipped = 0;
929                }
930        }
931hasher_eof:
932        /* Broadcast our last failed read to all threads */
933        for (i = 0; i < trace->perpkt_thread_count; i++) {
934                libtrace_packet_t * bcast;
935                if (i == trace->perpkt_thread_count - 1) {
936                        bcast = packet;
937                } else {
938                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
939                        bcast->error = packet->error;
940                }
941                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
942                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
943                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
944                } else {
945                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
946                }
947                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
948        }
949
950        // We don't need to free the packet
951        thread_change_state(trace, t, THREAD_FINISHED, true);
952
953        libtrace_ocache_unregister_thread(&trace->packet_freelist);
954        if (trace->format->punregister_thread) {
955                trace->format->punregister_thread(trace, t);
956        }
957        print_memory_stats();
958
959        // TODO remove from TTABLE t sometime
960        pthread_exit(NULL);
961}
962
963/* Our simplest case when a thread becomes ready it can obtain an exclusive
964 * lock to read packets from the underlying trace.
965 */
966static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
967                                                    libtrace_thread_t *t,
968                                                    libtrace_packet_t *packets[],
969                                                    size_t nb_packets) {
970        size_t i = 0;
971        //bool tick_hit = false;
972
973        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
974        /* Read nb_packets */
975        for (i = 0; i < nb_packets; ++i) {
976                if (libtrace_message_queue_count(&t->messages) > 0) {
977                        if ( i==0 ) {
978                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
979                                return READ_MESSAGE;
980                        } else {
981                                break;
982                        }
983                }
984                packets[i]->error = trace_read_packet(libtrace, packets[i]);
985
986                if (packets[i]->error <= 0) {
987                        /* We'll catch this next time if we have already got packets */
988                        if ( i==0 ) {
989                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
990                                return packets[i]->error;
991                        } else {
992                                break;
993                        }
994                }
995                /*
996                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
997                        tick_hit = true;
998                }*/
999
1000                // Doing this inside the lock ensures the first packet is
1001                // always recorded first
1002                if (!t->recorded_first && packets[0]->error > 0) {
1003                        store_first_packet(libtrace, packets[0], t);
1004                }
1005        }
1006        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
1007        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
1008        if (tick_hit) {
1009                libtrace_message_t tick;
1010                tick.additional.uint64 = trace_packet_get_order(packets[i]);
1011                tick.code = MESSAGE_TICK;
1012                trace_send_message_to_perpkts(libtrace, &tick);
1013        } */
1014        return i;
1015}
1016
1017/**
1018 * For the case that we have a dedicated hasher thread
1019 * 1. We read a packet from our buffer
1020 * 2. Move that into the packet provided (packet)
1021 */
1022inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
1023                                                   libtrace_thread_t *t,
1024                                                   libtrace_packet_t *packets[],
1025                                                   size_t nb_packets) {
1026        size_t i;
1027
1028        /* We store the last error message here */
1029        if (t->format_data) {
1030                return ((libtrace_packet_t *)t->format_data)->error;
1031        }
1032
1033        // Always grab at least one
1034        if (packets[0]) // Recycle the old get the new
1035                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
1036        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
1037
1038        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
1039                return packets[0]->error;
1040        }
1041
1042        for (i = 1; i < nb_packets; i++) {
1043                if (packets[i]) // Recycle the old get the new
1044                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
1045                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
1046                        packets[i] = NULL;
1047                        break;
1048                }
1049
1050                /* We will return an error or EOF the next time around */
1051                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
1052                        /* The message case will be checked automatically -
1053                           However other cases like EOF and error will only be
1054                           sent once*/
1055                        if (packets[i]->error != READ_MESSAGE) {
1056                                t->format_data = packets[i];
1057                        }
1058                        break;
1059                }
1060        }
1061
1062        return i;
1063}
1064
1065/**
1066 * For the first packet of each queue we keep a copy and note the system
1067 * time it was received at.
1068 *
1069 * This is used for finding the first packet when playing back a trace
1070 * in trace time. And can be used by real time applications to print
1071 * results out every XXX seconds.
1072 */
1073void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1074{
1075
1076        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1077        struct timeval tv;
1078        libtrace_packet_t * dup;
1079
1080        if (t->recorded_first) {
1081                return;
1082        }
1083
1084        if (IS_LIBTRACE_META_PACKET(packet)) {
1085                return;
1086        }
1087
1088        /* We mark system time against a copy of the packet */
1089        gettimeofday(&tv, NULL);
1090        dup = trace_copy_packet(packet);
1091
1092        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1093        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1094        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1095        libtrace->first_packets.count++;
1096
1097        /* Now update the first */
1098        if (libtrace->first_packets.count == 1) {
1099                /* We the first entry hence also the first known packet */
1100                libtrace->first_packets.first = t->perpkt_num;
1101        } else {
1102                /* Check if we are newer than the previous 'first' packet */
1103                size_t first = libtrace->first_packets.first;
1104                struct timeval cur_ts = trace_get_timeval(dup);
1105                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1106                if (timercmp(&cur_ts, &first_ts, <))
1107                        libtrace->first_packets.first = t->perpkt_num;
1108        }
1109        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1110
1111        memset(&mesg, 0, sizeof(libtrace_message_t));
1112        mesg.code = MESSAGE_FIRST_PACKET;
1113        trace_message_reporter(libtrace, &mesg);
1114        trace_message_perpkts(libtrace, &mesg);
1115        t->recorded_first = true;
1116}
1117
1118DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1119                                     libtrace_thread_t *t,
1120                                     const libtrace_packet_t **packet,
1121                                     const struct timeval **tv)
1122{
1123        void * tmp;
1124        int ret = 0;
1125
1126        if (t) {
1127                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1128                        return -1;
1129        }
1130
1131        /* Throw away these which we don't use */
1132        if (!packet)
1133                packet = (const libtrace_packet_t **) &tmp;
1134        if (!tv)
1135                tv = (const struct timeval **) &tmp;
1136
1137        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1138        if (t) {
1139                /* Get the requested thread */
1140                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1141                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1142        } else if (libtrace->first_packets.count) {
1143                /* Get the first packet across all threads */
1144                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1145                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1146                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1147                        ret = 1;
1148                } else {
1149                        struct timeval curr_tv;
1150                        // If a second has passed since the first entry we will assume this is the very first packet
1151                        gettimeofday(&curr_tv, NULL);
1152                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1153                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1154                                        ret = 1;
1155                                }
1156                        }
1157                }
1158        } else {
1159                *packet = NULL;
1160                *tv = NULL;
1161        }
1162        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1163        return ret;
1164}
1165
1166
1167DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1168{
1169        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1170}
1171
1172inline static struct timeval usec_to_tv(uint64_t usec)
1173{
1174        struct timeval tv;
1175        tv.tv_sec = usec / 1000000;
1176        tv.tv_usec = usec % 1000000;
1177        return tv;
1178}
1179
1180/** Similar to delay_tracetime but send messages to all threads periodically */
1181static void* reporter_entry(void *data) {
1182        libtrace_message_t message = {0, {.uint64=0}, NULL};
1183        libtrace_t *trace = (libtrace_t *)data;
1184        libtrace_thread_t *t = &trace->reporter_thread;
1185
1186        /* Wait until all threads are started */
1187        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1188        if (trace->state == STATE_ERROR) {
1189                thread_change_state(trace, t, THREAD_FINISHED, false);
1190                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1191                pthread_exit(NULL);
1192        }
1193        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1194
1195        if (trace->format->pregister_thread) {
1196                trace->format->pregister_thread(trace, t, false);
1197        }
1198
1199        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1200        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1201
1202        while (!trace_has_finished(trace)) {
1203                if (trace->config.reporter_polling) {
1204                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1205                                message.code = MESSAGE_POST_REPORTER;
1206                } else {
1207                        libtrace_message_queue_get(&t->messages, &message);
1208                }
1209                switch (message.code) {
1210                        // Check for results
1211                        case MESSAGE_POST_REPORTER:
1212                                trace->combiner.read(trace, &trace->combiner);
1213                                break;
1214                        case MESSAGE_DO_PAUSE:
1215                                if(trace->combiner.pause) {
1216                                        trace->combiner.pause(trace, &trace->combiner);
1217                                }
1218                                send_message(trace, t, MESSAGE_PAUSING,
1219                                                (libtrace_generic_t) {0}, t);
1220                                trace_thread_pause(trace, t);
1221                                send_message(trace, t, MESSAGE_RESUMING,
1222                                                (libtrace_generic_t) {0}, t);
1223                                break;
1224                default:
1225                        send_message(trace, t, message.code, message.data,
1226                                        message.sender);
1227                }
1228        }
1229
1230        // Flush out whats left now all our threads have finished
1231        trace->combiner.read_final(trace, &trace->combiner);
1232
1233        // GOODBYE
1234        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1235        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1236
1237        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1238        print_memory_stats();
1239        pthread_exit(NULL);
1240}
1241
1242/** Similar to delay_tracetime but send messages to all threads periodically */
1243static void* keepalive_entry(void *data) {
1244        struct timeval prev, next;
1245        libtrace_message_t message = {0, {.uint64=0}, NULL};
1246        libtrace_t *trace = (libtrace_t *)data;
1247        uint64_t next_release;
1248        libtrace_thread_t *t = &trace->keepalive_thread;
1249
1250        /* Wait until all threads are started */
1251        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1252        if (trace->state == STATE_ERROR) {
1253                thread_change_state(trace, t, THREAD_FINISHED, false);
1254                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1255                pthread_exit(NULL);
1256        }
1257        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1258
1259        gettimeofday(&prev, NULL);
1260        memset(&message, 0, sizeof(libtrace_message_t));
1261        message.code = MESSAGE_TICK_INTERVAL;
1262
1263        while (trace->state != STATE_FINISHED) {
1264                fd_set rfds;
1265                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1266                gettimeofday(&next, NULL);
1267                if (next_release > tv_to_usec(&next)) {
1268                        next = usec_to_tv(next_release - tv_to_usec(&next));
1269                        // Wait for timeout or a message
1270                        FD_ZERO(&rfds);
1271                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1272                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1273                                libtrace_message_t msg;
1274                                libtrace_message_queue_get(&t->messages, &msg);
1275                                if (msg.code != MESSAGE_DO_STOP) {
1276                                        fprintf(stderr, "Unexpected message code in keepalive_entry()\n");
1277                                        pthread_exit(NULL);
1278                                }
1279                                goto done;
1280                        }
1281                }
1282                prev = usec_to_tv(next_release);
1283                if (trace->state == STATE_RUNNING) {
1284                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1285                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1286                        trace_message_perpkts(trace, &message);
1287                }
1288        }
1289done:
1290
1291        thread_change_state(trace, t, THREAD_FINISHED, true);
1292        pthread_exit(NULL);
1293}
1294
1295/**
1296 * Delays a packets playback so the playback will be in trace time.
1297 * This may break early if a message becomes available.
1298 *
1299 * Requires the first packet for this thread to be received.
1300 * @param libtrace  The trace
1301 * @param packet    The packet to delay
1302 * @param t         The current thread
1303 * @return Either READ_MESSAGE(-2) or 0 is successful
1304 */
1305static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1306        struct timeval curr_tv, pkt_tv;
1307        uint64_t next_release = t->tracetime_offset_usec;
1308        uint64_t curr_usec;
1309
1310        if (!t->tracetime_offset_usec) {
1311                const libtrace_packet_t *first_pkt;
1312                const struct timeval *sys_tv;
1313                int64_t initial_offset;
1314                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1315                if (!first_pkt)
1316                        return 0;
1317                pkt_tv = trace_get_timeval(first_pkt);
1318                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1319                /* In the unlikely case offset is 0, change it to 1 */
1320                if (stable)
1321                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1322                next_release = initial_offset;
1323        }
1324        /* next_release == offset */
1325        pkt_tv = trace_get_timeval(packet);
1326        next_release += tv_to_usec(&pkt_tv);
1327        gettimeofday(&curr_tv, NULL);
1328        curr_usec = tv_to_usec(&curr_tv);
1329        if (next_release > curr_usec) {
1330                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1331                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1332                fd_set rfds;
1333                FD_ZERO(&rfds);
1334                FD_SET(mesg_fd, &rfds);
1335                // We need to wait
1336                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1337                if (ret == 0) {
1338                        return 0;
1339                } else if (ret > 0) {
1340                        return READ_MESSAGE;
1341                } else {
1342                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unexpected return from select in delay_tracetime()");
1343                        return -1;
1344                }
1345        }
1346        return 0;
1347}
1348
1349/* Discards packets that don't match the filter.
1350 * Discarded packets are emptied and then moved to the end of the packet list.
1351 *
1352 * @param trace       The trace format, containing the filter
1353 * @param packets     An array of packets
1354 * @param nb_packets  The number of valid items in packets
1355 *
1356 * @return The number of packets that passed the filter, which are moved to
1357 *          the start of the packets array
1358 */
1359static inline size_t filter_packets(libtrace_t *trace,
1360                                    libtrace_packet_t **packets,
1361                                    size_t nb_packets) {
1362        size_t offset = 0;
1363        size_t i;
1364
1365        for (i = 0; i < nb_packets; ++i) {
1366                // The filter needs the trace attached to receive the link type
1367                packets[i]->trace = trace;
1368                if (trace_apply_filter(trace->filter, packets[i])) {
1369                        libtrace_packet_t *tmp;
1370                        tmp = packets[offset];
1371                        packets[offset++] = packets[i];
1372                        packets[i] = tmp;
1373                } else {
1374                        trace_fin_packet(packets[i]);
1375                }
1376        }
1377
1378        return offset;
1379}
1380
1381/* Read a batch of packets from the trace into a buffer.
1382 * Note that this function will block until a packet is read (or EOF is reached)
1383 *
1384 * @param libtrace    The trace
1385 * @param t           The thread
1386 * @param packets     An array of packets
1387 * @param nb_packets  The number of empty packets in packets
1388 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1389 */
1390static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1391                                      libtrace_thread_t *t,
1392                                      libtrace_packet_t *packets[],
1393                                      size_t nb_packets) {
1394        int i;
1395        if (!libtrace) {
1396                fprintf(stderr, "NULL trace passed into trace_read_packet()\n");
1397                return TRACE_ERR_NULL_TRACE;
1398        }
1399        if (nb_packets <= 0) {
1400                trace_set_err(libtrace, TRACE_ERR_NULL,
1401                        "nb_packets must be greater than zero in trace_pread_packet_wrapper()");
1402                return -1;
1403        }
1404        if (trace_is_err(libtrace))
1405                return -1;
1406        if (!libtrace->started) {
1407                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1408                              "You must call libtrace_start() before trace_read_packet()\n");
1409                return -1;
1410        }
1411
1412        if (libtrace->format->pread_packets) {
1413                int ret;
1414#if 0
1415                for (i = 0; i < (int) nb_packets; ++i) {
1416                        if (!i[packets]) {
1417                                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "NULL packets in "
1418                                        "trace_pread_packet_wrapper()");
1419                                return -1;
1420                        }
1421                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1422                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1423                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1424                                              "Packet passed to trace_read_packet() is invalid\n");
1425                                return -1;
1426                        }
1427                }
1428#endif
1429                do {
1430                        ret=libtrace->format->pread_packets(libtrace, t,
1431                                                            packets,
1432                                                            nb_packets);
1433                        /* Error, EOF or message? */
1434                        if (ret <= 0) {
1435                                return ret;
1436                        }
1437
1438                        if (libtrace->filter) {
1439                                int remaining;
1440                                remaining = filter_packets(libtrace,
1441                                                           packets, ret);
1442                                t->filtered_packets += ret - remaining;
1443                                ret = remaining;
1444                        }
1445                        for (i = 0; i < ret; ++i) {
1446                                /* We do not mark the packet against the trace,
1447                                 * before hand or after. After breaks DAG meta
1448                                 * packets and before is inefficient */
1449                                //packets[i]->trace = libtrace;
1450                                /* TODO IN FORMAT?? Like traditional libtrace */
1451                                if (libtrace->snaplen>0)
1452                                        trace_set_capture_length(packets[i],
1453                                                        libtrace->snaplen);
1454                                packets[i]->which_trace_start = libtrace->startcount;
1455                        }
1456                } while(ret == 0);
1457                return ret;
1458        }
1459        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1460                      "This format does not support reading packets\n");
1461        return ~0U;
1462}
1463
1464/* Restarts a parallel trace, this is called from trace_pstart.
1465 * The libtrace lock is held upon calling this function.
1466 * Typically with a parallel trace the threads are not
1467 * killed rather.
1468 */
1469static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1470                          libtrace_callback_set_t *per_packet_cbs, 
1471                          libtrace_callback_set_t *reporter_cbs) {
1472        int i, err = 0;
1473        if (libtrace->state != STATE_PAUSED) {
1474                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1475                        "trace(%s) is not currently paused",
1476                              libtrace->uridata);
1477                return -1;
1478        }
1479
1480        if (!libtrace_parallel) {
1481                trace_set_err(libtrace, TRACE_ERR_THREAD, "Trace_prestart() has been called on a "
1482                        "non-parallel libtrace input?");
1483                return -1;
1484        }
1485        if (libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1486                trace_set_err(libtrace, TRACE_ERR_THREAD, "Cannot restart a parallel libtrace input "
1487                        "while it is still running");
1488                return -1;
1489        }
1490
1491        /* Reset first packets */
1492        pthread_spin_lock(&libtrace->first_packets.lock);
1493        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1494                if (libtrace->first_packets.packets[i].packet) {
1495                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1496                        libtrace->first_packets.packets[i].packet = NULL;
1497                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1498                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1499                        libtrace->first_packets.count--;
1500                        libtrace->perpkt_threads[i].recorded_first = false;
1501                }
1502        }
1503        if (libtrace->first_packets.count != 0) {
1504                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected a first packets count of 0 in trace_pstart()");
1505                return -1;
1506        }
1507        libtrace->first_packets.first = 0;
1508        pthread_spin_unlock(&libtrace->first_packets.lock);
1509
1510        /* Reset delay */
1511        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1512                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1513        }
1514
1515        /* Reset statistics */
1516        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1517                libtrace->perpkt_threads[i].accepted_packets = 0;
1518                libtrace->perpkt_threads[i].filtered_packets = 0;
1519        }
1520        libtrace->accepted_packets = 0;
1521        libtrace->filtered_packets = 0;
1522
1523        /* Update functions if requested */
1524        if(global_blob)
1525                libtrace->global_blob = global_blob;
1526
1527        if (per_packet_cbs) {
1528                if (libtrace->perpkt_cbs)
1529                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1530                libtrace->perpkt_cbs = trace_create_callback_set();
1531                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1532                                sizeof(libtrace_callback_set_t));
1533        }
1534
1535        if (reporter_cbs) {
1536                if (libtrace->reporter_cbs)
1537                        trace_destroy_callback_set(libtrace->reporter_cbs);
1538
1539                libtrace->reporter_cbs = trace_create_callback_set();
1540                memcpy(libtrace->reporter_cbs, reporter_cbs,
1541                                sizeof(libtrace_callback_set_t));
1542        }
1543
1544        if (trace_is_parallel(libtrace)) {
1545                err = libtrace->format->pstart_input(libtrace);
1546        } else {
1547                if (libtrace->format->start_input) {
1548                        err = libtrace->format->start_input(libtrace);
1549                }
1550        }
1551
1552        if (err == 0) {
1553                libtrace->started = true;
1554                libtrace->startcount ++;
1555                libtrace_change_state(libtrace, STATE_RUNNING, false);
1556        }
1557        return err;
1558}
1559
1560/**
1561 * @return the number of CPU cores on the machine. -1 if unknown.
1562 */
1563SIMPLE_FUNCTION static int get_nb_cores() {
1564        int numCPU;
1565#ifdef _SC_NPROCESSORS_ONLN
1566        /* Most systems do this now */
1567        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1568
1569#else
1570        int mib[] = {CTL_HW, HW_AVAILCPU};
1571        size_t len = sizeof(numCPU);
1572
1573        /* get the number of CPUs from the system */
1574        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1575#endif
1576        return numCPU <= 0 ? 1 : numCPU;
1577}
1578
1579/**
1580 * Verifies the configuration and sets default values for any values not
1581 * specified by the user.
1582 */
1583static void verify_configuration(libtrace_t *libtrace) {
1584
1585        if (libtrace->config.hasher_queue_size <= 0)
1586                libtrace->config.hasher_queue_size = 1000;
1587
1588        if (libtrace->config.perpkt_threads <= 0) {
1589                libtrace->perpkt_thread_count = get_nb_cores();
1590                if (libtrace->perpkt_thread_count <= 0)
1591                        // Lets just use one
1592                        libtrace->perpkt_thread_count = 1;
1593        } else {
1594                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1595        }
1596
1597        if (libtrace->config.reporter_thold <= 0)
1598                libtrace->config.reporter_thold = 100;
1599        if (libtrace->config.burst_size <= 0)
1600                libtrace->config.burst_size = 32;
1601        if (libtrace->config.thread_cache_size <= 0)
1602                libtrace->config.thread_cache_size = 64;
1603        if (libtrace->config.cache_size <= 0)
1604                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1605
1606        if (libtrace->config.cache_size <
1607                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1608                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1609
1610        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1611                libtrace->combiner = combiner_unordered;
1612
1613        /* Figure out if we are using a dedicated hasher thread? */
1614        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1615                libtrace->hasher_thread.type = THREAD_HASHER;
1616        }
1617}
1618
1619/**
1620 * Starts a libtrace_thread, including allocating memory for messaging.
1621 * Threads are expected to wait until the libtrace look is released.
1622 * Hence why we don't init structures until later.
1623 *
1624 * @param trace The trace the thread is associated with
1625 * @param t The thread that is filled when the thread is started
1626 * @param type The type of thread
1627 * @param start_routine The entry location of the thread
1628 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1629 * @param name For debugging purposes set the threads name (Optional)
1630 *
1631 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1632 *         In this situation the thread structure is zeroed.
1633 */
1634static int trace_start_thread(libtrace_t *trace,
1635                       libtrace_thread_t *t,
1636                       enum thread_types type,
1637                       void *(*start_routine) (void *),
1638                       int perpkt_num,
1639                       const char *name) {
1640#ifdef __linux__
1641        cpu_set_t cpus;
1642        int i;
1643#endif
1644        int ret;
1645        if (t->type != THREAD_EMPTY) {
1646                trace_set_err(trace, TRACE_ERR_THREAD,
1647                        "Expected thread type of THREAD_EMPTY in trace_start_thread()");
1648                return -1;
1649        }
1650        t->trace = trace;
1651        t->ret = NULL;
1652        t->user_data = NULL;
1653        t->type = type;
1654        t->state = THREAD_RUNNING;
1655
1656        if (!name) {
1657                trace_set_err(trace, TRACE_ERR_THREAD, "NULL thread name in trace_start_thread()");
1658                return -1;
1659        }
1660
1661#ifdef __linux__
1662        CPU_ZERO(&cpus);
1663        for (i = 0; i < get_nb_cores(); i++)
1664                CPU_SET(i, &cpus);
1665
1666        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1667        if( ret == 0 ) {
1668                ret = pthread_setaffinity_np(t->tid, sizeof(cpus), &cpus);
1669        }
1670
1671#else
1672        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1673#endif
1674        if (ret != 0) {
1675                libtrace_zero_thread(t);
1676                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1677                return -1;
1678        }
1679        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1680        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1681                libtrace_ringbuffer_init(&t->rbuffer,
1682                                         trace->config.hasher_queue_size,
1683                                         trace->config.hasher_polling?
1684                                                 LIBTRACE_RINGBUFFER_POLLING:
1685                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1686        }
1687#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1688        if(name)
1689                pthread_setname_np(t->tid, name);
1690#endif
1691        t->perpkt_num = perpkt_num;
1692        return 0;
1693}
1694
1695/** Parses the environment variable LIBTRACE_CONF into the supplied
1696 * configuration structure.
1697 *
1698 * @param[in,out] libtrace The trace from which we determine the URI and set
1699 * the configuration.
1700 *
1701 * We search for 3 environment variables and apply them to the config in the
1702 * following order. Such that the first has the lowest priority.
1703 *
1704 * 1. LIBTRACE_CONF, The global environment configuration
1705 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1706 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1707 *
1708 * E.g.
1709 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1710 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1711 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1712 *
1713 * @note All environment variables names MUST only contian
1714 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1715 * outside of this range should be captilised if possible or replaced with an
1716 * underscore.
1717 */
1718static void parse_env_config (libtrace_t *libtrace) {
1719        char env_name[1024] = "LIBTRACE_CONF_";
1720        size_t len = strlen(env_name);
1721        size_t mark = 0;
1722        size_t i;
1723        char * env;
1724
1725        /* Make our compound string */
1726        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1727        len += strlen(libtrace->format->name);
1728        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1729        len += 1;
1730        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1731
1732        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1733        for (i = 0; env_name[i] != 0; ++i) {
1734                env_name[i] = toupper(env_name[i]);
1735                if(env_name[i] == ':') {
1736                        mark = i;
1737                }
1738                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1739                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1740                        env_name[i] = '_';
1741                }
1742        }
1743
1744        /* First apply global env settings LIBTRACE_CONF */
1745        env = getenv("LIBTRACE_CONF");
1746        if (env)
1747        {
1748                printf("Got env %s", env);
1749                trace_set_configuration(libtrace, env);
1750        }
1751
1752        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1753        if (mark != 0) {
1754                env_name[mark] = 0;
1755                env = getenv(env_name);
1756                if (env) {
1757                        trace_set_configuration(libtrace, env);
1758                }
1759                env_name[mark] = '_';
1760        }
1761
1762        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1763        env = getenv(env_name);
1764        if (env) {
1765                trace_set_configuration(libtrace, env);
1766        }
1767}
1768
1769DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1770        if (libtrace->state == STATE_NEW)
1771                return trace_supports_parallel(libtrace);
1772        return libtrace->pread == trace_pread_packet_wrapper;
1773}
1774
1775DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1776                           libtrace_callback_set_t *per_packet_cbs,
1777                           libtrace_callback_set_t *reporter_cbs) {
1778        int i;
1779        int ret = -1;
1780        char name[24];
1781        sigset_t sig_before, sig_block_all;
1782        if (!libtrace) {
1783                fprintf(stderr, "NULL trace passed to trace_pstart()\n");
1784                return -1;
1785        }
1786
1787        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1788        if (trace_is_err(libtrace)) {
1789                goto cleanup_none;
1790        }
1791
1792        if (libtrace->state == STATE_PAUSED) {
1793                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1794                                reporter_cbs);
1795                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1796                return ret;
1797        }
1798
1799        if (libtrace->state != STATE_NEW) {
1800                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1801                              "should be called on a NEW or PAUSED trace but "
1802                              "instead was called from %s",
1803                              get_trace_state_name(libtrace->state));
1804                goto cleanup_none;
1805        }
1806
1807        /* Store the user defined things against the trace */
1808        libtrace->global_blob = global_blob;
1809
1810        /* Save a copy of the callbacks in case the user tries to change them
1811         * on us later */
1812        if (!per_packet_cbs) {
1813                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1814                                "requires a non-NULL set of per packet "
1815                                "callbacks.");
1816                goto cleanup_none;
1817        }
1818
1819        if (per_packet_cbs->message_packet == NULL) {
1820                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1821                                "packet callbacks must include a handler "
1822                                "for a packet. Please set this using "
1823                                "trace_set_packet_cb().");
1824                goto cleanup_none;
1825        }
1826
1827        libtrace->perpkt_cbs = trace_create_callback_set();
1828        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1829       
1830        if (reporter_cbs) {
1831                libtrace->reporter_cbs = trace_create_callback_set();
1832                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1833        }
1834
1835       
1836
1837
1838        /* And zero other fields */
1839        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1840                libtrace->perpkt_thread_states[i] = 0;
1841        }
1842        libtrace->first_packets.first = 0;
1843        libtrace->first_packets.count = 0;
1844        libtrace->first_packets.packets = NULL;
1845        libtrace->perpkt_threads = NULL;
1846        /* Set a global which says we are using a parallel trace. This is
1847         * for backwards compatibility due to changes when destroying packets */
1848        libtrace_parallel = 1;
1849
1850        /* Parses configuration passed through environment variables */
1851        parse_env_config(libtrace);
1852        verify_configuration(libtrace);
1853
1854        ret = -1;
1855        /* Try start the format - we prefer parallel over single threaded, as
1856         * these formats should support messages better */
1857
1858        if (trace_supports_parallel(libtrace) &&
1859            !trace_has_dedicated_hasher(libtrace)) {
1860                ret = libtrace->format->pstart_input(libtrace);
1861                libtrace->pread = trace_pread_packet_wrapper;
1862        }
1863        if (ret != 0) {
1864                if (libtrace->format->start_input) {
1865                        ret = libtrace->format->start_input(libtrace);
1866                }
1867                if (libtrace->perpkt_thread_count > 1) {
1868                        libtrace->pread = trace_pread_packet_first_in_first_served;
1869                        /* Don't wait for a burst of packets if the format is
1870                         * live as this could block ring based formats and
1871                         * introduces delay. */
1872                        if (libtrace->format->info.live) {
1873                                libtrace->config.burst_size = 1;
1874                        }
1875                }
1876                else {
1877                        /* Use standard read_packet */
1878                        libtrace->pread = NULL;
1879                }
1880        }
1881
1882        if (ret < 0) {
1883                goto cleanup_none;
1884        }
1885
1886        /* --- Start all the threads we need --- */
1887        /* Disable signals because it is inherited by the threads we start */
1888        sigemptyset(&sig_block_all);
1889        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1890
1891        /* If we need a hasher thread start it
1892         * Special Case: If single threaded we don't need a hasher
1893         */
1894        if (trace_has_dedicated_hasher(libtrace)) {
1895                libtrace->hasher_thread.type = THREAD_EMPTY;
1896                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1897                                   THREAD_HASHER, hasher_entry, -1,
1898                                   "hasher-thread");
1899                if (ret != 0)
1900                        goto cleanup_started;
1901                libtrace->pread = trace_pread_packet_hasher_thread;
1902        } else {
1903                libtrace->hasher_thread.type = THREAD_EMPTY;
1904        }
1905
1906        /* Start up our perpkt threads */
1907        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1908                                          libtrace->perpkt_thread_count);
1909        if (!libtrace->perpkt_threads) {
1910                trace_set_err(libtrace, errno, "trace_pstart "
1911                              "failed to allocate memory.");
1912                goto cleanup_threads;
1913        }
1914        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1915                snprintf(name, sizeof(name), "perpkt-%d", i);
1916                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1917                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1918                                   THREAD_PERPKT, perpkt_threads_entry, i,
1919                                   name);
1920                if (ret != 0)
1921                        goto cleanup_threads;
1922        }
1923
1924        /* Start the reporter thread */
1925        if (reporter_cbs) {
1926                if (libtrace->combiner.initialise)
1927                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1928                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1929                                   THREAD_REPORTER, reporter_entry, -1,
1930                                   "reporter_thread");
1931                if (ret != 0)
1932                        goto cleanup_threads;
1933        }
1934
1935        /* Start the keepalive thread */
1936        if (libtrace->config.tick_interval > 0) {
1937                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1938                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1939                                   "keepalive_thread");
1940                if (ret != 0)
1941                        goto cleanup_threads;
1942        }
1943
1944        /* Init other data structures */
1945        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1946        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1947        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1948                                                 sizeof(*libtrace->first_packets.packets));
1949        if (libtrace->first_packets.packets == NULL) {
1950                trace_set_err(libtrace, errno, "trace_pstart "
1951                              "failed to allocate memory.");
1952                goto cleanup_threads;
1953        }
1954
1955        if (libtrace_ocache_init(&libtrace->packet_freelist,
1956                             (void* (*)()) trace_create_packet,
1957                             (void (*)(void *))trace_destroy_packet,
1958                             libtrace->config.thread_cache_size,
1959                             libtrace->config.cache_size * 4,
1960                             libtrace->config.fixed_count) != 0) {
1961                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1962                              "failed to allocate ocache.");
1963                goto cleanup_threads;
1964        }
1965
1966        /* Threads don't start */
1967        libtrace->started = true;
1968        libtrace->startcount ++;
1969        libtrace_change_state(libtrace, STATE_RUNNING, false);
1970
1971        ret = 0;
1972        goto success;
1973cleanup_threads:
1974        if (libtrace->first_packets.packets) {
1975                free(libtrace->first_packets.packets);
1976                libtrace->first_packets.packets = NULL;
1977        }
1978        libtrace_change_state(libtrace, STATE_ERROR, false);
1979        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1980        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1981                pthread_join(libtrace->hasher_thread.tid, NULL);
1982                libtrace_zero_thread(&libtrace->hasher_thread);
1983        }
1984
1985        if (libtrace->perpkt_threads) {
1986                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1987                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1988                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1989                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1990                        } else break;
1991                }
1992                free(libtrace->perpkt_threads);
1993                libtrace->perpkt_threads = NULL;
1994        }
1995
1996        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1997                pthread_join(libtrace->reporter_thread.tid, NULL);
1998                libtrace_zero_thread(&libtrace->reporter_thread);
1999        }
2000
2001        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2002                pthread_join(libtrace->keepalive_thread.tid, NULL);
2003                libtrace_zero_thread(&libtrace->keepalive_thread);
2004        }
2005        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2006        libtrace_change_state(libtrace, STATE_NEW, false);
2007        if (libtrace->perpkt_thread_states[THREAD_RUNNING] != 0) {
2008                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected 0 running threads in trace_pstart()");
2009                return -1;
2010        }
2011        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
2012cleanup_started:
2013        if (libtrace->pread == trace_pread_packet_wrapper) {
2014                if (libtrace->format->ppause_input)
2015                        libtrace->format->ppause_input(libtrace);
2016        } else {
2017                if (libtrace->format->pause_input)
2018                        libtrace->format->pause_input(libtrace);
2019        }
2020        ret = -1;
2021success:
2022        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
2023cleanup_none:
2024        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2025        return ret;
2026}
2027
2028DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
2029                fn_cb_starting handler) {
2030        cbset->message_starting = handler;
2031        return 0;
2032}
2033
2034DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
2035                fn_cb_dataless handler) {
2036        cbset->message_pausing = handler;
2037        return 0;
2038}
2039
2040DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
2041                fn_cb_dataless handler) {
2042        cbset->message_resuming = handler;
2043        return 0;
2044}
2045
2046DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
2047                fn_cb_dataless handler) {
2048        cbset->message_stopping = handler;
2049        return 0;
2050}
2051
2052DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
2053                fn_cb_packet handler) {
2054        cbset->message_packet = handler;
2055        return 0;
2056}
2057
2058DLLEXPORT int trace_set_meta_packet_cb(libtrace_callback_set_t *cbset,
2059                fn_cb_meta_packet handler) {
2060        cbset->message_meta_packet = handler;
2061        return 0;
2062}
2063
2064DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
2065                fn_cb_first_packet handler) {
2066        cbset->message_first_packet = handler;
2067        return 0;
2068}
2069
2070DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
2071                fn_cb_tick handler) {
2072        cbset->message_tick_count = handler;
2073        return 0;
2074}
2075
2076DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
2077                fn_cb_tick handler) {
2078        cbset->message_tick_interval = handler;
2079        return 0;
2080}
2081
2082DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
2083                fn_cb_result handler) {
2084        cbset->message_result = handler;
2085        return 0;
2086}
2087
2088DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
2089                fn_cb_usermessage handler) {
2090        cbset->message_user = handler;
2091        return 0;
2092}
2093
2094/*
2095 * Pauses a trace, this should only be called by the main thread
2096 * 1. Set started = false
2097 * 2. All perpkt threads are paused waiting on a condition var
2098 * 3. Then call ppause on the underlying format if found
2099 * 4. The traces state is paused
2100 *
2101 * Once done you should be able to modify the trace setup and call pstart again
2102 * TODO add support to change the number of threads.
2103 */
2104DLLEXPORT int trace_ppause(libtrace_t *libtrace)
2105{
2106        libtrace_thread_t *t;
2107        int i;
2108        if (!libtrace) {
2109                fprintf(stderr, "NULL trace passed into trace_ppause()\n");
2110                return TRACE_ERR_NULL_TRACE;
2111        }
2112
2113        t = get_thread_table(libtrace);
2114        // Check state from within the lock if we are going to change it
2115        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2116
2117        /* If we are already paused, just treat this as a NOOP */
2118        if (libtrace->state == STATE_PAUSED) {
2119                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2120                return 0;
2121        }
2122        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2123                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2124                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2125                return -1;
2126        }
2127
2128        libtrace_change_state(libtrace, STATE_PAUSING, false);
2129        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2130
2131        // Special case handle the hasher thread case
2132        if (trace_has_dedicated_hasher(libtrace)) {
2133                if (libtrace->config.debug_state)
2134                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2135                libtrace_message_t message = {0, {.uint64=0}, NULL};
2136                message.code = MESSAGE_DO_PAUSE;
2137                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2138                // Wait for it to pause
2139                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2140                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2141                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2142                }
2143                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2144                if (libtrace->config.debug_state)
2145                        fprintf(stderr, " DONE\n");
2146        }
2147
2148        if (libtrace->config.debug_state)
2149                fprintf(stderr, "Asking perpkt threads to pause ...");
2150        // Stop threads, skip this one if it's a perpkt
2151        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2152                if (&libtrace->perpkt_threads[i] != t) {
2153                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2154                        message.code = MESSAGE_DO_PAUSE;
2155                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2156                        if(trace_has_dedicated_hasher(libtrace)) {
2157                                // The hasher has stopped and other threads have messages waiting therefore
2158                                // If the queues are empty the other threads would have no data
2159                                // So send some message packets to simply ask the threads to check
2160                                // We are the only writer since hasher has paused
2161                                libtrace_packet_t *pkt;
2162                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2163                                pkt->error = READ_MESSAGE;
2164                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2165                        }
2166                } else {
2167                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2168                }
2169        }
2170
2171        if (t) {
2172                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2173                // We rely on the user to *not* return before starting the trace again
2174                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2175        }
2176
2177        // Wait for all threads to pause
2178        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2179        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2180                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2181        }
2182        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2183
2184        if (libtrace->config.debug_state)
2185                fprintf(stderr, " DONE\n");
2186
2187        // Deal with the reporter
2188        if (trace_has_reporter(libtrace)) {
2189                if (libtrace->config.debug_state)
2190                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2191                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2192                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2193                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2194               
2195                } else {
2196                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2197                        message.code = MESSAGE_DO_PAUSE;
2198                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2199                        // Wait for it to pause
2200                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2201                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2202                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2203                        }
2204                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2205                }
2206                if (libtrace->config.debug_state)
2207                        fprintf(stderr, " DONE\n");
2208        }
2209
2210        /* Cache values before we pause */
2211        if (libtrace->stats == NULL)
2212                libtrace->stats = trace_create_statistics();
2213        // Save the statistics against the trace
2214        trace_get_statistics(libtrace, NULL);
2215        if (trace_is_parallel(libtrace)) {
2216                libtrace->started = false;
2217                if (libtrace->format->ppause_input)
2218                        libtrace->format->ppause_input(libtrace);
2219                // TODO What happens if we don't have pause input??
2220        } else {
2221                int err;
2222                err = trace_pause(libtrace);
2223                // We should handle this a bit better
2224                if (err)
2225                        return err;
2226        }
2227
2228        // Only set as paused after the pause has been called on the trace
2229        libtrace_change_state(libtrace, STATE_PAUSED, true);
2230        return 0;
2231}
2232
2233/**
2234 * Stop trace finish prematurely as though it meet an EOF
2235 * This should only be called by the main thread
2236 * 1. Calls ppause
2237 * 2. Sends a message asking for threads to finish
2238 * 3. Releases threads which will pause
2239 */
2240DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2241{
2242        int i, err;
2243        libtrace_message_t message = {0, {.uint64=0}, NULL};
2244        if (!libtrace) {
2245                fprintf(stderr, "NULL trace passed into trace_pstop()\n");
2246                return TRACE_ERR_NULL_TRACE;
2247        }
2248
2249        // Ensure all threads have paused and the underlying trace format has
2250        // been closed and all packets associated are cleaned up
2251        // Pause will do any state checks for us
2252        err = trace_ppause(libtrace);
2253        if (err)
2254                return err;
2255
2256        // Now send a message asking the threads to stop
2257        // This will be retrieved before trying to read another packet
2258        message.code = MESSAGE_DO_STOP;
2259        trace_message_perpkts(libtrace, &message);
2260        if (trace_has_dedicated_hasher(libtrace))
2261                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2262
2263        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2264                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2265        }
2266
2267        /* Now release the threads and let them stop - when the threads finish
2268         * the state will be set to finished */
2269        libtrace_change_state(libtrace, STATE_FINISHING, true);
2270        return 0;
2271}
2272
2273DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2274        int ret = -1;
2275        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2276                return -1;
2277        }
2278
2279        // Save the requirements
2280        trace->hasher_type = type;
2281        if (hasher) {
2282                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2283                        if (trace->hasher_data) {
2284                                free(trace->hasher_data);
2285                        }
2286                }
2287                trace->hasher = hasher;
2288                trace->hasher_data = data;
2289                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2290        } else {
2291                trace->hasher = NULL;
2292                trace->hasher_data = NULL;
2293                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2294        }
2295
2296        // Try push this to hardware - NOTE hardware could do custom if
2297        // there is a more efficient way to apply it, in this case
2298        // it will simply grab the function out of libtrace_t
2299        if (trace_supports_parallel(trace) && trace->format->config_input)
2300                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2301
2302        if (ret == -1) {
2303                libtrace_err_t err UNUSED;
2304
2305                /* We have to deal with this ourself */
2306                /* If we succeed, clear any error state otherwise our caller
2307                 * might assume an error occurred, even though we've resolved
2308                 * the issue ourselves.
2309                 */
2310                if (!hasher) {
2311                        switch (type)
2312                        {
2313                                case HASHER_CUSTOM:
2314                                case HASHER_BALANCE:
2315                                        err = trace_get_err(trace);
2316                                        return 0;
2317                                case HASHER_BIDIRECTIONAL:
2318                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2319                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2320                                        toeplitz_init_config(trace->hasher_data, 1);
2321                                        err = trace_get_err(trace);
2322                                        return 0;
2323                                case HASHER_UNIDIRECTIONAL:
2324                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2325                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2326                                        toeplitz_init_config(trace->hasher_data, 0);
2327                                        err = trace_get_err(trace);
2328                                        return 0;
2329                        }
2330                        return -1;
2331                }
2332        } else {
2333                /* If the hasher is hardware we zero out the hasher and hasher
2334                 * data fields - only if we need a hasher do we do this */
2335                trace->hasher = NULL;
2336                trace->hasher_data = NULL;
2337        }
2338
2339        return 0;
2340}
2341
2342// Waits for all threads to finish
2343DLLEXPORT void trace_join(libtrace_t *libtrace) {
2344        int i;
2345
2346        /* Firstly wait for the perpkt threads to finish, since these are
2347         * user controlled */
2348        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2349                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2350                // So we must do our best effort to empty the queue - so
2351                // the producer (or any other threads) don't block.
2352                libtrace_packet_t * packet;
2353                if (libtrace->perpkt_threads[i].state != THREAD_FINISHED) {
2354                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2355                                "Expected processing thread state to be THREAD_FINISHED in trace_join()");
2356                        return;
2357                }
2358                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2359                        if (packet) // This could be NULL iff the perpkt finishes early
2360                                trace_destroy_packet(packet);
2361        }
2362
2363        /* Now the hasher */
2364        if (trace_has_dedicated_hasher(libtrace)) {
2365                pthread_join(libtrace->hasher_thread.tid, NULL);
2366                if (libtrace->hasher_thread.state != THREAD_FINISHED) {
2367                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2368                                "Expected hasher thread state to be THREAD_FINISHED in trace_join()");
2369                        return;
2370                }
2371        }
2372
2373        // Now that everything is finished nothing can be touching our
2374        // buffers so clean them up
2375        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2376                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2377                // if they lost timeslice before-during a write
2378                libtrace_packet_t * packet;
2379                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2380                        trace_destroy_packet(packet);
2381                if (trace_has_dedicated_hasher(libtrace)) {
2382                        if (!libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer)) {
2383                                trace_set_err(libtrace, TRACE_ERR_THREAD,
2384                                        "Expected processing threads ring buffers to be empty in trace_join()");
2385                                return;
2386                        }
2387                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2388                }
2389                // Cannot destroy vector yet, this happens with trace_destroy
2390        }
2391
2392        if (trace_has_reporter(libtrace)) {
2393                pthread_join(libtrace->reporter_thread.tid, NULL);
2394                if (libtrace->reporter_thread.state != THREAD_FINISHED) {
2395                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2396                                "Expected reporting thread state to be THREAD_FINISHED in trace_join()");
2397                        return;
2398                }
2399        }
2400
2401        // Wait for the tick (keepalive) thread if it has been started
2402        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2403                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2404                msg.code = MESSAGE_DO_STOP;
2405                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2406                pthread_join(libtrace->keepalive_thread.tid, NULL);
2407        }
2408
2409        libtrace_change_state(libtrace, STATE_JOINED, true);
2410        print_memory_stats();
2411}
2412
2413DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2414                                                libtrace_thread_t *t)
2415{
2416        int ret;
2417        if (t == NULL)
2418                t = get_thread_descriptor(libtrace);
2419        if (t == NULL)
2420                return -1;
2421        ret = libtrace_message_queue_count(&t->messages);
2422        return ret < 0 ? 0 : ret;
2423}
2424
2425DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2426                                          libtrace_thread_t *t,
2427                                          libtrace_message_t * message)
2428{
2429        int ret;
2430        if (t == NULL)
2431                t = get_thread_descriptor(libtrace);
2432        if (t == NULL)
2433                return -1;
2434        ret = libtrace_message_queue_get(&t->messages, message);
2435        return ret < 0 ? 0 : ret;
2436}
2437
2438DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2439                                              libtrace_thread_t *t,
2440                                              libtrace_message_t * message)
2441{
2442        if (t == NULL)
2443                t = get_thread_descriptor(libtrace);
2444        if (t == NULL)
2445                return -1;
2446        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2447                return 0;
2448        else
2449                return -1;
2450}
2451
2452DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2453{
2454        int ret;
2455        if (!message->sender)
2456                message->sender = get_thread_descriptor(libtrace);
2457
2458        ret = libtrace_message_queue_put(&t->messages, message);
2459        return ret < 0 ? 0 : ret;
2460}
2461
2462DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2463{
2464        if (!trace_has_reporter(libtrace) ||
2465            !(libtrace->reporter_thread.state == THREAD_RUNNING
2466              || libtrace->reporter_thread.state == THREAD_PAUSED))
2467                return -1;
2468
2469        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2470}
2471
2472DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2473{
2474        libtrace_message_t message;
2475        memset(&message, 0, sizeof(libtrace_message_t));
2476        message.code = MESSAGE_POST_REPORTER;
2477        message.data.uint64 = 0;
2478        message.sender = NULL;
2479        return trace_message_reporter(libtrace, (void *) &message);
2480}
2481
2482DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2483{
2484        int i;
2485        int missed = 0;
2486        if (message->sender == NULL)
2487                message->sender = get_thread_descriptor(libtrace);
2488        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2489                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2490                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2491                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2492                } else {
2493                        missed += 1;
2494                }
2495        }
2496        return -missed;
2497}
2498
2499/**
2500 * Publishes a result to the reduce queue
2501 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2502 */
2503DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2504        libtrace_result_t res;
2505        res.type = type;
2506        res.key = key;
2507        res.value = value;
2508        if (!libtrace->combiner.publish) {
2509                fprintf(stderr, "Combiner has no publish method -- can not publish results!\n");
2510                return;
2511        }
2512        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2513        return;
2514}
2515
2516DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2517        if (combiner) {
2518                trace->combiner = *combiner;
2519                trace->combiner.configuration = config;
2520        } else {
2521                // No combiner, so don't try use it
2522                memset(&trace->combiner, 0, sizeof(trace->combiner));
2523        }
2524}
2525
2526DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2527        return packet->order;
2528}
2529
2530DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2531        return packet->hash;
2532}
2533
2534DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2535        packet->order = order;
2536}
2537
2538DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2539        packet->hash = hash;
2540}
2541
2542DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2543        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2544}
2545
2546/**
2547 * @return True if the trace is not running such that it can be configured
2548 */
2549static inline bool trace_is_configurable(libtrace_t *trace) {
2550        return trace->state == STATE_NEW ||
2551                        trace->state == STATE_PAUSED;
2552}
2553
2554DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2555        // Only supported on new traces not paused traces
2556        if (trace->state != STATE_NEW) return -1;
2557
2558        /* TODO consider allowing an offset from the total number of cores i.e.
2559         * -1 reserve 1 core */
2560        if (nb >= 0) {
2561                trace->config.perpkt_threads = nb;
2562                return 0;
2563        } else {
2564                return -1;
2565        }
2566}
2567
2568DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2569        if (!trace_is_configurable(trace)) return -1;
2570
2571        trace->config.tick_interval = millisec;
2572        return 0;
2573}
2574
2575DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2576        if (!trace_is_configurable(trace)) return -1;
2577
2578        trace->config.tick_count = count;
2579        return 0;
2580}
2581
2582DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2583        if (!trace_is_configurable(trace)) return -1;
2584
2585        trace->tracetime = tracetime;
2586        return 0;
2587}
2588
2589DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2590        if (!trace_is_configurable(trace)) return -1;
2591
2592        trace->config.cache_size = size;
2593        return 0;
2594}
2595
2596DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2597        if (!trace_is_configurable(trace)) return -1;
2598
2599        trace->config.thread_cache_size = size;
2600        return 0;
2601}
2602
2603DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2604        if (!trace_is_configurable(trace)) return -1;
2605
2606        trace->config.fixed_count = fixed;
2607        return 0;
2608}
2609
2610DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2611        if (!trace_is_configurable(trace)) return -1;
2612
2613        trace->config.burst_size = size;
2614        return 0;
2615}
2616
2617DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2618        if (!trace_is_configurable(trace)) return -1;
2619
2620        trace->config.hasher_queue_size = size;
2621        return 0;
2622}
2623
2624DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2625        if (!trace_is_configurable(trace)) return -1;
2626
2627        trace->config.hasher_polling = polling;
2628        return 0;
2629}
2630
2631DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2632        if (!trace_is_configurable(trace)) return -1;
2633
2634        trace->config.reporter_polling = polling;
2635        return 0;
2636}
2637
2638DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2639        if (!trace_is_configurable(trace)) return -1;
2640
2641        trace->config.reporter_thold = thold;
2642        return 0;
2643}
2644
2645DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2646        if (!trace_is_configurable(trace)) return -1;
2647
2648        trace->config.debug_state = debug_state;
2649        return 0;
2650}
2651
2652static bool config_bool_parse(char *value, size_t nvalue) {
2653        if (strncmp(value, "true", nvalue) == 0)
2654                return true;
2655        else if (strncmp(value, "false", nvalue) == 0)
2656                return false;
2657        else
2658                return strtoll(value, NULL, 10) != 0;
2659}
2660
2661/* Note update documentation on trace_set_configuration */
2662static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2663        if (!key) {
2664                fprintf(stderr, "NULL key passed to config_string()\n");
2665                return;
2666        }
2667        if (!value) {
2668                fprintf(stderr, "NULL value passed to config_string()\n");
2669                return;
2670        }
2671        if (!uc) {
2672                fprintf(stderr, "NULL uc (user_configuration) passed to config_string()\n");
2673                return;
2674        }
2675        if (strncmp(key, "cache_size", nkey) == 0
2676            || strncmp(key, "cs", nkey) == 0) {
2677                uc->cache_size = strtoll(value, NULL, 10);
2678        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2679                   || strncmp(key, "tcs", nkey) == 0) {
2680                uc->thread_cache_size = strtoll(value, NULL, 10);
2681        } else if (strncmp(key, "fixed_count", nkey) == 0
2682                   || strncmp(key, "fc", nkey) == 0) {
2683                uc->fixed_count = config_bool_parse(value, nvalue);
2684        } else if (strncmp(key, "burst_size", nkey) == 0
2685                   || strncmp(key, "bs", nkey) == 0) {
2686                uc->burst_size = strtoll(value, NULL, 10);
2687        } else if (strncmp(key, "tick_interval", nkey) == 0
2688                   || strncmp(key, "ti", nkey) == 0) {
2689                uc->tick_interval = strtoll(value, NULL, 10);
2690        } else if (strncmp(key, "tick_count", nkey) == 0
2691                   || strncmp(key, "tc", nkey) == 0) {
2692                uc->tick_count = strtoll(value, NULL, 10);
2693        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2694                   || strncmp(key, "pt", nkey) == 0) {
2695                uc->perpkt_threads = strtoll(value, NULL, 10);
2696        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2697                   || strncmp(key, "hqs", nkey) == 0) {
2698                uc->hasher_queue_size = strtoll(value, NULL, 10);
2699        } else if (strncmp(key, "hasher_polling", nkey) == 0
2700                   || strncmp(key, "hp", nkey) == 0) {
2701                uc->hasher_polling = config_bool_parse(value, nvalue);
2702        } else if (strncmp(key, "reporter_polling", nkey) == 0
2703                   || strncmp(key, "rp", nkey) == 0) {
2704                uc->reporter_polling = config_bool_parse(value, nvalue);
2705        } else if (strncmp(key, "reporter_thold", nkey) == 0
2706                   || strncmp(key, "rt", nkey) == 0) {
2707                uc->reporter_thold = strtoll(value, NULL, 10);
2708        } else if (strncmp(key, "debug_state", nkey) == 0
2709                   || strncmp(key, "ds", nkey) == 0) {
2710                uc->debug_state = config_bool_parse(value, nvalue);
2711        } else {
2712                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2713        }
2714}
2715
2716DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2717        char *pch;
2718        char key[100];
2719        char value[100];
2720        char *dup;
2721        if (!trace) {
2722                fprintf(stderr, "NULL trace passed into trace_set_configuration()\n");
2723                return TRACE_ERR_NULL_TRACE;
2724        }
2725        if (!str) {
2726                trace_set_err(trace, TRACE_ERR_CONFIG, "NULL configuration string passed to trace_set_configuration()");
2727                return -1;
2728        }
2729
2730        if (!trace_is_configurable(trace)) return -1;
2731
2732        dup = strdup(str);
2733        pch = strtok (dup," ,.-");
2734        while (pch != NULL)
2735        {
2736                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2737                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2738                } else {
2739                        fprintf(stderr, "Error: parsing option %s\n", pch);
2740                }
2741                pch = strtok (NULL," ,.-");
2742        }
2743        free(dup);
2744
2745        return 0;
2746}
2747
2748DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2749        char line[1024];
2750        if (!trace_is_configurable(trace)) return -1;
2751
2752        while (fgets(line, sizeof(line), file) != NULL)
2753        {
2754                trace_set_configuration(trace, line);
2755        }
2756
2757        if(ferror(file))
2758                return -1;
2759        else
2760                return 0;
2761}
2762
2763DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2764        if (!packet) {
2765                trace_set_err(libtrace, TRACE_ERR_NULL_PACKET,
2766                        "NULL packet passed to trace_free_packet()");
2767                return;
2768        }
2769        /* Always release any resources this might be holding */
2770        trace_fin_packet(packet);
2771        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2772}
2773
2774DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2775        pthread_mutex_lock(&(packet->ref_lock));
2776        if (packet->refcount < 0) {
2777                packet->refcount = 1;
2778        } else {
2779                packet->refcount ++;
2780        }
2781        pthread_mutex_unlock(&(packet->ref_lock));
2782}
2783
2784DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2785        pthread_mutex_lock(&(packet->ref_lock));
2786        packet->refcount --;
2787
2788        if (packet->refcount <= 0) {
2789                trace_free_packet(packet->trace, packet);
2790        }
2791        pthread_mutex_unlock(&(packet->ref_lock));
2792}
2793
2794
2795DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2796        if (libtrace->format)
2797                return &libtrace->format->info;
2798        else
2799                pthread_exit(NULL);
2800}
Note: See TracBrowser for help on using the repository browser.