source: lib/trace_parallel.c @ 4338f97

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 4338f97 was 4338f97, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Check the environment variables for configuration of formats

We search for 3 environment variables and apply them to the config in the
following order. Such that the first has the lowest priority.

  1. LIBTRACE_CONF, The global environment configuration
  2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
  3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
  • Property mode set to 100644
File size: 82.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97#include "combiners.h"
98
99#include <pthread.h>
100#include <signal.h>
101#include <unistd.h>
102#include <ctype.h>
103
104static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
105extern int libtrace_parallel;
106
107struct multithreading_stats {
108        uint64_t full_queue_hits;
109        uint64_t wait_for_fill_complete_hits;
110} contention_stats[1024];
111
112struct mem_stats {
113        struct memfail {
114           uint64_t cache_hit;
115           uint64_t ring_hit;
116           uint64_t miss;
117           uint64_t recycled;
118        } readbulk, read, write, writebulk;
119};
120
121// Grrr gcc wants this spelt out
122__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
123
124static void print_memory_stats() {
125#if 0
126        char t_name[50];
127        uint64_t total;
128        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
129
130        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
131
132        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
133        if (total) {
134                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
135                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
136                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
137                                total, (double) mem_hits.read.miss / (double) total * 100.0);
138        }
139
140        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
141        if (total) {
142                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
143                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
144
145
146                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
147                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
148        }
149
150        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
151        if (total) {
152                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
153                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
154
155                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
156                                total, (double) mem_hits.write.miss / (double) total * 100.0);
157        }
158
159        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
160        if (total) {
161                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
162                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
163
164                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
166        }
167#endif
168}
169
170/**
171 * This can be used once the hasher thread has been started and internally after
172 * verfiy_configuration.
173 *
174 * @return true if the trace has dedicated hasher thread otherwise false.
175 */
176inline bool trace_has_dedicated_hasher(libtrace_t * libtrace)
177{
178        return libtrace->hasher_thread.type == THREAD_HASHER;
179}
180
181/**
182 * True if the trace has dedicated hasher thread otherwise false,
183 * to be used after the trace is running
184 */
185static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
186{
187        assert(libtrace->state != STATE_NEW);
188        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
189}
190
191/**
192 * When running the number of perpkt threads in use.
193 * TODO what if the trace is not running yet, or has finished??
194 *
195 * @brief libtrace_perpkt_thread_nb
196 * @param t The trace
197 * @return
198 */
199DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
200        return t->perpkt_thread_count;
201}
202
203/**
204 * Changes a thread's state and broadcasts the condition variable. This
205 * should always be done when the lock is held.
206 *
207 * Additionally for perpkt threads the state counts are updated.
208 *
209 * @param trace A pointer to the trace
210 * @param t A pointer to the thread to modify
211 * @param new_state The new state of the thread
212 * @param need_lock Set to true if libtrace_lock is not held, otherwise
213 *        false in the case the lock is currently held by this thread.
214 */
215static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
216        const enum thread_states new_state, const bool need_lock)
217{
218        enum thread_states prev_state;
219        if (need_lock)
220                pthread_mutex_lock(&trace->libtrace_lock);
221        prev_state = t->state;
222        t->state = new_state;
223        if (t->type == THREAD_PERPKT) {
224                --trace->perpkt_thread_states[prev_state];
225                ++trace->perpkt_thread_states[new_state];
226        }
227
228        if (trace->config.debug_state)
229                fprintf(stderr, "Thread %d state changed from %d to %d\n",
230                        (int) t->tid, prev_state, t->state);
231
232        pthread_cond_broadcast(&trace->perpkt_cond);
233        if (need_lock)
234                pthread_mutex_unlock(&trace->libtrace_lock);
235}
236
237/**
238 * Changes the overall traces state and signals the condition.
239 *
240 * @param trace A pointer to the trace
241 * @param new_state The new state of the trace
242 * @param need_lock Set to true if libtrace_lock is not held, otherwise
243 *        false in the case the lock is currently held by this thread.
244 */
245static inline void libtrace_change_state(libtrace_t *trace,
246        const enum trace_state new_state, const bool need_lock)
247{
248        UNUSED enum trace_state prev_state;
249        if (need_lock)
250                pthread_mutex_lock(&trace->libtrace_lock);
251        prev_state = trace->state;
252        trace->state = new_state;
253
254        if (trace->config.debug_state)
255                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
256                        trace->uridata, get_trace_state_name(prev_state),
257                        get_trace_state_name(trace->state));
258
259        pthread_cond_broadcast(&trace->perpkt_cond);
260        if (need_lock)
261                pthread_mutex_unlock(&trace->libtrace_lock);
262}
263
264/**
265 * This is valid once a trace is initialised
266 *
267 * @return True if the format supports parallel threads.
268 */
269static inline bool trace_supports_parallel(libtrace_t *trace)
270{
271        assert(trace);
272        assert(trace->format);
273        if (trace->format->pstart_input)
274                return true;
275        else
276                return false;
277}
278
279DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
280        int i;
281        struct multithreading_stats totals = {0};
282        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
283                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
284                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
285                totals.full_queue_hits += contention_stats[i].full_queue_hits;
286                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
287                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
288        }
289        fprintf(stderr, "\nTotals for perpkt threads\n");
290        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
291        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
292
293        return;
294}
295
296void libtrace_zero_thread(libtrace_thread_t * t) {
297        t->accepted_packets = 0;
298        t->filtered_packets = 0;
299        t->recorded_first = false;
300        t->tracetime_offset_usec = 0;
301        t->user_data = 0;
302        t->format_data = 0;
303        libtrace_zero_ringbuffer(&t->rbuffer);
304        t->trace = NULL;
305        t->ret = NULL;
306        t->type = THREAD_EMPTY;
307        t->perpkt_num = -1;
308}
309
310// Ints are aligned int is atomic so safe to read and write at same time
311// However write must be locked, read doesn't (We never try read before written to table)
312libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
313        int i = 0;
314        pthread_t tid = pthread_self();
315
316        for (;i<libtrace->perpkt_thread_count ;++i) {
317                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
318                        return &libtrace->perpkt_threads[i];
319        }
320        return NULL;
321}
322
323int get_thread_table_num(libtrace_t *libtrace) {
324        int i = 0;
325        pthread_t tid = pthread_self();
326        for (;i<libtrace->perpkt_thread_count; ++i) {
327                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
328                        return i;
329        }
330        return -1;
331}
332
333static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
334        libtrace_thread_t *ret;
335        if (!(ret = get_thread_table(libtrace))) {
336                pthread_t tid = pthread_self();
337                // Check if we are reporter or something else
338                if (pthread_equal(tid, libtrace->reporter_thread.tid))
339                        ret = &libtrace->reporter_thread;
340                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
341                        ret = &libtrace->hasher_thread;
342                else
343                        ret = NULL;
344        }
345        return ret;
346}
347
348/** Makes a packet safe, a packet may become invaild after a
349 * pause (or stop/destroy) of a trace. This copies a packet
350 * in such a way that it will be able to survive a pause.
351 *
352 * However this will not allow the packet to be used after
353 * the format is destroyed. Or while the trace is still paused.
354 */
355DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
356        // Duplicate the packet in standard malloc'd memory and free the
357        // original, This is a 1:1 exchange so is ocache count remains unchanged.
358        if (pkt->buf_control != TRACE_CTRL_PACKET) {
359                libtrace_packet_t *dup;
360                dup = trace_copy_packet(pkt);
361                /* Release the external buffer */
362                trace_fin_packet(pkt);
363                /* Copy the duplicated packet over the existing */
364                memcpy(pkt, dup, sizeof(libtrace_packet_t));
365        }
366}
367
368/**
369 * Makes a libtrace_result_t safe, used when pausing a trace.
370 * This will call libtrace_make_packet_safe if the result is
371 * a packet.
372 */
373DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
374        if (res->type == RESULT_PACKET) {
375                libtrace_make_packet_safe(res->value.pkt);
376        }
377}
378
379/**
380 * Holds threads in a paused state, until released by broadcasting
381 * the condition mutex.
382 */
383static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
384        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
385        thread_change_state(trace, t, THREAD_PAUSED, false);
386        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
387                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
388        }
389        thread_change_state(trace, t, THREAD_RUNNING, false);
390        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
391}
392
393/**
394 * Sends a packet to the user, expects either a valid packet or a TICK packet.
395 *
396 * Note READ_MESSAGE will only be returned if tracetime is true.
397 *
398 * @brief dispatch_packet
399 * @param trace
400 * @param t
401 * @param packet A pointer to the packet storage, which may be set to null upon
402 *               return, or a packet to be finished.
403 * @return 0 is successful, otherwise if playing back in tracetime
404 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
405 */
406static inline int dispatch_packet(libtrace_t *trace,
407                                  libtrace_thread_t *t,
408                                  libtrace_packet_t **packet,
409                                  bool tracetime) {
410        if ((*packet)->error > 0) {
411                if (tracetime) {
412                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
413                                return READ_MESSAGE;
414                }
415                t->accepted_packets++;
416                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
417                trace_fin_packet(*packet);
418        } else {
419                libtrace_message_t message;
420                assert((*packet)->error == READ_TICK);
421                message.code = MESSAGE_TICK;
422                message.additional.uint64 = trace_packet_get_order(*packet);
423                message.sender = t;
424                (*trace->per_pkt)(trace, NULL, &message, t);
425        }
426        return 0;
427}
428
429/**
430 * Pauses a per packet thread, messages will not be processed when the thread
431 * is paused.
432 *
433 * This process involves reading packets if a hasher thread is used. As such
434 * this function can fail to pause due to errors when reading in which case
435 * the thread should be stopped instead.
436 *
437 *
438 * @brief trace_perpkt_thread_pause
439 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
440 */
441static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
442                                     libtrace_packet_t *packets[],
443                                     int *nb_packets, int *empty, int *offset) {
444        libtrace_message_t message = {0};
445        libtrace_packet_t * packet = NULL;
446
447        /* Let the user thread know we are going to pause */
448        message.code = MESSAGE_PAUSING;
449        message.sender = t;
450        (*trace->per_pkt)(trace, NULL, &message, t);
451
452        /* Send through any remaining packets (or messages) without delay */
453
454        /* First send those packets already read, as fast as possible
455         * This should never fail or check for messages etc. */
456        for (;*offset < *nb_packets; ++*offset) {
457                ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0);
458                /* Move full slots to front as we go */
459                if (packets[*offset]) {
460                        if (*empty != *offset) {
461                                packets[*empty] = packets[*offset];
462                                packets[*offset] = NULL;
463                        }
464                        ++*empty;
465                }
466        }
467
468        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
469        /* If a hasher thread is running, empty input queues so we don't lose data */
470        if (trace_has_dedicated_hasher(trace)) {
471                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
472                // The hasher has stopped by this point, so the queue shouldn't be filling
473                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
474                        int ret = trace->pread(trace, t, &packet, 1);
475                        if (ret == 1) {
476                                if (packet->error > 0) {
477                                        store_first_packet(trace, packet, t);
478                                }
479                                ASSERT_RET(dispatch_packet(trace, t, &packet, 1), == 0);
480                                if (packet == NULL)
481                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
482                        } else if (ret != READ_MESSAGE) {
483                                /* Ignore messages we pick these up next loop */
484                                assert (ret == READ_EOF || ret == READ_ERROR);
485                                /* Verify no packets are remaining */
486                                /* TODO refactor this sanity check out!! */
487                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
488                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
489                                        // No packets after this should have any data in them
490                                        assert(packet->error <= 0);
491                                }
492                                fprintf(stderr, "PREAD_FAILED %d\n", ret);
493                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
494                                return -1;
495                        }
496                }
497        }
498        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
499
500        /* Now we do the actual pause, this returns when we resumed */
501        trace_thread_pause(trace, t);
502        message.code = MESSAGE_RESUMING;
503        (*trace->per_pkt)(trace, NULL, &message, t);
504        return 1;
505}
506
507/**
508 * The is the entry point for our packet processing threads.
509 */
510static void* perpkt_threads_entry(void *data) {
511        libtrace_t *trace = (libtrace_t *)data;
512        libtrace_thread_t *t;
513        libtrace_message_t message = {0};
514        libtrace_packet_t *packets[trace->config.burst_size];
515        size_t i;
516        //int ret;
517        /* The current reading position into the packets */
518        int offset = 0;
519        /* The number of packets last read */
520        int nb_packets = 0;
521        /* The offset to the first NULL packet upto offset */
522        int empty = 0;
523
524        /* Wait until trace_pstart has been completed */
525        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
526        t = get_thread_table(trace);
527        assert(t);
528        if (trace->state == STATE_ERROR) {
529                thread_change_state(trace, t, THREAD_FINISHED, false);
530                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
531                pthread_exit(NULL);
532        }
533        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
534        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
535
536        if (trace->format->pregister_thread) {
537                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
538        }
539
540        /* Fill our buffer with empty packets */
541        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
542        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
543                              trace->config.burst_size,
544                              trace->config.burst_size);
545
546        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
547        // Send a message to say we've started
548
549        // Let the per_packet function know we have started
550        message.code = MESSAGE_STARTING;
551        message.sender = t;
552        (*trace->per_pkt)(trace, NULL, &message, t);
553        message.code = MESSAGE_RESUMING;
554        (*trace->per_pkt)(trace, NULL, &message, t);
555
556
557        for (;;) {
558
559                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
560                        int ret;
561                        switch (message.code) {
562                                case MESSAGE_DO_PAUSE: // This is internal
563                                        ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset);
564                                        if (ret == READ_EOF) {
565                                                fprintf(stderr, "PAUSE stop eof!!\n");
566                                                goto eof;
567                                        } else if (ret == READ_ERROR) {
568                                                fprintf(stderr, "PAUSE stop error!!\n");
569                                                goto error;
570                                        }
571                                        assert(ret == 1);
572                                        continue;
573                                case MESSAGE_DO_STOP: // This is internal
574                                        fprintf(stderr, "DO_STOP stop!!\n");
575                                        goto eof;
576                        }
577                        (*trace->per_pkt)(trace, NULL, &message, t);
578                        /* Continue and the empty messages out before packets */
579                        continue;
580                }
581
582
583                /* Do we need to read a new set of packets MOST LIKELY we do */
584                if (offset == nb_packets) {
585                        /* Refill the packet buffer */
586                        if (empty != nb_packets) {
587                                // Refill the empty packets
588                                libtrace_ocache_alloc(&trace->packet_freelist,
589                                                      (void **) &packets[empty],
590                                                      nb_packets - empty,
591                                                      nb_packets - empty);
592                        }
593                        if (!trace->pread) {
594                                assert(packets[0]);
595                                nb_packets = trace_read_packet(trace, packets[0]);
596                                packets[0]->error = nb_packets;
597                                if (nb_packets > 0)
598                                        nb_packets = 1;
599                        } else {
600                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
601                        }
602                        offset = 0;
603                        empty = 0;
604                }
605
606                /* Handle error/message cases */
607                if (nb_packets > 0) {
608                        /* Store the first packet */
609                        if (packets[0]->error > 0) {
610                                store_first_packet(trace, packets[0], t);
611                        }
612                        for (;offset < nb_packets; ++offset) {
613                                int ret;
614                                ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime);
615                                if (ret == 0) {
616                                        /* Move full slots to front as we go */
617                                        if (packets[offset]) {
618                                                if (empty != offset) {
619                                                        packets[empty] = packets[offset];
620                                                        packets[offset] = NULL;
621                                                }
622                                                ++empty;
623                                        }
624                                } else {
625                                        assert(ret == READ_MESSAGE);
626                                        /* Loop around and process the message, note */
627                                        continue;
628                                }
629                        }
630                } else {
631                        switch (nb_packets) {
632                        case READ_EOF:
633                                fprintf(stderr, "EOF stop %d!!\n", nb_packets);
634                                goto eof;
635                        case READ_ERROR:
636                                fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
637                                goto error;
638                        case READ_MESSAGE:
639                                nb_packets = 0;
640                                continue;
641                        default:
642                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
643                                goto error;
644                        }
645                }
646
647        }
648
649error:
650        fprintf(stderr, "An error occured in trace\n");
651        message.code = MESSAGE_DO_STOP;
652        message.sender = t;
653        message.additional.uint64 = 0;
654        trace_send_message_to_perpkts(trace, &message);
655eof:
656        fprintf(stderr, "An eof occured in trace\n");
657        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
658
659        // Let the per_packet function know we have stopped
660        message.code = MESSAGE_PAUSING;
661        message.sender = t;
662        (*trace->per_pkt)(trace, NULL, &message, t);
663        message.code = MESSAGE_STOPPING;
664        message.additional.uint64 = 0;
665        (*trace->per_pkt)(trace, NULL, &message, t);
666
667        // Free any remaining packets
668        for (i = 0; i < trace->config.burst_size; i++) {
669                if (packets[i]) {
670                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
671                        packets[i] = NULL;
672                }
673        }
674
675
676        thread_change_state(trace, t, THREAD_FINISHED, true);
677
678        // Notify only after we've defiantly set the state to finished
679        message.code = MESSAGE_PERPKT_ENDED;
680        message.additional.uint64 = 0;
681        trace_send_message_to_reporter(trace, &message);
682
683        // Release all ocache memory before unregistering with the format
684        // because this might(it does in DPDK) unlink the formats mempool
685        // causing destroy/finish packet to fail.
686        libtrace_ocache_unregister_thread(&trace->packet_freelist);
687        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
688        if (trace->format->punregister_thread) {
689                trace->format->punregister_thread(trace, t);
690        }
691        print_memory_stats();
692
693        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
694
695        pthread_exit(NULL);
696};
697
698/**
699 * The start point for our single threaded hasher thread, this will read
700 * and hash a packet from a data source and queue it against the correct
701 * core to process it.
702 */
703static void* hasher_entry(void *data) {
704        libtrace_t *trace = (libtrace_t *)data;
705        libtrace_thread_t * t;
706        int i;
707        libtrace_packet_t * packet;
708        libtrace_message_t message = {0};
709        int pkt_skipped = 0;
710
711        assert(trace_has_dedicated_hasher(trace));
712        /* Wait until all threads are started and objects are initialised (ring buffers) */
713        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
714        t = &trace->hasher_thread;
715        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
716        if (trace->state == STATE_ERROR) {
717                thread_change_state(trace, t, THREAD_FINISHED, false);
718                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
719                pthread_exit(NULL);
720        }
721
722        printf("Hasher Thread started\n");
723        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
724
725        if (trace->format->pregister_thread) {
726                trace->format->pregister_thread(trace, t, true);
727        }
728
729        /* Read all packets in then hash and queue against the correct thread */
730        while (1) {
731                int thread;
732                if (!pkt_skipped)
733                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
734                assert(packet);
735
736                if (libtrace_halt) {
737                        packet->error = 0;
738                        break;
739                }
740
741                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
742                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
743                        switch(message.code) {
744                                case MESSAGE_DO_PAUSE:
745                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
746                                        thread_change_state(trace, t, THREAD_PAUSED, false);
747                                        pthread_cond_broadcast(&trace->perpkt_cond);
748                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
749                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
750                                        }
751                                        thread_change_state(trace, t, THREAD_RUNNING, false);
752                                        pthread_cond_broadcast(&trace->perpkt_cond);
753                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
754                                        break;
755                                case MESSAGE_DO_STOP:
756                                        assert(trace->started == false);
757                                        assert(trace->state == STATE_FINSHED);
758                                        /* Mark the current packet as EOF */
759                                        packet->error = 0;
760                                        break;
761                                default:
762                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
763                        }
764                        pkt_skipped = 1;
765                        continue;
766                }
767
768                if ((packet->error = trace_read_packet(trace, packet)) <1) {
769                        break; /* We are EOF or error'd either way we stop  */
770                }
771
772                /* We are guaranteed to have a hash function i.e. != NULL */
773                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
774                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
775                /* Blocking write to the correct queue - I'm the only writer */
776                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
777                        uint64_t order = trace_packet_get_order(packet);
778                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
779                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
780                                // Write ticks to everyone else
781                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
782                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
783                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
784                                for (i = 0; i < trace->perpkt_thread_count; i++) {
785                                        pkts[i]->error = READ_TICK;
786                                        trace_packet_set_order(pkts[i], order);
787                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
788                                }
789                        }
790                        pkt_skipped = 0;
791                } else {
792                        assert(!"Dropping a packet!!");
793                        pkt_skipped = 1; // Reuse that packet no one read it
794                }
795        }
796
797        /* Broadcast our last failed read to all threads */
798        for (i = 0; i < trace->perpkt_thread_count; i++) {
799                libtrace_packet_t * bcast;
800                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
801                if (i == trace->perpkt_thread_count - 1) {
802                        bcast = packet;
803                } else {
804                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
805                        bcast->error = packet->error;
806                }
807                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
808                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
809                        // Unlock early otherwise we could deadlock
810                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
811                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
812                } else {
813                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
814                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
815                }
816        }
817
818        // We don't need to free the packet
819        thread_change_state(trace, t, THREAD_FINISHED, true);
820
821        // Notify only after we've defiantly set the state to finished
822        message.code = MESSAGE_PERPKT_ENDED;
823        message.additional.uint64 = 0;
824        trace_send_message_to_reporter(trace, &message);
825        libtrace_ocache_unregister_thread(&trace->packet_freelist);
826        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
827        if (trace->format->punregister_thread) {
828                trace->format->punregister_thread(trace, t);
829        }
830        print_memory_stats();
831        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
832
833        // TODO remove from TTABLE t sometime
834        pthread_exit(NULL);
835};
836
837/**
838 * Moves src into dest(Complete copy) and copies the memory buffer and
839 * its flags from dest into src ready for reuse without needing extra mallocs.
840 */
841static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
842        // Save the passed in buffer status
843        assert(dest->trace == NULL); // Must be a empty packet
844        void * temp_buf = dest->buffer;
845        buf_control_t temp_buf_control = dest->buf_control;
846        // Completely copy StoredPacket into packet
847        memcpy(dest, src, sizeof(libtrace_packet_t));
848        // Set the buffer settings on the returned packet
849        src->buffer = temp_buf;
850        src->buf_control = temp_buf_control;
851        src->trace = NULL;
852}
853
854/* Our simplest case when a thread becomes ready it can obtain an exclusive
855 * lock to read packets from the underlying trace.
856 */
857static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
858                                                    libtrace_thread_t *t,
859                                                    libtrace_packet_t *packets[],
860                                                    size_t nb_packets) {
861        size_t i = 0;
862        //bool tick_hit = false;
863
864        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
865        /* Read nb_packets */
866        for (i = 0; i < nb_packets; ++i) {
867                packets[i]->error = trace_read_packet(libtrace, packets[i]);
868
869                if (packets[i]->error <= 0) {
870                        /* We'll catch this next time if we have already got packets */
871                        if ( i==0 ) {
872                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
873                                return packets[i]->error;
874                        } else {
875                                break;
876                        }
877                }
878                /*
879                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
880                        tick_hit = true;
881                }*/
882        }
883        // Doing this inside the lock ensures the first packet is always
884        // recorded first
885        if (packets[0]->error > 0) {
886                store_first_packet(libtrace, packets[0], t);
887        }
888        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
889        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
890        if (tick_hit) {
891                libtrace_message_t tick;
892                tick.additional.uint64 = trace_packet_get_order(packets[i]);
893                tick.code = MESSAGE_TICK;
894                trace_send_message_to_perpkts(libtrace, &tick);
895        } */
896        return i;
897}
898
899/**
900 * For the case that we have a dedicated hasher thread
901 * 1. We read a packet from our buffer
902 * 2. Move that into the packet provided (packet)
903 */
904inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
905                                                   libtrace_thread_t *t,
906                                                   libtrace_packet_t *packets[],
907                                                   size_t nb_packets) {
908        size_t i;
909
910        /* We store the last error message here */
911        if (t->format_data) {
912                fprintf(stderr, "Hit me, ohh yeah got error %d\n",
913                        ((libtrace_packet_t *)t->format_data)->error);
914                return ((libtrace_packet_t *)t->format_data)->error;
915        }
916
917        // Always grab at least one
918        if (packets[0]) // Recycle the old get the new
919                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
920        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
921
922        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
923                fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);
924                return packets[0]->error;
925        }
926
927        for (i = 1; i < nb_packets; i++) {
928                if (packets[i]) // Recycle the old get the new
929                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
930                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
931                        packets[i] = NULL;
932                        break;
933                }
934
935                /* We will return an error or EOF the next time around */
936                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
937                        /* The message case will be checked automatically -
938                           However other cases like EOF and error will only be
939                           sent once*/
940                        if (packets[i]->error != READ_MESSAGE) {
941                                assert(t->format_data == NULL);
942                                t->format_data = packets[i];
943                                fprintf(stderr, "Hit me, ohh yeah set error %d\n",
944                                        ((libtrace_packet_t *)t->format_data)->error);
945                        }
946                        break;
947                }
948        }
949
950        return i;
951}
952
953/**
954 * Tries to read from our queue and returns 1 if a packet was retrieved
955 */
956static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
957{
958        libtrace_packet_t* retrived_packet;
959
960        /* Lets see if we have one waiting */
961        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
962                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
963                assert(retrived_packet);
964
965                if (*packet) // Recycle the old get the new
966                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
967                *packet = retrived_packet;
968                *ret = (*packet)->error;
969                return 1;
970        }
971        return 0;
972}
973
974/**
975 * Allows us to ensure all threads are finished writing to our threads ring_buffer
976 * before returning EOF/error.
977 */
978inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
979{
980        /* We are waiting for the condition that another thread ends to check
981         * our queue for new data, once all threads end we can go to finished */
982        bool complete = false;
983        int ret;
984
985        do {
986                // Wait for a thread to end
987                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
988
989                // Check before
990                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
991                        complete = true;
992                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
993                        continue;
994                }
995
996                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
997
998                // Check after
999                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
1000                        complete = true;
1001                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1002                        continue;
1003                }
1004
1005                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1006
1007                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
1008                if(try_waiting_queue(libtrace, t, packet, &ret))
1009                        return ret;
1010        } while (!complete);
1011
1012        // We can only end up here once all threads complete
1013        try_waiting_queue(libtrace, t, packet, &ret);
1014
1015        return ret;
1016        // TODO rethink this logic fix bug here
1017}
1018
1019/**
1020 * Expects the libtrace_lock to not be held
1021 */
1022inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
1023{
1024        thread_change_state(libtrace, t, THREAD_FINISHING, true);
1025        return trace_handle_finishing_perpkt(libtrace, packet, t);
1026}
1027
1028/**
1029 * This case is much like the dedicated hasher, except that we will become
1030 * hasher if we don't have a a packet waiting.
1031 *
1032 * Note: This is only every used if we have are doing hashing.
1033 *
1034 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
1035 * queue sizes in total are larger than the ring size.
1036 *
1037 * 1. We read a packet from our buffer
1038 * 2. Move that into the packet provided (packet)
1039 */
1040inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
1041{
1042        int thread, ret/*, psize*/;
1043
1044        while (1) {
1045                if(try_waiting_queue(libtrace, t, packet, &ret))
1046                        return ret;
1047                // Can still block here if another thread is writing to a full queue
1048                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1049
1050                // Its impossible for our own queue to overfill, because no one can write
1051                // when we are in the lock
1052                if(try_waiting_queue(libtrace, t, packet, &ret)) {
1053                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1054                        return ret;
1055                }
1056
1057                // Another thread cannot write a packet because a queue has filled up. Is it ours?
1058                if (libtrace->perpkt_queue_full) {
1059                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
1060                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1061                        continue;
1062                }
1063
1064                if (!*packet)
1065                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
1066                assert(*packet);
1067
1068                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
1069                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
1070                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1071                        if (libtrace_halt)
1072                                return 0;
1073                        else
1074                                return (*packet)->error;
1075                }
1076
1077                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
1078                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
1079                if (thread == t->perpkt_num) {
1080                        // If it's this thread we must be in order because we checked the buffer once we got the lock
1081                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1082                        return (*packet)->error;
1083                }
1084
1085                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
1086                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
1087                                libtrace->perpkt_queue_full = true;
1088                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1089                                contention_stats[t->perpkt_num].full_queue_hits++;
1090                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1091                        }
1092                        *packet = NULL;
1093                        libtrace->perpkt_queue_full = false;
1094                } else {
1095                        /* We can get here if the user closes the thread before natural completion/or error */
1096                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
1097                }
1098                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1099        }
1100}
1101
1102/**
1103 * For the first packet of each queue we keep a copy and note the system
1104 * time it was received at.
1105 *
1106 * This is used for finding the first packet when playing back a trace
1107 * in trace time. And can be used by real time applications to print
1108 * results out every XXX seconds.
1109 */
1110void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1111{
1112        if (!t->recorded_first) {
1113                struct timeval tv;
1114                libtrace_packet_t * dup;
1115                // For what it's worth we can call these outside of the lock
1116                gettimeofday(&tv, NULL);
1117                dup = trace_copy_packet(packet);
1118                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1119                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1120                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
1121                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1122                // Now update the first
1123                libtrace->first_packets.count++;
1124                if (libtrace->first_packets.count == 1) {
1125                        // We the first entry hence also the first known packet
1126                        libtrace->first_packets.first = t->perpkt_num;
1127                } else {
1128                        // Check if we are newer than the previous 'first' packet
1129                        size_t first = libtrace->first_packets.first;
1130                        if (trace_get_seconds(dup) <
1131                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1132                                libtrace->first_packets.first = t->perpkt_num;
1133                }
1134                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1135                libtrace_message_t mesg = {0};
1136                mesg.code = MESSAGE_FIRST_PACKET;
1137                trace_send_message_to_reporter(libtrace, &mesg);
1138                t->recorded_first = true;
1139        }
1140}
1141
1142/**
1143 * Returns 1 if it's certain that the first packet is truly the first packet
1144 * rather than a best guess based upon threads that have published so far.
1145 * Otherwise 0 is returned.
1146 * It's recommended that this result is stored rather than calling this
1147 * function again.
1148 */
1149DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
1150{
1151        int ret = 0;
1152        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1153        if (libtrace->first_packets.count) {
1154                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1155                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1156                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1157                        ret = 1;
1158                } else {
1159                        struct timeval curr_tv;
1160                        // If a second has passed since the first entry we will assume this is the very first packet
1161                        gettimeofday(&curr_tv, NULL);
1162                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1163                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1164                                        ret = 1;
1165                                }
1166                        }
1167                }
1168        } else {
1169                *packet = NULL;
1170                *tv = NULL;
1171        }
1172        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1173        return ret;
1174}
1175
1176
1177DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
1178{
1179        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1180}
1181
1182inline static struct timeval usec_to_tv(uint64_t usec)
1183{
1184        struct timeval tv;
1185        tv.tv_sec = usec / 1000000;
1186        tv.tv_usec = usec % 1000000;
1187        return tv;
1188}
1189
1190/** Similar to delay_tracetime but send messages to all threads periodically */
1191static void* reporter_entry(void *data) {
1192        libtrace_message_t message = {0};
1193        libtrace_t *trace = (libtrace_t *)data;
1194        libtrace_thread_t *t = &trace->reporter_thread;
1195
1196        fprintf(stderr, "Reporter thread starting\n");
1197
1198        /* Wait until all threads are started */
1199        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1200        if (trace->state == STATE_ERROR) {
1201                thread_change_state(trace, t, THREAD_FINISHED, false);
1202                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1203                pthread_exit(NULL);
1204        }
1205        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1206
1207        message.code = MESSAGE_STARTING;
1208        message.sender = t;
1209        (*trace->reporter)(trace, NULL, &message);
1210        message.code = MESSAGE_RESUMING;
1211        (*trace->reporter)(trace, NULL, &message);
1212
1213        while (!trace_finished(trace)) {
1214                if (trace->config.reporter_polling) {
1215                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1216                                message.code = MESSAGE_POST_REPORTER;
1217                } else {
1218                        libtrace_message_queue_get(&t->messages, &message);
1219                }
1220                switch (message.code) {
1221                        // Check for results
1222                        case MESSAGE_POST_REPORTER:
1223                                trace->combiner.read(trace, &trace->combiner);
1224                                break;
1225                        case MESSAGE_DO_PAUSE:
1226                                assert(trace->combiner.pause);
1227                                trace->combiner.pause(trace, &trace->combiner);
1228                                message.code = MESSAGE_PAUSING;
1229                                message.sender = t;
1230                                (*trace->reporter)(trace, NULL, &message);
1231                                trace_thread_pause(trace, t);
1232                                message.code = MESSAGE_RESUMING;
1233                                (*trace->reporter)(trace, NULL, &message);
1234                                break;
1235                        default:
1236                                (*trace->reporter)(trace, NULL, &message);
1237                }
1238        }
1239
1240        // Flush out whats left now all our threads have finished
1241        trace->combiner.read_final(trace, &trace->combiner);
1242
1243        // GOODBYE
1244        message.code = MESSAGE_PAUSING;
1245        message.sender = t;
1246        (*trace->reporter)(trace, NULL, &message);
1247        message.code = MESSAGE_STOPPING;
1248        (*trace->reporter)(trace, NULL, &message);
1249
1250        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1251        print_memory_stats();
1252        return NULL;
1253}
1254
1255/** Similar to delay_tracetime but send messages to all threads periodically */
1256static void* keepalive_entry(void *data) {
1257        struct timeval prev, next;
1258        libtrace_message_t message = {0};
1259        libtrace_t *trace = (libtrace_t *)data;
1260        uint64_t next_release;
1261        fprintf(stderr, "keepalive thread is starting\n");
1262
1263        /* Wait until all threads are started */
1264        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1265        if (trace->state == STATE_ERROR) {
1266                thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false);
1267                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1268                pthread_exit(NULL);
1269        }
1270        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1271
1272        gettimeofday(&prev, NULL);
1273        message.code = MESSAGE_TICK;
1274        while (trace->state != STATE_FINSHED) {
1275                fd_set rfds;
1276                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1277                gettimeofday(&next, NULL);
1278                if (next_release > tv_to_usec(&next)) {
1279                        next = usec_to_tv(next_release - tv_to_usec(&next));
1280                        // Wait for timeout or a message
1281                        FD_ZERO(&rfds);
1282                        FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
1283                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
1284                                libtrace_message_t msg;
1285                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
1286                                assert(msg.code == MESSAGE_DO_STOP);
1287                                goto done;
1288                        }
1289                }
1290                prev = usec_to_tv(next_release);
1291                if (trace->state == STATE_RUNNING) {
1292                        message.additional.uint64 = tv_to_usec(&prev);
1293                        trace_send_message_to_perpkts(trace, &message);
1294                }
1295        }
1296done:
1297
1298        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1299        return NULL;
1300}
1301
1302/**
1303 * Delays a packets playback so the playback will be in trace time.
1304 * This may break early if a message becomes available.
1305 *
1306 * Requires the first packet for this thread to be received.
1307 * @param libtrace  The trace
1308 * @param packet    The packet to delay
1309 * @param t         The current thread
1310 * @return Either READ_MESSAGE(-2) or 0 is successful
1311 */
1312static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1313        struct timeval curr_tv, pkt_tv;
1314        uint64_t next_release = t->tracetime_offset_usec;
1315        uint64_t curr_usec;
1316
1317        if (!t->tracetime_offset_usec) {
1318                libtrace_packet_t *first_pkt;
1319                struct timeval *sys_tv;
1320                int64_t initial_offset;
1321                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1322                assert(first_pkt);
1323                pkt_tv = trace_get_timeval(first_pkt);
1324                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1325                /* In the unlikely case offset is 0, change it to 1 */
1326                if (stable)
1327                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1328                next_release = initial_offset;
1329        }
1330        /* next_release == offset */
1331        pkt_tv = trace_get_timeval(packet);
1332        next_release += tv_to_usec(&pkt_tv);
1333        gettimeofday(&curr_tv, NULL);
1334        curr_usec = tv_to_usec(&curr_tv);
1335        if (next_release > curr_usec) {
1336                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1337                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1338                fd_set rfds;
1339                FD_ZERO(&rfds);
1340                FD_SET(mesg_fd, &rfds);
1341                // We need to wait
1342
1343                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1344                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1345                if (ret == 0) {
1346                        return 0;
1347                } else if (ret > 0) {
1348                        return READ_MESSAGE;
1349                } else {
1350                        fprintf(stderr, "I thnik we broke select\n");
1351                }
1352        }
1353        return 0;
1354}
1355
1356/* Discards packets that don't match the filter.
1357 * Discarded packets are emptied and then moved to the end of the packet list.
1358 *
1359 * @param trace       The trace format, containing the filter
1360 * @param packets     An array of packets
1361 * @param nb_packets  The number of valid items in packets
1362 *
1363 * @return The number of packets that passed the filter, which are moved to
1364 *          the start of the packets array
1365 */
1366static inline size_t filter_packets(libtrace_t *trace,
1367                                    libtrace_packet_t **packets,
1368                                    size_t nb_packets) {
1369        size_t offset = 0;
1370        size_t i;
1371
1372        for (i = 0; i < nb_packets; ++i) {
1373                // The filter needs the trace attached to receive the link type
1374                packets[i]->trace = trace;
1375                if (trace_apply_filter(trace->filter, packets[i])) {
1376                        libtrace_packet_t *tmp;
1377                        tmp = packets[offset];
1378                        packets[offset++] = packets[i];
1379                        packets[i] = tmp;
1380                } else {
1381                        trace_fin_packet(packets[i]);
1382                }
1383        }
1384
1385        return offset;
1386}
1387
1388/* Read a batch of packets from the trace into a buffer.
1389 * Note that this function will block until a packet is read (or EOF is reached)
1390 *
1391 * @param libtrace    The trace
1392 * @param t           The thread
1393 * @param packets     An array of packets
1394 * @param nb_packets  The number of empty packets in packets
1395 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1396 */
1397static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1398                                      libtrace_thread_t *t,
1399                                      libtrace_packet_t *packets[],
1400                                      size_t nb_packets) {
1401        int i;
1402        assert(nb_packets);
1403        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1404        if (trace_is_err(libtrace))
1405                return -1;
1406        if (!libtrace->started) {
1407                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1408                              "You must call libtrace_start() before trace_read_packet()\n");
1409                return -1;
1410        }
1411
1412        if (libtrace->format->pread_packets) {
1413                int ret;
1414                for (i = 0; i < (int) nb_packets; ++i) {
1415                        assert(i[packets]);
1416                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1417                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1418                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1419                                              "Packet passed to trace_read_packet() is invalid\n");
1420                                return -1;
1421                        }
1422                }
1423                do {
1424                        ret=libtrace->format->pread_packets(libtrace, t,
1425                                                            packets,
1426                                                            nb_packets);
1427                        /* Error, EOF or message? */
1428                        if (ret <= 0) {
1429                                return ret;
1430                        }
1431
1432                        if (libtrace->filter) {
1433                                int remaining;
1434                                remaining = filter_packets(libtrace,
1435                                                           packets, ret);
1436                                t->filtered_packets += ret - remaining;
1437                                ret = remaining;
1438                        }
1439                        for (i = 0; i < ret; ++i) {
1440                                /* We do not mark the packet against the trace,
1441                                 * before hand or after. After breaks DAG meta
1442                                 * packets and before is inefficient */
1443                                //packets[i]->trace = libtrace;
1444                                /* TODO IN FORMAT?? Like traditional libtrace */
1445                                if (libtrace->snaplen>0)
1446                                        trace_set_capture_length(packets[i],
1447                                                        libtrace->snaplen);
1448                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1449                        }
1450                } while(ret == 0);
1451                return ret;
1452        }
1453        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1454                      "This format does not support reading packets\n");
1455        return ~0U;
1456}
1457
1458/* Restarts a parallel trace, this is called from trace_pstart.
1459 * The libtrace lock is held upon calling this function.
1460 * Typically with a parallel trace the threads are not
1461 * killed rather.
1462 */
1463static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1464                          fn_per_pkt per_pkt, fn_reporter reporter) {
1465        int err = 0;
1466        if (libtrace->state != STATE_PAUSED) {
1467                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1468                        "trace(%s) is not currently paused",
1469                              libtrace->uridata);
1470                return -1;
1471        }
1472
1473        /* Update functions if requested */
1474        if (per_pkt)
1475                libtrace->per_pkt = per_pkt;
1476        if (reporter)
1477                libtrace->reporter = reporter;
1478        if(global_blob)
1479                libtrace->global_blob = global_blob;
1480
1481        assert(libtrace_parallel);
1482        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1483        assert(libtrace->per_pkt);
1484
1485        if (libtrace->perpkt_thread_count > 1 &&
1486            trace_supports_parallel(libtrace) &&
1487            !trace_has_dedicated_hasher(libtrace)) {
1488                fprintf(stderr, "Restarting trace pstart_input()\n");
1489                err = libtrace->format->pstart_input(libtrace);
1490        } else {
1491                if (libtrace->format->start_input) {
1492                        fprintf(stderr, "Restarting trace start_input()\n");
1493                        err = libtrace->format->start_input(libtrace);
1494                }
1495        }
1496
1497        if (err == 0) {
1498                libtrace->started = true;
1499                libtrace_change_state(libtrace, STATE_RUNNING, false);
1500        }
1501        return err;
1502}
1503
1504/**
1505 * Verifies the configuration and sets default values for any values not
1506 * specified by the user.
1507 * @return
1508 */
1509static void verify_configuration(libtrace_t *libtrace) {
1510        bool require_hasher = false;
1511
1512        /* Might we need a dedicated hasher thread? */
1513        if (libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1514                require_hasher = true;
1515        }
1516
1517        if (libtrace->config.hasher_queue_size <= 0)
1518                libtrace->config.hasher_queue_size = 1000;
1519
1520        if (libtrace->config.perpkt_threads <= 0) {
1521                // TODO add BSD support
1522                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1523                if (libtrace->perpkt_thread_count <= 0)
1524                        // Lets just use one
1525                        libtrace->perpkt_thread_count = 1;
1526        } else {
1527                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1528        }
1529
1530        if (libtrace->config.reporter_thold <= 0)
1531                libtrace->config.reporter_thold = 100;
1532        if (libtrace->config.burst_size <= 0)
1533                libtrace->config.burst_size = 10;
1534        if (libtrace->config.packet_thread_cache_size <= 0)
1535                libtrace->config.packet_thread_cache_size = 20;
1536        if (libtrace->config.packet_cache_size <= 0)
1537                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1538
1539        if (libtrace->config.packet_cache_size <
1540                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1541                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1542
1543        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1544                libtrace->combiner = combiner_unordered;
1545
1546
1547        /* Figure out if we are using a dedicated hasher thread? */
1548        if (require_hasher && libtrace->perpkt_thread_count > 1) {
1549                libtrace->hasher_thread.type = THREAD_HASHER;
1550        }
1551}
1552
1553/**
1554 * Starts a libtrace_thread, including allocating memory for messaging.
1555 * Threads are expected to wait until the libtrace look is released.
1556 * Hence why we don't init structures until later.
1557 *
1558 * @param trace The trace the thread is associated with
1559 * @param t The thread that is filled when the thread is started
1560 * @param type The type of thread
1561 * @param start_routine The entry location of the thread
1562 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1563 * @param name For debugging purposes set the threads name (Optional)
1564 *
1565 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1566 *         In this situation the thread structure is zeroed.
1567 */
1568static int trace_start_thread(libtrace_t *trace,
1569                       libtrace_thread_t *t,
1570                       enum thread_types type,
1571                       void *(*start_routine) (void *),
1572                       int perpkt_num,
1573                       const char *name) {
1574        int ret;
1575        assert(t->type == THREAD_EMPTY);
1576        t->trace = trace;
1577        t->ret = NULL;
1578        t->user_data = NULL;
1579        t->type = type;
1580        t->state = THREAD_RUNNING;
1581        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1582        if (ret != 0) {
1583                libtrace_zero_thread(t);
1584                trace_set_err(trace, ret, "Failed to create a thread");
1585                return -1;
1586        }
1587        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1588        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1589                libtrace_ringbuffer_init(&t->rbuffer,
1590                                         trace->config.hasher_queue_size,
1591                                         trace->config.hasher_polling?
1592                                                 LIBTRACE_RINGBUFFER_POLLING:
1593                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1594        }
1595        if(name)
1596                pthread_setname_np(t->tid, name);
1597        t->perpkt_num = perpkt_num;
1598        return 0;
1599}
1600
1601/** Parses the environment variable LIBTRACE_CONF into the supplied
1602 * configuration structure.
1603 *
1604 * @param libtrace The trace from which we determine the URI
1605 * @param uc A configuration structure to be configured.
1606 *
1607 * We search for 3 environment variables and apply them to the config in the
1608 * following order. Such that the first has the lowest priority.
1609 *
1610 * 1. LIBTRACE_CONF, The global environment configuration
1611 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1612 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1613 *
1614 * E.g.
1615 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1616 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1617 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1618 *
1619 * @note All enironment variables names MUST only contian
1620 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1621 * outside of this range should be captilised if possible or replaced with an
1622 * underscore.
1623 */
1624static void parse_env_config (libtrace_t *libtrace, struct user_configuration* uc) {
1625        char env_name[1024] = "LIBTRACE_CONF_";
1626        size_t len = strlen(env_name);
1627        size_t mark = 0;
1628        size_t i;
1629        char * env;
1630
1631        /* Make our compound string */
1632        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1633        len += strlen(libtrace->format->name);
1634        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1635        len += 1;
1636        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1637
1638        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1639        for (i = 0; env_name[i] != 0; ++i) {
1640                env_name[i] = toupper(env_name[i]);
1641                if(env_name[i] == ':') {
1642                        mark = i;
1643                }
1644                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1645                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1646                        env_name[i] = '_';
1647                }
1648        }
1649
1650        /* First apply global env settings LIBTRACE_CONF */
1651        env = getenv("LIBTRACE_CONF");
1652        if (env)
1653        {
1654                printf("Got env %s", env);
1655                parse_user_config(uc, env);
1656        }
1657
1658        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1659        if (mark != 0) {
1660                env_name[mark] = 0;
1661                env = getenv(env_name);
1662                if (env) {
1663                        printf("Got %s=%s", env_name, env);
1664                        parse_user_config(uc, env);
1665                }
1666                env_name[mark] = '_';
1667        }
1668
1669        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1670        env = getenv(env_name);
1671        if (env) {
1672                printf("Got %s=%s", env_name, env);
1673                parse_user_config(uc, env);
1674        }
1675}
1676
1677/* Start an input trace in the parallel libtrace framework.
1678 * This can also be used to restart an existing parallel.
1679 *
1680 * NOTE: libtrace lock is held for the majority of this function
1681 *
1682 * @param libtrace the input trace to start
1683 * @param global_blob some global data you can share with the new perpkt threads
1684 * @returns 0 on success, otherwise -1 to indicate an error has occured
1685 */
1686DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1687                           fn_per_pkt per_pkt, fn_reporter reporter) {
1688        int i;
1689        int ret = -1;
1690        char name[16];
1691        sigset_t sig_before, sig_block_all;
1692        assert(libtrace);
1693
1694        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1695        if (trace_is_err(libtrace)) {
1696                goto cleanup_none;
1697        }
1698
1699        if (libtrace->state == STATE_PAUSED) {
1700                ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
1701                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1702                return ret;
1703        }
1704
1705        if (libtrace->state != STATE_NEW) {
1706                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1707                              "should be called on a NEW or PAUSED trace but "
1708                              "instead was called from %s",
1709                              get_trace_state_name(libtrace->state));
1710                goto cleanup_none;
1711        }
1712
1713        /* Store the user defined things against the trace */
1714        libtrace->global_blob = global_blob;
1715        libtrace->per_pkt = per_pkt;
1716        libtrace->reporter = reporter;
1717        /* And zero other fields */
1718        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1719                libtrace->perpkt_thread_states[i] = 0;
1720        }
1721        libtrace->first_packets.first = 0;
1722        libtrace->first_packets.count = 0;
1723        libtrace->first_packets.packets = NULL;
1724        libtrace->perpkt_threads = NULL;
1725        /* Set a global which says we are using a parallel trace. This is
1726         * for backwards compatability due to changes when destroying packets */
1727        libtrace_parallel = 1;
1728
1729        /* Parses configuration passed through environment variables */
1730        parse_env_config(libtrace, &libtrace->config);
1731        verify_configuration(libtrace);
1732
1733        /* Try start the format - we prefer parallel over single threaded, as
1734         * these formats should support messages better */
1735        if (trace_supports_parallel(libtrace) &&
1736            !trace_has_dedicated_hasher(libtrace)) {
1737                printf("Using the parallel trace format\n");
1738                ret = libtrace->format->pstart_input(libtrace);
1739                libtrace->pread = trace_pread_packet_wrapper;
1740        } else {
1741                printf("Using single threaded interface\n");
1742                if (libtrace->format->start_input) {
1743                        ret = libtrace->format->start_input(libtrace);
1744                }
1745                if (libtrace->perpkt_thread_count > 1)
1746                        libtrace->pread = trace_pread_packet_first_in_first_served;
1747                else
1748                        /* Use standard read_packet */
1749                        libtrace->pread = NULL;
1750        }
1751
1752        if (ret != 0) {
1753                goto cleanup_none;
1754        }
1755
1756        /* --- Start all the threads we need --- */
1757        /* Disable signals because it is inherited by the threads we start */
1758        sigemptyset(&sig_block_all);
1759        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1760
1761        /* If we need a hasher thread start it
1762         * Special Case: If single threaded we don't need a hasher
1763         */
1764        if (trace_has_dedicated_hasher(libtrace)) {
1765                libtrace->hasher_thread.type = THREAD_EMPTY;
1766                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1767                                   THREAD_HASHER, hasher_entry, -1,
1768                                   "hasher-thread");
1769                if (ret != 0) {
1770                        trace_set_err(libtrace, errno, "trace_pstart "
1771                                      "failed to start a hasher thread.");
1772                        goto cleanup_started;
1773                }
1774                libtrace->pread = trace_pread_packet_hasher_thread;
1775        } else {
1776                libtrace->hasher_thread.type = THREAD_EMPTY;
1777        }
1778
1779        /* Start up our perpkt threads */
1780        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1781                                          libtrace->perpkt_thread_count);
1782        if (!libtrace->perpkt_threads) {
1783                trace_set_err(libtrace, errno, "trace_pstart "
1784                              "failed to allocate memory.");
1785                goto cleanup_threads;
1786        }
1787        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1788                snprintf(name, sizeof(name), "perpkt-%d", i);
1789                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1790                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1791                                   THREAD_PERPKT, perpkt_threads_entry, i,
1792                                   name);
1793                if (ret != 0) {
1794                        trace_set_err(libtrace, errno, "trace_pstart "
1795                                      "failed to start a perpkt thread.");
1796                        goto cleanup_threads;
1797                }
1798        }
1799
1800        /* Start the reporter thread */
1801        if (reporter) {
1802                if (libtrace->combiner.initialise)
1803                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1804                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1805                                   THREAD_REPORTER, reporter_entry, -1,
1806                                   "reporter_thread");
1807                if (ret != 0) {
1808                        trace_set_err(libtrace, errno, "trace_pstart "
1809                                      "failed to start reporter thread.");
1810                        goto cleanup_threads;
1811                }
1812        }
1813
1814        /* Start the keepalive thread */
1815        if (libtrace->config.tick_interval > 0) {
1816                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1817                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1818                                   "keepalive_thread");
1819                if (ret != 0) {
1820                        trace_set_err(libtrace, errno, "trace_pstart "
1821                                      "failed to start keepalive thread.");
1822                        goto cleanup_threads;
1823                }
1824        }
1825
1826        /* Init other data structures */
1827        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1828        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1829        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1830                                                 sizeof(struct  __packet_storage_magic_type));
1831        if (libtrace->first_packets.packets == NULL) {
1832                trace_set_err(libtrace, errno, "trace_pstart "
1833                              "failed to allocate memory.");
1834                goto cleanup_threads;
1835        }
1836
1837        if (libtrace_ocache_init(&libtrace->packet_freelist,
1838                             (void* (*)()) trace_create_packet,
1839                             (void (*)(void *))trace_destroy_packet,
1840                             libtrace->config.packet_thread_cache_size,
1841                             libtrace->config.packet_cache_size * 4,
1842                             libtrace->config.fixed_packet_count) != 0) {
1843                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1844                              "failed to allocate ocache.");
1845                goto cleanup_threads;
1846        }
1847
1848        /* Threads don't start */
1849        libtrace->started = true;
1850        libtrace_change_state(libtrace, STATE_RUNNING, false);
1851
1852        ret = 0;
1853        goto success;
1854cleanup_threads:
1855        if (libtrace->first_packets.packets) {
1856                free(libtrace->first_packets.packets);
1857                libtrace->first_packets.packets = NULL;
1858        }
1859        libtrace_change_state(libtrace, STATE_ERROR, false);
1860        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1861        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1862                pthread_join(libtrace->hasher_thread.tid, NULL);
1863                libtrace_zero_thread(&libtrace->hasher_thread);
1864        }
1865
1866        if (libtrace->perpkt_threads) {
1867                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1868                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1869                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1870                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1871                        } else break;
1872                }
1873                free(libtrace->perpkt_threads);
1874                libtrace->perpkt_threads = NULL;
1875        }
1876
1877        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1878                pthread_join(libtrace->reporter_thread.tid, NULL);
1879                libtrace_zero_thread(&libtrace->reporter_thread);
1880        }
1881
1882        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1883                pthread_join(libtrace->keepalive_thread.tid, NULL);
1884                libtrace_zero_thread(&libtrace->keepalive_thread);
1885        }
1886        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1887        libtrace_change_state(libtrace, STATE_NEW, false);
1888        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1889        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1890cleanup_started:
1891        if (trace_supports_parallel(libtrace) &&
1892            !trace_has_dedicated_hasher(libtrace)
1893            && libtrace->perpkt_thread_count > 1) {
1894                if (libtrace->format->ppause_input)
1895                        libtrace->format->ppause_input(libtrace);
1896        } else {
1897                if (libtrace->format->pause_input)
1898                        libtrace->format->pause_input(libtrace);
1899        }
1900        ret = -1;
1901success:
1902        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1903cleanup_none:
1904        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1905        return ret;
1906}
1907
1908/**
1909 * Pauses a trace, this should only be called by the main thread
1910 * 1. Set started = false
1911 * 2. All perpkt threads are paused waiting on a condition var
1912 * 3. Then call ppause on the underlying format if found
1913 * 4. The traces state is paused
1914 *
1915 * Once done you should be able to modify the trace setup and call pstart again
1916 * TODO handle changing thread numbers
1917 */
1918DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1919{
1920        libtrace_thread_t *t;
1921        int i;
1922        assert(libtrace);
1923
1924        t = get_thread_table(libtrace);
1925        // Check state from within the lock if we are going to change it
1926        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1927        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1928                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1929                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1930                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1931                return -1;
1932        }
1933
1934        libtrace_change_state(libtrace, STATE_PAUSING, false);
1935        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1936
1937        // Special case handle the hasher thread case
1938        if (trace_has_dedicated_hasher(libtrace)) {
1939                if (libtrace->config.debug_state)
1940                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1941                libtrace_message_t message = {0};
1942                message.code = MESSAGE_DO_PAUSE;
1943                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1944                // Wait for it to pause
1945                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1946                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1947                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1948                }
1949                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1950                if (libtrace->config.debug_state)
1951                        fprintf(stderr, " DONE\n");
1952        }
1953
1954        if (libtrace->config.debug_state)
1955                fprintf(stderr, "Asking perpkt threads to pause ...");
1956        // Stop threads, skip this one if it's a perpkt
1957        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1958                if (&libtrace->perpkt_threads[i] != t) {
1959                        libtrace_message_t message = {0};
1960                        message.code = MESSAGE_DO_PAUSE;
1961                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1962                        if(trace_has_dedicated_hasher(libtrace)) {
1963                                // The hasher has stopped and other threads have messages waiting therefore
1964                                // If the queues are empty the other threads would have no data
1965                                // So send some message packets to simply ask the threads to check
1966                                // We are the only writer since hasher has paused
1967                                libtrace_packet_t *pkt;
1968                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1969                                pkt->error = READ_MESSAGE;
1970                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1971                        }
1972                } else {
1973                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1974                }
1975        }
1976
1977        if (t) {
1978                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1979                // We rely on the user to *not* return before starting the trace again
1980                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1981        }
1982
1983        // Wait for all threads to pause
1984        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1985        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1986                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1987        }
1988        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1989
1990        if (libtrace->config.debug_state)
1991                fprintf(stderr, " DONE\n");
1992
1993        // Deal with the reporter
1994        if (trace_has_dedicated_reporter(libtrace)) {
1995                if (libtrace->config.debug_state)
1996                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1997                libtrace_message_t message = {0};
1998                message.code = MESSAGE_DO_PAUSE;
1999                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
2000                // Wait for it to pause
2001                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2002                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2003                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2004                }
2005                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2006                if (libtrace->config.debug_state)
2007                        fprintf(stderr, " DONE\n");
2008        }
2009
2010        /* Cache values before we pause */
2011        if (libtrace->stats == NULL)
2012                libtrace->stats = trace_create_statistics();
2013        // Save the statistics against the trace
2014        trace_get_statistics(libtrace, NULL);
2015        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
2016                libtrace->started = false;
2017                if (libtrace->format->ppause_input)
2018                        libtrace->format->ppause_input(libtrace);
2019                // TODO What happens if we don't have pause input??
2020        } else {
2021                int err;
2022                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
2023                err = trace_pause(libtrace);
2024                // We should handle this a bit better
2025                if (err)
2026                        return err;
2027        }
2028
2029        // Only set as paused after the pause has been called on the trace
2030        libtrace_change_state(libtrace, STATE_PAUSED, true);
2031        return 0;
2032}
2033
2034/**
2035 * Stop trace finish prematurely as though it meet an EOF
2036 * This should only be called by the main thread
2037 * 1. Calls ppause
2038 * 2. Sends a message asking for threads to finish
2039 * 3. Releases threads which will pause
2040 */
2041DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2042{
2043        int i, err;
2044        libtrace_message_t message = {0};
2045        assert(libtrace);
2046
2047        // Ensure all threads have paused and the underlying trace format has
2048        // been closed and all packets associated are cleaned up
2049        // Pause will do any state checks for us
2050        err = trace_ppause(libtrace);
2051        if (err)
2052                return err;
2053
2054        // Now send a message asking the threads to stop
2055        // This will be retrieved before trying to read another packet
2056
2057        message.code = MESSAGE_DO_STOP;
2058        trace_send_message_to_perpkts(libtrace, &message);
2059        if (trace_has_dedicated_hasher(libtrace))
2060                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
2061
2062        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2063                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2064        }
2065
2066        // Now release the threads and let them stop
2067        libtrace_change_state(libtrace, STATE_FINSHED, true);
2068        return 0;
2069}
2070
2071/**
2072 * Set the hasher type along with a selected function, if hardware supports
2073 * that generic type of hashing it will be used otherwise the supplied
2074 * hasher function will be used and passed data when called.
2075 *
2076 * @return 0 if successful otherwise -1 on error
2077 */
2078DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2079        int ret = -1;
2080        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2081                return -1;
2082        }
2083
2084        // Save the requirements
2085        trace->hasher_type = type;
2086        if (hasher) {
2087                trace->hasher = hasher;
2088                trace->hasher_data = data;
2089        } else {
2090                trace->hasher = NULL;
2091                trace->hasher_data = NULL;
2092        }
2093
2094        // Try push this to hardware - NOTE hardware could do custom if
2095        // there is a more efficient way to apply it, in this case
2096        // it will simply grab the function out of libtrace_t
2097        if (trace->format->pconfig_input)
2098                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
2099
2100        if (ret == -1) {
2101                // We have to deal with this ourself
2102                // This most likely means single threaded reading of the trace
2103                if (!hasher) {
2104                        switch (type)
2105                        {
2106                                case HASHER_CUSTOM:
2107                                case HASHER_BALANCE:
2108                                        return 0;
2109                                case HASHER_BIDIRECTIONAL:
2110                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2111                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2112                                        toeplitz_init_config(trace->hasher_data, 1);
2113                                        return 0;
2114                                case HASHER_UNIDIRECTIONAL:
2115                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2116                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2117                                        toeplitz_init_config(trace->hasher_data, 0);
2118                                        return 0;
2119                                case HASHER_HARDWARE:
2120                                        return -1;
2121                        }
2122                        return -1;
2123                }
2124        } else {
2125                // The hardware is dealing with this yay
2126                trace->hasher_type = HASHER_HARDWARE;
2127        }
2128
2129        return 0;
2130}
2131
2132// Waits for all threads to finish
2133DLLEXPORT void trace_join(libtrace_t *libtrace) {
2134        int i;
2135
2136        /* Firstly wait for the perpkt threads to finish, since these are
2137         * user controlled */
2138        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2139                //printf("Waiting to join with perpkt #%d\n", i);
2140                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2141                //printf("Joined with perpkt #%d\n", i);
2142                // So we must do our best effort to empty the queue - so
2143                // the producer (or any other threads) don't block.
2144                libtrace_packet_t * packet;
2145                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2146                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2147                        if (packet) // This could be NULL iff the perpkt finishes early
2148                                trace_destroy_packet(packet);
2149        }
2150
2151        /* Now the hasher */
2152        if (trace_has_dedicated_hasher(libtrace)) {
2153                pthread_join(libtrace->hasher_thread.tid, NULL);
2154                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2155        }
2156
2157        // Now that everything is finished nothing can be touching our
2158        // buffers so clean them up
2159        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2160                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2161                // if they lost timeslice before-during a write
2162                libtrace_packet_t * packet;
2163                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2164                        trace_destroy_packet(packet);
2165                if (libtrace->hasher) {
2166                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2167                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2168                }
2169                // Cannot destroy vector yet, this happens with trace_destroy
2170        }
2171        // TODO consider perpkt threads marking trace as finished before join is called
2172        libtrace_change_state(libtrace, STATE_FINSHED, true);
2173
2174        if (trace_has_dedicated_reporter(libtrace)) {
2175                pthread_join(libtrace->reporter_thread.tid, NULL);
2176                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2177        }
2178
2179        // Wait for the tick (keepalive) thread if it has been started
2180        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2181                libtrace_message_t msg = {0};
2182                msg.code = MESSAGE_DO_STOP;
2183                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
2184                pthread_join(libtrace->keepalive_thread.tid, NULL);
2185        }
2186
2187        libtrace_change_state(libtrace, STATE_JOINED, true);
2188        print_memory_stats();
2189}
2190
2191DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
2192{
2193        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2194        assert(t);
2195        return libtrace_message_queue_count(&t->messages);
2196}
2197
2198DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2199{
2200        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2201        assert(t);
2202        return libtrace_message_queue_get(&t->messages, message);
2203}
2204
2205DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2206{
2207        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2208        assert(t);
2209        return libtrace_message_queue_try_get(&t->messages, message);
2210}
2211
2212/**
2213 * Return backlog indicator
2214 */
2215DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2216{
2217        libtrace_message_t message = {0};
2218        message.code = MESSAGE_POST_REPORTER;
2219        message.sender = get_thread_descriptor(libtrace);
2220        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
2221}
2222
2223/**
2224 * Return backlog indicator
2225 */
2226DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2227{
2228        //printf("Sending message code=%d to reporter\n", message->code);
2229        message->sender = get_thread_descriptor(libtrace);
2230        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
2231}
2232
2233/**
2234 *
2235 */
2236DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2237{
2238        //printf("Sending message code=%d to reporter\n", message->code);
2239        message->sender = get_thread_descriptor(libtrace);
2240        return libtrace_message_queue_put(&t->messages, message);
2241}
2242
2243DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2244{
2245        int i;
2246        message->sender = get_thread_descriptor(libtrace);
2247        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2248                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2249        }
2250        //printf("Sending message code=%d to reporter\n", message->code);
2251        return 0;
2252}
2253
2254DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
2255        result->key = key;
2256}
2257DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
2258        return result->key;
2259}
2260DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value) {
2261        result->value = value;
2262}
2263DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result) {
2264        return result->value;
2265}
2266DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value) {
2267        result->key = key;
2268        result->value = value;
2269}
2270DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
2271        free(*result);
2272        result = NULL;
2273        // TODO automatically back with a free list!!
2274}
2275
2276DLLEXPORT void * trace_get_global(libtrace_t *trace)
2277{
2278        return trace->global_blob;
2279}
2280
2281DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
2282{
2283        if (trace->global_blob && trace->global_blob != data) {
2284                void * ret = trace->global_blob;
2285                trace->global_blob = data;
2286                return ret;
2287        } else {
2288                trace->global_blob = data;
2289                return NULL;
2290        }
2291}
2292
2293DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2294{
2295        return t->user_data;
2296}
2297
2298DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2299{
2300        if(t->user_data && t->user_data != data) {
2301                void *ret = t->user_data;
2302                t->user_data = data;
2303                return ret;
2304        } else {
2305                t->user_data = data;
2306                return NULL;
2307        }
2308}
2309
2310/**
2311 * Publishes a result to the reduce queue
2312 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2313 */
2314DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type) {
2315        libtrace_result_t res;
2316        res.type = type;
2317        res.key = key;
2318        res.value = value;
2319        assert(libtrace->combiner.publish);
2320        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2321        return;
2322}
2323
2324/**
2325 * Sets a combiner function against the trace.
2326 */
2327DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config){
2328        if (combiner) {
2329                trace->combiner = *combiner;
2330                trace->combiner.configuration = config;
2331        } else {
2332                // No combiner, so don't try use it
2333                memset(&trace->combiner, 0, sizeof(trace->combiner));
2334        }
2335}
2336
2337DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2338        return packet->order;
2339}
2340
2341DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2342        return packet->hash;
2343}
2344
2345DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2346        packet->order = order;
2347}
2348
2349DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2350        packet->hash = hash;
2351}
2352
2353DLLEXPORT int trace_finished(libtrace_t * libtrace) {
2354        // TODO I don't like using this so much, we could use state!!!
2355        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
2356}
2357
2358DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2359{
2360        UNUSED int ret = -1;
2361        switch (option) {
2362                case TRACE_OPTION_TICK_INTERVAL:
2363                        libtrace->config.tick_interval = *((int *) value);
2364                        return 1;
2365                case TRACE_OPTION_SET_HASHER:
2366                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2367                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2368                        libtrace->config.perpkt_threads = *((int *) value);
2369                        return 1;
2370                case TRACE_OPTION_TRACETIME:
2371                        if(*((int *) value))
2372                                libtrace->tracetime = 1;
2373                        else
2374                                libtrace->tracetime = 0;
2375                        return 0;
2376                case TRACE_OPTION_SET_CONFIG:
2377                        libtrace->config = *((struct user_configuration *) value);
2378                case TRACE_OPTION_GET_CONFIG:
2379                        *((struct user_configuration *) value) = libtrace->config;
2380        }
2381        return 0;
2382}
2383
2384static bool config_bool_parse(char *value, size_t nvalue) {
2385        if (strncmp(value, "true", nvalue) == 0)
2386                return true;
2387        else if (strncmp(value, "false", nvalue) == 0)
2388                return false;
2389        else
2390                return strtoll(value, NULL, 10) != 0;
2391}
2392
2393static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2394        assert(key);
2395        assert(value);
2396        assert(uc);
2397        if (strncmp(key, "packet_cache_size", nkey) == 0
2398            || strncmp(key, "pcs", nkey) == 0) {
2399                uc->packet_cache_size = strtoll(value, NULL, 10);
2400        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2401                   || strncmp(key, "ptcs", nkey) == 0) {
2402                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2403        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2404                   || strncmp(key, "fpc", nkey) == 0) {
2405                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2406        } else if (strncmp(key, "burst_size", nkey) == 0
2407                   || strncmp(key, "bs", nkey) == 0) {
2408                uc->burst_size = strtoll(value, NULL, 10);
2409        } else if (strncmp(key, "tick_interval", nkey) == 0
2410                   || strncmp(key, "ti", nkey) == 0) {
2411                uc->tick_interval = strtoll(value, NULL, 10);
2412        } else if (strncmp(key, "tick_count", nkey) == 0
2413                   || strncmp(key, "tc", nkey) == 0) {
2414                uc->tick_count = strtoll(value, NULL, 10);
2415        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2416                   || strncmp(key, "pt", nkey) == 0) {
2417                uc->perpkt_threads = strtoll(value, NULL, 10);
2418        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2419                   || strncmp(key, "hqs", nkey) == 0) {
2420                uc->hasher_queue_size = strtoll(value, NULL, 10);
2421        } else if (strncmp(key, "hasher_polling", nkey) == 0
2422                   || strncmp(key, "hp", nkey) == 0) {
2423                uc->hasher_polling = config_bool_parse(value, nvalue);
2424        } else if (strncmp(key, "reporter_polling", nkey) == 0
2425                   || strncmp(key, "rp", nkey) == 0) {
2426                uc->reporter_polling = config_bool_parse(value, nvalue);
2427        } else if (strncmp(key, "reporter_thold", nkey) == 0
2428                   || strncmp(key, "rt", nkey) == 0) {
2429                uc->reporter_thold = strtoll(value, NULL, 10);
2430        } else if (strncmp(key, "debug_state", nkey) == 0
2431                   || strncmp(key, "ds", nkey) == 0) {
2432                uc->debug_state = config_bool_parse(value, nvalue);
2433        } else {
2434                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2435        }
2436}
2437
2438DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2439        char *pch;
2440        char key[100];
2441        char value[100];
2442        assert(str);
2443        assert(uc);
2444        pch = strtok (str," ,.-");
2445        while (pch != NULL)
2446        {
2447                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2448                        config_string(uc, key, sizeof(key), value, sizeof(value));
2449                } else {
2450                        fprintf(stderr, "Error parsing %s\n", pch);
2451                }
2452                pch = strtok (NULL," ,.-");
2453        }
2454}
2455
2456DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2457        char line[1024];
2458        while (fgets(line, sizeof(line), file) != NULL)
2459        {
2460                parse_user_config(uc, line);
2461        }
2462}
2463
2464DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
2465        libtrace_packet_t* result;
2466        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
2467        assert(result);
2468        swap_packets(result, packet); // Move the current packet into our copy
2469        return result;
2470}
2471
2472DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2473        // Try write back the packet
2474        assert(packet);
2475        // Always release any resources this might be holding such as a slot in a ringbuffer
2476        trace_fin_packet(packet);
2477        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2478}
2479
2480DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2481        if (libtrace->format)
2482                return &libtrace->format->info;
2483        else
2484                return NULL;
2485}
Note: See TracBrowser for help on using the repository browser.