source: lib/trace_parallel.c @ 858ce90

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 858ce90 was 858ce90, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactoring the packet loop

Removes the pread type selection function and replaces it with a
function pointer created when the format is started.

Tidies the per packet loop, removes duplicate code and rewrites
tracetime-delaying so that a message can be processed while waiting
for the delay to return within a batch of packets.

In this case if a pause message is encountered we will first
notify the trace that we are pausing. Then send any remaining
packets from the batch without delay, and then properly pause. The resume
message is sent to the per packet thread before normal packet
flow continues.

This still is WIP and contains alot of debugging code. The error
path still needs to be better implemented

  • Property mode set to 100644
File size: 79.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97#include "combiners.h"
98
99#include <pthread.h>
100#include <signal.h>
101#include <unistd.h>
102
103static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
104extern int libtrace_parallel;
105
106struct multithreading_stats {
107        uint64_t full_queue_hits;
108        uint64_t wait_for_fill_complete_hits;
109} contention_stats[1024];
110
111struct mem_stats {
112        struct memfail {
113           uint64_t cache_hit;
114           uint64_t ring_hit;
115           uint64_t miss;
116           uint64_t recycled;
117        } readbulk, read, write, writebulk;
118};
119
120// Grrr gcc wants this spelt out
121__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
122
123static void print_memory_stats() {
124#if 0
125        char t_name[50];
126        uint64_t total;
127        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
128
129        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
130
131        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
132        if (total) {
133                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
134                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
135                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
136                                total, (double) mem_hits.read.miss / (double) total * 100.0);
137        }
138
139        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
140        if (total) {
141                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
142                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
143
144
145                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
146                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
147        }
148
149        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
150        if (total) {
151                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
152                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
153
154                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
155                                total, (double) mem_hits.write.miss / (double) total * 100.0);
156        }
157
158        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
159        if (total) {
160                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
161                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
162
163                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
164                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
165        }
166#endif
167}
168
169/**
170 * True if the trace has dedicated hasher thread otherwise false.
171 * This can be used once the hasher thread has been started.
172 */
173static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
174{
175        return libtrace->hasher_thread.type == THREAD_HASHER;
176}
177
178/**
179 * True if the trace has dedicated hasher thread otherwise false,
180 * to be used after the trace is running
181 */
182static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
183{
184        assert(libtrace->state != STATE_NEW);
185        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
186}
187
188/**
189 * When running the number of perpkt threads in use.
190 * TODO what if the trace is not running yet, or has finished??
191 *
192 * @brief libtrace_perpkt_thread_nb
193 * @param t The trace
194 * @return
195 */
196DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
197        return t->perpkt_thread_count;
198}
199
200/**
201 * Changes a thread's state and broadcasts the condition variable. This
202 * should always be done when the lock is held.
203 *
204 * Additionally for perpkt threads the state counts are updated.
205 *
206 * @param trace A pointer to the trace
207 * @param t A pointer to the thread to modify
208 * @param new_state The new state of the thread
209 * @param need_lock Set to true if libtrace_lock is not held, otherwise
210 *        false in the case the lock is currently held by this thread.
211 */
212static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
213        const enum thread_states new_state, const bool need_lock)
214{
215        enum thread_states prev_state;
216        if (need_lock)
217                pthread_mutex_lock(&trace->libtrace_lock);
218        prev_state = t->state;
219        t->state = new_state;
220        if (t->type == THREAD_PERPKT) {
221                --trace->perpkt_thread_states[prev_state];
222                ++trace->perpkt_thread_states[new_state];
223        }
224
225        if (trace->config.debug_state)
226                fprintf(stderr, "Thread %d state changed from %d to %d\n",
227                        (int) t->tid, prev_state, t->state);
228
229        pthread_cond_broadcast(&trace->perpkt_cond);
230        if (need_lock)
231                pthread_mutex_unlock(&trace->libtrace_lock);
232}
233
234/**
235 * Changes the overall traces state and signals the condition.
236 *
237 * @param trace A pointer to the trace
238 * @param new_state The new state of the trace
239 * @param need_lock Set to true if libtrace_lock is not held, otherwise
240 *        false in the case the lock is currently held by this thread.
241 */
242static inline void libtrace_change_state(libtrace_t *trace,
243        const enum trace_state new_state, const bool need_lock)
244{
245        UNUSED enum trace_state prev_state;
246        if (need_lock)
247                pthread_mutex_lock(&trace->libtrace_lock);
248        prev_state = trace->state;
249        trace->state = new_state;
250
251        if (trace->config.debug_state)
252                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
253                        trace->uridata, get_trace_state_name(prev_state),
254                        get_trace_state_name(trace->state));
255
256        pthread_cond_broadcast(&trace->perpkt_cond);
257        if (need_lock)
258                pthread_mutex_unlock(&trace->libtrace_lock);
259}
260
261/**
262 * @return True if the format supports parallel threads.
263 */
264static inline bool trace_supports_parallel(libtrace_t *trace)
265{
266        assert(trace);
267        assert(trace->format);
268        if (trace->format->pstart_input)
269                return true;
270        else
271                return false;
272}
273
274DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
275        int i;
276        struct multithreading_stats totals = {0};
277        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
278                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
279                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
280                totals.full_queue_hits += contention_stats[i].full_queue_hits;
281                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
282                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
283        }
284        fprintf(stderr, "\nTotals for perpkt threads\n");
285        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
286        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
287
288        return;
289}
290
291void libtrace_zero_thread(libtrace_thread_t * t) {
292        t->accepted_packets = 0;
293        t->filtered_packets = 0;
294        t->recorded_first = false;
295        t->tracetime_offset_usec = 0;
296        t->user_data = 0;
297        t->format_data = 0;
298        libtrace_zero_ringbuffer(&t->rbuffer);
299        t->trace = NULL;
300        t->ret = NULL;
301        t->type = THREAD_EMPTY;
302        t->perpkt_num = -1;
303}
304
305// Ints are aligned int is atomic so safe to read and write at same time
306// However write must be locked, read doesn't (We never try read before written to table)
307libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
308        int i = 0;
309        pthread_t tid = pthread_self();
310
311        for (;i<libtrace->perpkt_thread_count ;++i) {
312                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
313                        return &libtrace->perpkt_threads[i];
314        }
315        return NULL;
316}
317
318int get_thread_table_num(libtrace_t *libtrace) {
319        int i = 0;
320        pthread_t tid = pthread_self();
321        for (;i<libtrace->perpkt_thread_count; ++i) {
322                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
323                        return i;
324        }
325        return -1;
326}
327
328static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
329        libtrace_thread_t *ret;
330        if (!(ret = get_thread_table(libtrace))) {
331                pthread_t tid = pthread_self();
332                // Check if we are reporter or something else
333                if (pthread_equal(tid, libtrace->reporter_thread.tid))
334                        ret = &libtrace->reporter_thread;
335                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
336                        ret = &libtrace->hasher_thread;
337                else
338                        ret = NULL;
339        }
340        return ret;
341}
342
343/** Makes a packet safe, a packet may become invaild after a
344 * pause (or stop/destroy) of a trace. This copies a packet
345 * in such a way that it will be able to survive a pause.
346 *
347 * However this will not allow the packet to be used after
348 * the format is destroyed. Or while the trace is still paused.
349 */
350DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
351        // Duplicate the packet in standard malloc'd memory and free the
352        // original, This is a 1:1 exchange so is ocache count remains unchanged.
353        if (pkt->buf_control != TRACE_CTRL_PACKET) {
354                libtrace_packet_t *dup;
355                dup = trace_copy_packet(pkt);
356                /* Release the external buffer */
357                trace_fin_packet(pkt);
358                /* Copy the duplicated packet over the existing */
359                memcpy(pkt, dup, sizeof(libtrace_packet_t));
360        }
361}
362
363/**
364 * Makes a libtrace_result_t safe, used when pausing a trace.
365 * This will call libtrace_make_packet_safe if the result is
366 * a packet.
367 */
368DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
369        if (res->type == RESULT_PACKET) {
370                libtrace_make_packet_safe(res->value.pkt);
371        }
372}
373
374/**
375 * Holds threads in a paused state, until released by broadcasting
376 * the condition mutex.
377 */
378static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
379        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
380        thread_change_state(trace, t, THREAD_PAUSED, false);
381        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
382                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
383        }
384        thread_change_state(trace, t, THREAD_RUNNING, false);
385        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
386}
387
388/**
389 * Sends a packet to the user, expects either a valid packet or a TICK packet.
390 *
391 * Note READ_MESSAGE will only be returned if tracetime is true.
392 *
393 * @brief dispatch_packet
394 * @param trace
395 * @param t
396 * @param packet A pointer to the packet storage, which may be set to null upon
397 *               return, or a packet to be finished.
398 * @return 0 is successful, otherwise if playing back in tracetime
399 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
400 */
401static inline int dispatch_packet(libtrace_t *trace,
402                                                 libtrace_thread_t *t,
403                                                 libtrace_packet_t **packet,
404                                                 bool tracetime) {
405        if ((*packet)->error > 0) {
406                if (tracetime) {
407                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
408                                return READ_MESSAGE;
409                }
410                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
411                trace_fin_packet(*packet);
412        } else {
413                libtrace_message_t message;
414                assert((*packet)->error == READ_TICK);
415                message.code = MESSAGE_TICK;
416                message.additional.uint64 = trace_packet_get_order(*packet);
417                message.sender = t;
418                (*trace->per_pkt)(trace, NULL, &message, t);
419        }
420        return 0;
421}
422
423/**
424 * Pauses a per packet thread, messages will not be processed when the thread
425 * is paused.
426 *
427 * This process involves reading packets if a hasher thread is used. As such
428 * this function can fail to pause due to errors when reading in which case
429 * the thread should be stopped instead.
430 *
431 *
432 * @brief trace_perpkt_thread_pause
433 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
434 */
435static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
436                                     libtrace_packet_t *packets[],
437                                     int *nb_packets, int *empty, int *offset) {
438        libtrace_message_t message = {0};
439        libtrace_packet_t * packet = NULL;
440
441        /* Let the user thread know we are going to pause */
442        message.code = MESSAGE_PAUSING;
443        message.sender = t;
444        (*trace->per_pkt)(trace, NULL, &message, t);
445
446        /* Send through any remaining packets (or messages) without delay */
447
448        /* First send those packets already read, as fast as possible
449         * This should never fail or check for messages etc. */
450        for (;*offset < *nb_packets; ++*offset) {
451                ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0);
452                /* Move full slots to front as we go */
453                if (packets[*offset]) {
454                        if (*empty != *offset) {
455                                packets[*empty] = packets[*offset];
456                                packets[*offset] = NULL;
457                        }
458                        ++*empty;
459                }
460        }
461
462        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
463        /* If a hasher thread is running, empty input queues so we don't lose data */
464        if (trace_has_dedicated_hasher(trace)) {
465                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
466                // The hasher has stopped by this point, so the queue shouldn't be filling
467                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
468                        int ret = trace->pread(trace, t, &packet, 1);
469                        if (ret == 1) {
470                                if (packet->error > 0) {
471                                        store_first_packet(trace, packet, t);
472                                }
473                                ASSERT_RET(dispatch_packet(trace, t, &packet, 1), == 0);
474                                if (packet == NULL)
475                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
476                        } else if (ret != READ_MESSAGE) {
477                                /* Ignore messages we pick these up next loop */
478                                assert (ret == READ_EOF || ret == READ_ERROR);
479                                /* Verify no packets are remaining */
480                                /* TODO refactor this sanity check out!! */
481                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
482                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
483                                        // No packets after this should have any data in them
484                                        assert(packet->error <= 0);
485                                }
486                                fprintf(stderr, "PREAD_FAILED %d\n", ret);
487                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
488                                return -1;
489                        }
490                }
491        }
492        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
493
494        /* Now we do the actual pause, this returns when we resumed */
495        trace_thread_pause(trace, t);
496        message.code = MESSAGE_RESUMING;
497        (*trace->per_pkt)(trace, NULL, &message, t);
498        return 1;
499}
500
501/**
502 * The is the entry point for our packet processing threads.
503 */
504static void* perpkt_threads_entry(void *data) {
505        libtrace_t *trace = (libtrace_t *)data;
506        libtrace_thread_t *t;
507        libtrace_message_t message = {0};
508        libtrace_packet_t *packets[trace->config.burst_size];
509        size_t i;
510        //int ret;
511        /* The current reading position into the packets */
512        int offset = 0;
513        /* The number of packets last read */
514        int nb_packets = 0;
515        /* The offset to the first NULL packet upto offset */
516        int empty = 0;
517
518        /* Wait until trace_pstart has been completed */
519        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
520        t = get_thread_table(trace);
521        assert(t);
522        if (trace->state == STATE_ERROR) {
523                thread_change_state(trace, t, THREAD_FINISHED, false);
524                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
525                pthread_exit(NULL);
526        }
527        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
528        if (trace->format->pregister_thread) {
529                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
530        }
531        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
532
533        /* Fill our buffer with empty packets */
534        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
535        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
536                              trace->config.burst_size,
537                              trace->config.burst_size);
538
539        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
540        // Send a message to say we've started
541
542        // Let the per_packet function know we have started
543        message.code = MESSAGE_STARTING;
544        message.sender = t;
545        (*trace->per_pkt)(trace, NULL, &message, t);
546        message.code = MESSAGE_RESUMING;
547        (*trace->per_pkt)(trace, NULL, &message, t);
548
549
550        for (;;) {
551
552                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
553                        int ret;
554                        switch (message.code) {
555                                case MESSAGE_DO_PAUSE: // This is internal
556                                        ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset);
557                                        if (ret == READ_EOF) {
558                                                fprintf(stderr, "PAUSE stop eof!!\n");
559                                                goto eof;
560                                        } else if (ret == READ_ERROR) {
561                                                fprintf(stderr, "PAUSE stop error!!\n");
562                                                goto error;
563                                        }
564                                        assert(ret == 1);
565                                        continue;
566                                case MESSAGE_DO_STOP: // This is internal
567                                        fprintf(stderr, "DO_STOP stop!!\n");
568                                        goto eof;
569                        }
570                        (*trace->per_pkt)(trace, NULL, &message, t);
571                        /* Continue and the empty messages out before packets */
572                        continue;
573                }
574
575
576                /* Do we need to read a new set of packets MOST LIKELY we do */
577                if (offset == nb_packets) {
578                        /* Refill the packet buffer */
579                        if (empty != nb_packets) {
580                                // Refill the empty packets
581                                libtrace_ocache_alloc(&trace->packet_freelist,
582                                                      (void **) &packets[empty],
583                                                      nb_packets - empty,
584                                                      nb_packets - empty);
585                        }
586                        if (!trace->pread) {
587                                assert(packets[0]);
588                                nb_packets = trace_read_packet(trace, packets[0]);
589                                packets[0]->error = nb_packets;
590                                if (nb_packets > 0)
591                                        nb_packets = 1;
592                        } else {
593                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
594                        }
595                        offset = 0;
596                        empty = 0;
597                }
598
599                /* Handle error/message cases */
600                if (nb_packets > 0) {
601                        /* Store the first packet */
602                        if (packets[0]->error > 0) {
603                                store_first_packet(trace, packets[0], t);
604                        }
605                        for (;offset < nb_packets; ++offset) {
606                                int ret;
607                                ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime);
608                                if (ret == 0) {
609                                        /* Move full slots to front as we go */
610                                        if (packets[offset]) {
611                                                if (empty != offset) {
612                                                        packets[empty] = packets[offset];
613                                                        packets[offset] = NULL;
614                                                }
615                                                ++empty;
616                                        }
617                                } else {
618                                        assert(ret == READ_MESSAGE);
619                                        /* Loop around and process the message, note */
620                                        continue;
621                                }
622                        }
623                } else {
624                        switch (nb_packets) {
625                        case READ_EOF:
626                                fprintf(stderr, "EOF stop %d!!\n", nb_packets);
627                                goto eof;
628                        case READ_ERROR:
629                                fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
630                                goto error;
631                        case READ_MESSAGE:
632                                nb_packets = 0;
633                                continue;
634                        default:
635                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
636                                goto error;
637                        }
638                }
639
640        }
641
642error:
643        fprintf(stderr, "An error occured in trace\n");
644eof:
645        fprintf(stderr, "An eof occured in trace\n");
646        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
647
648        // Let the per_packet function know we have stopped
649        message.code = MESSAGE_PAUSING;
650        message.sender = t;
651        (*trace->per_pkt)(trace, NULL, &message, t);
652        message.code = MESSAGE_STOPPING;
653        message.additional.uint64 = 0;
654        (*trace->per_pkt)(trace, NULL, &message, t);
655
656        // Free any remaining packets
657        for (i = 0; i < trace->config.burst_size; i++) {
658                if (packets[i]) {
659                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
660                        packets[i] = NULL;
661                }
662        }
663
664
665        thread_change_state(trace, t, THREAD_FINISHED, true);
666
667        // Notify only after we've defiantly set the state to finished
668        message.code = MESSAGE_PERPKT_ENDED;
669        message.additional.uint64 = 0;
670        trace_send_message_to_reporter(trace, &message);
671
672        // Release all ocache memory before unregistering with the format
673        // because this might(it does in DPDK) unlink the formats mempool
674        // causing destroy/finish packet to fail.
675        libtrace_ocache_unregister_thread(&trace->packet_freelist);
676        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
677        if (trace->format->punregister_thread) {
678                trace->format->punregister_thread(trace, t);
679        }
680        print_memory_stats();
681
682        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
683
684        pthread_exit(NULL);
685};
686
687/**
688 * The start point for our single threaded hasher thread, this will read
689 * and hash a packet from a data source and queue it against the correct
690 * core to process it.
691 */
692static void* hasher_entry(void *data) {
693        libtrace_t *trace = (libtrace_t *)data;
694        libtrace_thread_t * t;
695        int i;
696        libtrace_packet_t * packet;
697        libtrace_message_t message = {0};
698
699        assert(trace_has_dedicated_hasher(trace));
700        /* Wait until all threads are started and objects are initialised (ring buffers) */
701        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
702        t = &trace->hasher_thread;
703        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
704        if (trace->state == STATE_ERROR) {
705                thread_change_state(trace, t, THREAD_FINISHED, false);
706                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
707                pthread_exit(NULL);
708        }
709
710        printf("Hasher Thread started\n");
711        if (trace->format->pregister_thread) {
712                trace->format->pregister_thread(trace, t, true);
713        }
714        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
715        int pkt_skipped = 0;
716        /* Read all packets in then hash and queue against the correct thread */
717        while (1) {
718                int thread;
719                if (!pkt_skipped)
720                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
721                assert(packet);
722
723                if (libtrace_halt) // Signal to die has been sent - TODO
724                        break;
725
726                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
727                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
728                        switch(message.code) {
729                                case MESSAGE_DO_PAUSE:
730                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
731                                        thread_change_state(trace, t, THREAD_PAUSED, false);
732                                        pthread_cond_broadcast(&trace->perpkt_cond);
733                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
734                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
735                                        }
736                                        thread_change_state(trace, t, THREAD_RUNNING, false);
737                                        pthread_cond_broadcast(&trace->perpkt_cond);
738                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
739                                        break;
740                                case MESSAGE_DO_STOP:
741                                        // Stop called after pause
742                                        assert(trace->started == false);
743                                        assert(trace->state == STATE_FINSHED);
744                                        break;
745                                default:
746                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
747                        }
748                        pkt_skipped = 1;
749                        continue;
750                }
751
752                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
753                        break; /* We are EOF or error'd either way we stop  */
754                }
755
756                /* We are guaranteed to have a hash function i.e. != NULL */
757                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
758                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
759                /* Blocking write to the correct queue - I'm the only writer */
760                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
761                        uint64_t order = trace_packet_get_order(packet);
762                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
763                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
764                                // Write ticks to everyone else
765                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
766                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
767                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
768                                for (i = 0; i < trace->perpkt_thread_count; i++) {
769                                        pkts[i]->error = READ_TICK;
770                                        trace_packet_set_order(pkts[i], order);
771                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
772                                }
773                        }
774                        pkt_skipped = 0;
775                } else {
776                        assert(!"Dropping a packet!!");
777                        pkt_skipped = 1; // Reuse that packet no one read it
778                }
779        }
780
781        /* Broadcast our last failed read to all threads */
782        for (i = 0; i < trace->perpkt_thread_count; i++) {
783                libtrace_packet_t * bcast;
784                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
785                if (i == trace->perpkt_thread_count - 1) {
786                        bcast = packet;
787                } else {
788                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
789                        bcast->error = packet->error;
790                }
791                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
792                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
793                        // Unlock early otherwise we could deadlock
794                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
795                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
796                } else {
797                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
798                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
799                }
800        }
801
802        // We don't need to free the packet
803        thread_change_state(trace, t, THREAD_FINISHED, true);
804
805        // Notify only after we've defiantly set the state to finished
806        message.code = MESSAGE_PERPKT_ENDED;
807        message.additional.uint64 = 0;
808        trace_send_message_to_reporter(trace, &message);
809        libtrace_ocache_unregister_thread(&trace->packet_freelist);
810        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
811        if (trace->format->punregister_thread) {
812                trace->format->punregister_thread(trace, t);
813        }
814        print_memory_stats();
815        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
816
817        // TODO remove from TTABLE t sometime
818        pthread_exit(NULL);
819};
820
821/**
822 * Moves src into dest(Complete copy) and copies the memory buffer and
823 * its flags from dest into src ready for reuse without needing extra mallocs.
824 */
825static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
826        // Save the passed in buffer status
827        assert(dest->trace == NULL); // Must be a empty packet
828        void * temp_buf = dest->buffer;
829        buf_control_t temp_buf_control = dest->buf_control;
830        // Completely copy StoredPacket into packet
831        memcpy(dest, src, sizeof(libtrace_packet_t));
832        // Set the buffer settings on the returned packet
833        src->buffer = temp_buf;
834        src->buf_control = temp_buf_control;
835        src->trace = NULL;
836}
837
838/* Our simplest case when a thread becomes ready it can obtain an exclusive
839 * lock to read packets from the underlying trace.
840 */
841static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
842                                                    libtrace_thread_t *t,
843                                                    libtrace_packet_t *packets[],
844                                                    size_t nb_packets) {
845        size_t i = 0;
846        //bool tick_hit = false;
847
848        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
849        /* Read nb_packets */
850        for (i = 0; i < nb_packets; ++i) {
851                packets[i]->error = trace_read_packet(libtrace, packets[i]);
852
853                if (packets[i]->error <= 0) {
854                        /* We'll catch this next time if we have already got packets */
855                        if ( i==0 ) {
856                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
857                                return packets[i]->error;
858                        } else {
859                                break;
860                        }
861                }
862                /*
863                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
864                        tick_hit = true;
865                }*/
866        }
867        // Doing this inside the lock ensures the first packet is always
868        // recorded first
869        if (packets[0]->error > 0) {
870                store_first_packet(libtrace, packets[0], t);
871        }
872        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
873        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
874        if (tick_hit) {
875                libtrace_message_t tick;
876                tick.additional.uint64 = trace_packet_get_order(packets[i]);
877                tick.code = MESSAGE_TICK;
878                trace_send_message_to_perpkts(libtrace, &tick);
879        } */
880        return i;
881}
882
883/**
884 * For the case that we have a dedicated hasher thread
885 * 1. We read a packet from our buffer
886 * 2. Move that into the packet provided (packet)
887 */
888inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
889                                                      libtrace_thread_t *t,
890                                                      libtrace_packet_t *packets[],
891                                                      size_t nb_packets) {
892        size_t i;
893
894        /* We store the last error message here */
895        if (t->format_data) {
896                fprintf(stderr, "Hit me, ohh yeah got error %d\n",
897                        ((libtrace_packet_t *)t->format_data)->error);
898                return ((libtrace_packet_t *)t->format_data)->error;
899        }
900
901        // Always grab at least one
902        if (packets[0]) // Recycle the old get the new
903                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
904        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
905
906        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
907                fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);
908                return packets[0]->error;
909        }
910
911        for (i = 1; i < nb_packets; i++) {
912                if (packets[i]) // Recycle the old get the new
913                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
914                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
915                        packets[i] = NULL;
916                        break;
917                }
918
919                /* We will return an error or EOF the next time around */
920                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
921                        /* The message case will be checked automatically -
922                           However other cases like EOF and error will only be
923                           sent once*/
924                        if (packets[i]->error != READ_MESSAGE) {
925                                assert(t->format_data == NULL);
926                                t->format_data = packets[i];
927                                fprintf(stderr, "Hit me, ohh yeah set error %d\n",
928                                        ((libtrace_packet_t *)t->format_data)->error);
929                        }
930                        break;
931                }
932        }
933
934        return i;
935}
936
937/**
938 * Tries to read from our queue and returns 1 if a packet was retrieved
939 */
940static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
941{
942        libtrace_packet_t* retrived_packet;
943
944        /* Lets see if we have one waiting */
945        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
946                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
947                assert(retrived_packet);
948
949                if (*packet) // Recycle the old get the new
950                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
951                *packet = retrived_packet;
952                *ret = (*packet)->error;
953                return 1;
954        }
955        return 0;
956}
957
958/**
959 * Allows us to ensure all threads are finished writing to our threads ring_buffer
960 * before returning EOF/error.
961 */
962inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
963{
964        /* We are waiting for the condition that another thread ends to check
965         * our queue for new data, once all threads end we can go to finished */
966        bool complete = false;
967        int ret;
968
969        do {
970                // Wait for a thread to end
971                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
972
973                // Check before
974                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
975                        complete = true;
976                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
977                        continue;
978                }
979
980                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
981
982                // Check after
983                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
984                        complete = true;
985                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
986                        continue;
987                }
988
989                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
990
991                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
992                if(try_waiting_queue(libtrace, t, packet, &ret))
993                        return ret;
994        } while (!complete);
995
996        // We can only end up here once all threads complete
997        try_waiting_queue(libtrace, t, packet, &ret);
998
999        return ret;
1000        // TODO rethink this logic fix bug here
1001}
1002
1003/**
1004 * Expects the libtrace_lock to not be held
1005 */
1006inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
1007{
1008        thread_change_state(libtrace, t, THREAD_FINISHING, true);
1009        return trace_handle_finishing_perpkt(libtrace, packet, t);
1010}
1011
1012/**
1013 * This case is much like the dedicated hasher, except that we will become
1014 * hasher if we don't have a a packet waiting.
1015 *
1016 * Note: This is only every used if we have are doing hashing.
1017 *
1018 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
1019 * queue sizes in total are larger than the ring size.
1020 *
1021 * 1. We read a packet from our buffer
1022 * 2. Move that into the packet provided (packet)
1023 */
1024inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
1025{
1026        int thread, ret/*, psize*/;
1027
1028        while (1) {
1029                if(try_waiting_queue(libtrace, t, packet, &ret))
1030                        return ret;
1031                // Can still block here if another thread is writing to a full queue
1032                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1033
1034                // Its impossible for our own queue to overfill, because no one can write
1035                // when we are in the lock
1036                if(try_waiting_queue(libtrace, t, packet, &ret)) {
1037                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1038                        return ret;
1039                }
1040
1041                // Another thread cannot write a packet because a queue has filled up. Is it ours?
1042                if (libtrace->perpkt_queue_full) {
1043                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
1044                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1045                        continue;
1046                }
1047
1048                if (!*packet)
1049                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
1050                assert(*packet);
1051
1052                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
1053                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
1054                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1055                        if (libtrace_halt)
1056                                return 0;
1057                        else
1058                                return (*packet)->error;
1059                }
1060
1061                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
1062                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
1063                if (thread == t->perpkt_num) {
1064                        // If it's this thread we must be in order because we checked the buffer once we got the lock
1065                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1066                        return (*packet)->error;
1067                }
1068
1069                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
1070                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
1071                                libtrace->perpkt_queue_full = true;
1072                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1073                                contention_stats[t->perpkt_num].full_queue_hits++;
1074                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1075                        }
1076                        *packet = NULL;
1077                        libtrace->perpkt_queue_full = false;
1078                } else {
1079                        /* We can get here if the user closes the thread before natural completion/or error */
1080                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
1081                }
1082                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1083        }
1084}
1085
1086/**
1087 * For the first packet of each queue we keep a copy and note the system
1088 * time it was received at.
1089 *
1090 * This is used for finding the first packet when playing back a trace
1091 * in trace time. And can be used by real time applications to print
1092 * results out every XXX seconds.
1093 */
1094void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1095{
1096        if (!t->recorded_first) {
1097                struct timeval tv;
1098                libtrace_packet_t * dup;
1099                // For what it's worth we can call these outside of the lock
1100                gettimeofday(&tv, NULL);
1101                dup = trace_copy_packet(packet);
1102                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1103                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1104                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
1105                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1106                // Now update the first
1107                libtrace->first_packets.count++;
1108                if (libtrace->first_packets.count == 1) {
1109                        // We the first entry hence also the first known packet
1110                        libtrace->first_packets.first = t->perpkt_num;
1111                } else {
1112                        // Check if we are newer than the previous 'first' packet
1113                        size_t first = libtrace->first_packets.first;
1114                        if (trace_get_seconds(dup) <
1115                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1116                                libtrace->first_packets.first = t->perpkt_num;
1117                }
1118                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1119                libtrace_message_t mesg = {0};
1120                mesg.code = MESSAGE_FIRST_PACKET;
1121                trace_send_message_to_reporter(libtrace, &mesg);
1122                t->recorded_first = true;
1123        }
1124}
1125
1126/**
1127 * Returns 1 if it's certain that the first packet is truly the first packet
1128 * rather than a best guess based upon threads that have published so far.
1129 * Otherwise 0 is returned.
1130 * It's recommended that this result is stored rather than calling this
1131 * function again.
1132 */
1133DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
1134{
1135        int ret = 0;
1136        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1137        if (libtrace->first_packets.count) {
1138                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1139                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1140                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1141                        ret = 1;
1142                } else {
1143                        struct timeval curr_tv;
1144                        // If a second has passed since the first entry we will assume this is the very first packet
1145                        gettimeofday(&curr_tv, NULL);
1146                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1147                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1148                                        ret = 1;
1149                                }
1150                        }
1151                }
1152        } else {
1153                *packet = NULL;
1154                *tv = NULL;
1155        }
1156        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1157        return ret;
1158}
1159
1160
1161DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
1162{
1163        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1164}
1165
1166inline static struct timeval usec_to_tv(uint64_t usec)
1167{
1168        struct timeval tv;
1169        tv.tv_sec = usec / 1000000;
1170        tv.tv_usec = usec % 1000000;
1171        return tv;
1172}
1173
1174/** Similar to delay_tracetime but send messages to all threads periodically */
1175static void* reporter_entry(void *data) {
1176        libtrace_message_t message = {0};
1177        libtrace_t *trace = (libtrace_t *)data;
1178        libtrace_thread_t *t = &trace->reporter_thread;
1179        libtrace_vector_t results;
1180
1181        fprintf(stderr, "Reporter thread starting\n");
1182
1183        /* Wait until all threads are started */
1184        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1185        if (trace->state == STATE_ERROR) {
1186                thread_change_state(trace, t, THREAD_FINISHED, false);
1187                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1188                pthread_exit(NULL);
1189        }
1190        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1191
1192        libtrace_vector_init(&results, sizeof(libtrace_result_t));
1193
1194        message.code = MESSAGE_STARTING;
1195        message.sender = t;
1196        (*trace->reporter)(trace, NULL, &message);
1197        message.code = MESSAGE_RESUMING;
1198        (*trace->reporter)(trace, NULL, &message);
1199
1200        while (!trace_finished(trace)) {
1201                if (trace->config.reporter_polling) {
1202                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1203                                message.code = MESSAGE_POST_REPORTER;
1204                } else {
1205                        libtrace_message_queue_get(&t->messages, &message);
1206                }
1207                switch (message.code) {
1208                        // Check for results
1209                        case MESSAGE_POST_REPORTER:
1210                                trace->combiner.read(trace, &trace->combiner);
1211                                break;
1212                        case MESSAGE_DO_PAUSE:
1213                                assert(trace->combiner.pause);
1214                                trace->combiner.pause(trace, &trace->combiner);
1215                                message.code = MESSAGE_PAUSING;
1216                                message.sender = t;
1217                                (*trace->reporter)(trace, NULL, &message);
1218                                trace_thread_pause(trace, t);
1219                                message.code = MESSAGE_RESUMING;
1220                                (*trace->reporter)(trace, NULL, &message);
1221                                break;
1222                        default:
1223                                (*trace->reporter)(trace, NULL, &message);
1224                }
1225        }
1226
1227        // Flush out whats left now all our threads have finished
1228        trace->combiner.read_final(trace, &trace->combiner);
1229
1230        // GOODBYE
1231        message.code = MESSAGE_PAUSING;
1232        message.sender = t;
1233        (*trace->reporter)(trace, NULL, &message);
1234        message.code = MESSAGE_STOPPING;
1235        (*trace->reporter)(trace, NULL, &message);
1236
1237        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1238        print_memory_stats();
1239        return NULL;
1240}
1241
1242/** Similar to delay_tracetime but send messages to all threads periodically */
1243static void* keepalive_entry(void *data) {
1244        struct timeval prev, next;
1245        libtrace_message_t message = {0};
1246        libtrace_t *trace = (libtrace_t *)data;
1247        uint64_t next_release;
1248        fprintf(stderr, "keepalive thread is starting\n");
1249
1250        /* Wait until all threads are started */
1251        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1252        if (trace->state == STATE_ERROR) {
1253                thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false);
1254                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1255                pthread_exit(NULL);
1256        }
1257        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1258
1259        gettimeofday(&prev, NULL);
1260        message.code = MESSAGE_TICK;
1261        while (trace->state != STATE_FINSHED) {
1262                fd_set rfds;
1263                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1264                gettimeofday(&next, NULL);
1265                if (next_release > tv_to_usec(&next)) {
1266                        next = usec_to_tv(next_release - tv_to_usec(&next));
1267                        // Wait for timeout or a message
1268                        FD_ZERO(&rfds);
1269                        FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
1270                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
1271                                libtrace_message_t msg;
1272                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
1273                                assert(msg.code == MESSAGE_DO_STOP);
1274                                goto done;
1275                        }
1276                }
1277                prev = usec_to_tv(next_release);
1278                if (trace->state == STATE_RUNNING) {
1279                        message.additional.uint64 = tv_to_usec(&prev);
1280                        trace_send_message_to_perpkts(trace, &message);
1281                }
1282        }
1283done:
1284
1285        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1286        return NULL;
1287}
1288
1289/**
1290 * Delays a packets playback so the playback will be in trace time.
1291 * This may break early if a message becomes available.
1292 *
1293 * Requires the first packet for this thread to be received.
1294 * @param libtrace  The trace
1295 * @param packet    The packet to delay
1296 * @param t         The current thread
1297 * @return Either READ_MESSAGE(-2) or 0 is successful
1298 */
1299static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1300        struct timeval curr_tv, pkt_tv;
1301        uint64_t next_release = t->tracetime_offset_usec;
1302        uint64_t curr_usec;
1303
1304        if (!t->tracetime_offset_usec) {
1305                libtrace_packet_t *first_pkt;
1306                struct timeval *sys_tv;
1307                int64_t initial_offset;
1308                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1309                assert(first_pkt);
1310                pkt_tv = trace_get_timeval(first_pkt);
1311                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1312                /* In the unlikely case offset is 0, change it to 1 */
1313                if (stable)
1314                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1315                next_release = initial_offset;
1316        }
1317        /* next_release == offset */
1318        pkt_tv = trace_get_timeval(packet);
1319        next_release += tv_to_usec(&pkt_tv);
1320        gettimeofday(&curr_tv, NULL);
1321        curr_usec = tv_to_usec(&curr_tv);
1322        if (next_release > curr_usec) {
1323                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1324                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1325                fd_set rfds;
1326                FD_ZERO(&rfds);
1327                FD_SET(mesg_fd, &rfds);
1328                // We need to wait
1329
1330                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1331                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1332                if (ret == 0) {
1333                        return 0;
1334                } else if (ret > 0) {
1335                        return READ_MESSAGE;
1336                } else {
1337                        fprintf(stderr, "I thnik we broke select\n");
1338                }
1339        }
1340        return 0;
1341}
1342
1343/* Discards packets that don't match the filter.
1344 * Discarded packets are emptied and then moved to the end of the packet list.
1345 *
1346 * @param trace       The trace format, containing the filter
1347 * @param packets     An array of packets
1348 * @param nb_packets  The number of valid items in packets
1349 *
1350 * @return The number of packets that passed the filter, which are moved to
1351 *          the start of the packets array
1352 */
1353static inline size_t filter_packets(libtrace_t *trace,
1354                                    libtrace_packet_t **packets,
1355                                    size_t nb_packets) {
1356        size_t offset = 0;
1357        size_t i;
1358
1359        for (i = 0; i < nb_packets; ++i) {
1360                // The filter needs the trace attached to receive the link type
1361                packets[i]->trace = trace;
1362                if (trace_apply_filter(trace->filter, packets[i])) {
1363                        libtrace_packet_t *tmp;
1364                        tmp = packets[offset];
1365                        packets[offset++] = packets[i];
1366                        packets[i] = tmp;
1367                } else {
1368                        trace_fin_packet(packets[i]);
1369                }
1370        }
1371
1372        return offset;
1373}
1374
1375/* Read a batch of packets from the trace into a buffer.
1376 * Note that this function will block until a packet is read (or EOF is reached)
1377 *
1378 * @param libtrace    The trace
1379 * @param t           The thread
1380 * @param packets     An array of packets
1381 * @param nb_packets  The number of empty packets in packets
1382 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1383 */
1384static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1385                                      libtrace_thread_t *t,
1386                                      libtrace_packet_t *packets[],
1387                                      size_t nb_packets) {
1388        int i;
1389        assert(nb_packets);
1390        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1391        if (trace_is_err(libtrace))
1392                return -1;
1393        if (!libtrace->started) {
1394                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1395                              "You must call libtrace_start() before trace_read_packet()\n");
1396                return -1;
1397        }
1398
1399        if (libtrace->format->pread_packets) {
1400                int ret;
1401                for (i = 0; i < (int) nb_packets; ++i) {
1402                        assert(i[packets]);
1403                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1404                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1405                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1406                                              "Packet passed to trace_read_packet() is invalid\n");
1407                                return -1;
1408                        }
1409                }
1410                do {
1411                        ret=libtrace->format->pread_packets(libtrace, t,
1412                                                            packets,
1413                                                            nb_packets);
1414                        /* Error, EOF or message? */
1415                        if (ret <= 0) {
1416                                return ret;
1417                        }
1418
1419                        if (libtrace->filter) {
1420                                int remaining;
1421                                remaining = filter_packets(libtrace,
1422                                                           packets, ret);
1423                                t->filtered_packets += ret - remaining;
1424                                ret = remaining;
1425                        }
1426                        for (i = 0; i < ret; ++i) {
1427                                packets[i]->trace = libtrace;
1428                                /* TODO IN FORMAT?? Like traditional libtrace */
1429                                if (libtrace->snaplen>0)
1430                                        trace_set_capture_length(packets[i],
1431                                                        libtrace->snaplen);
1432                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1433                        }
1434                        t->accepted_packets += ret;
1435                } while(ret == 0);
1436                return ret;
1437        }
1438        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1439                      "This format does not support reading packets\n");
1440        return ~0U;
1441}
1442
1443/* Restarts a parallel trace, this is called from trace_pstart.
1444 * The libtrace lock is held upon calling this function.
1445 * Typically with a parallel trace the threads are not
1446 * killed rather.
1447 */
1448static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1449                          fn_per_pkt per_pkt, fn_reporter reporter) {
1450        int err = 0;
1451        if (libtrace->state != STATE_PAUSED) {
1452                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1453                        "trace(%s) is not currently paused",
1454                              libtrace->uridata);
1455                return -1;
1456        }
1457
1458        /* Update functions if requested */
1459        if (per_pkt)
1460                libtrace->per_pkt = per_pkt;
1461        if (reporter)
1462                libtrace->reporter = reporter;
1463        if(global_blob)
1464                libtrace->global_blob = global_blob;
1465
1466        assert(libtrace_parallel);
1467        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1468        assert(libtrace->per_pkt);
1469
1470        if (libtrace->perpkt_thread_count > 1 &&
1471            trace_supports_parallel(libtrace) &&
1472            !trace_has_dedicated_hasher(libtrace)) {
1473                fprintf(stderr, "Restarting trace pstart_input()\n");
1474                err = libtrace->format->pstart_input(libtrace);
1475        } else {
1476                if (libtrace->format->start_input) {
1477                        fprintf(stderr, "Restarting trace start_input()\n");
1478                        err = libtrace->format->start_input(libtrace);
1479                }
1480        }
1481
1482        if (err == 0) {
1483                libtrace->started = true;
1484                libtrace_change_state(libtrace, STATE_RUNNING, false);
1485        }
1486        return err;
1487}
1488
1489/**
1490 * Verifies the configuration and sets default values for any values not
1491 * specified by the user.
1492 * @return
1493 */
1494static void verify_configuration(libtrace_t *libtrace) {
1495
1496        if (libtrace->config.hasher_queue_size <= 0)
1497                libtrace->config.hasher_queue_size = 1000;
1498
1499        if (libtrace->config.perpkt_threads <= 0) {
1500                // TODO add BSD support
1501                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1502                if (libtrace->perpkt_thread_count <= 0)
1503                        // Lets just use one
1504                        libtrace->perpkt_thread_count = 1;
1505        } else {
1506                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1507        }
1508
1509        if (libtrace->config.reporter_thold <= 0)
1510                libtrace->config.reporter_thold = 100;
1511        if (libtrace->config.burst_size <= 0)
1512                libtrace->config.burst_size = 10;
1513        if (libtrace->config.packet_thread_cache_size <= 0)
1514                libtrace->config.packet_thread_cache_size = 20;
1515        if (libtrace->config.packet_cache_size <= 0)
1516                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1517
1518        if (libtrace->config.packet_cache_size <
1519                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1520                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1521
1522        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1523                libtrace->combiner = combiner_unordered;
1524}
1525
1526/**
1527 * Starts a libtrace_thread, including allocating memory for messaging.
1528 * Threads are expected to wait until the libtrace look is released.
1529 * Hence why we don't init structures until later.
1530 *
1531 * @param trace The trace the thread is associated with
1532 * @param t The thread that is filled when the thread is started
1533 * @param type The type of thread
1534 * @param start_routine The entry location of the thread
1535 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1536 * @param name For debugging purposes set the threads name (Optional)
1537 *
1538 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1539 *         In this situation the thread structure is zeroed.
1540 */
1541static int trace_start_thread(libtrace_t *trace,
1542                       libtrace_thread_t *t,
1543                       enum thread_types type,
1544                       void *(*start_routine) (void *),
1545                       int perpkt_num,
1546                       const char *name) {
1547        int ret;
1548        assert(t->type == THREAD_EMPTY);
1549        t->trace = trace;
1550        t->ret = NULL;
1551        t->user_data = NULL;
1552        t->type = type;
1553        t->state = THREAD_RUNNING;
1554        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1555        if (ret != 0) {
1556                libtrace_zero_thread(t);
1557                trace_set_err(trace, ret, "Failed to create a thread");
1558                return -1;
1559        }
1560        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1561        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1562                libtrace_ringbuffer_init(&t->rbuffer,
1563                                         trace->config.hasher_queue_size,
1564                                         trace->config.hasher_polling?
1565                                                 LIBTRACE_RINGBUFFER_POLLING:
1566                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1567        }
1568        if(name)
1569                pthread_setname_np(t->tid, name);
1570        t->perpkt_num = perpkt_num;
1571        return 0;
1572}
1573
1574/* Start an input trace in the parallel libtrace framework.
1575 * This can also be used to restart an existing parallel.
1576 *
1577 * NOTE: libtrace lock is held for the majority of this function
1578 *
1579 * @param libtrace the input trace to start
1580 * @param global_blob some global data you can share with the new perpkt threads
1581 * @returns 0 on success, otherwise -1 to indicate an error has occured
1582 */
1583DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1584                           fn_per_pkt per_pkt, fn_reporter reporter) {
1585        int i;
1586        int ret = -1;
1587        char name[16];
1588        sigset_t sig_before, sig_block_all;
1589        assert(libtrace);
1590
1591        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1592        if (trace_is_err(libtrace)) {
1593                goto cleanup_none;
1594        }
1595
1596        if (libtrace->state == STATE_PAUSED) {
1597                ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
1598                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1599                return ret;
1600        }
1601
1602        if (libtrace->state != STATE_NEW) {
1603                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1604                              "should be called on a NEW or PAUSED trace but "
1605                              "instead was called from %s",
1606                              get_trace_state_name(libtrace->state));
1607                goto cleanup_none;
1608        }
1609
1610        /* Store the user defined things against the trace */
1611        libtrace->global_blob = global_blob;
1612        libtrace->per_pkt = per_pkt;
1613        libtrace->reporter = reporter;
1614        /* And zero other fields */
1615        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1616                libtrace->perpkt_thread_states[i] = 0;
1617        }
1618        libtrace->first_packets.first = 0;
1619        libtrace->first_packets.count = 0;
1620        libtrace->first_packets.packets = NULL;
1621        libtrace->perpkt_threads = NULL;
1622        /* Set a global which says we are using a parallel trace. This is
1623         * for backwards compatability due to changes when destroying packets */
1624        libtrace_parallel = 1;
1625
1626        verify_configuration(libtrace);
1627
1628        /* Try start the format */
1629        if (libtrace->perpkt_thread_count > 1 &&
1630            trace_supports_parallel(libtrace) &&
1631            !trace_has_dedicated_hasher(libtrace)) {
1632                printf("This format has direct support for p's\n");
1633                ret = libtrace->format->pstart_input(libtrace);
1634                libtrace->pread = trace_pread_packet_wrapper;
1635        } else {
1636                if (libtrace->format->start_input) {
1637                        ret = libtrace->format->start_input(libtrace);
1638                }
1639                if (libtrace->perpkt_thread_count > 1)
1640                        libtrace->pread = trace_pread_packet_first_in_first_served;
1641                else
1642                        libtrace->pread = NULL;
1643        }
1644
1645        if (ret != 0) {
1646                goto cleanup_none;
1647        }
1648
1649        /* --- Start all the threads we need --- */
1650        /* Disable signals because it is inherited by the threads we start */
1651        sigemptyset(&sig_block_all);
1652        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1653
1654        /* If we need a hasher thread start it
1655         * Special Case: If single threaded we don't need a hasher
1656         */
1657        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher
1658            && libtrace->hasher_type != HASHER_HARDWARE) {
1659                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1660                                   THREAD_HASHER, hasher_entry, -1,
1661                                   "hasher-thread");
1662                if (ret != 0) {
1663                        trace_set_err(libtrace, errno, "trace_pstart "
1664                                      "failed to start a hasher thread.");
1665                        goto cleanup_started;
1666                }
1667                libtrace->pread = trace_pread_packet_hasher_thread;
1668        } else {
1669                libtrace->hasher_thread.type = THREAD_EMPTY;
1670        }
1671
1672        /* Start up our perpkt threads */
1673        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1674                                          libtrace->perpkt_thread_count);
1675        if (!libtrace->perpkt_threads) {
1676                trace_set_err(libtrace, errno, "trace_pstart "
1677                              "failed to allocate memory.");
1678                goto cleanup_threads;
1679        }
1680        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1681                snprintf(name, sizeof(name), "perpkt-%d", i);
1682                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1683                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1684                                   THREAD_PERPKT, perpkt_threads_entry, i,
1685                                   name);
1686                if (ret != 0) {
1687                        trace_set_err(libtrace, errno, "trace_pstart "
1688                                      "failed to start a perpkt thread.");
1689                        goto cleanup_threads;
1690                }
1691        }
1692
1693        /* Start the reporter thread */
1694        if (reporter) {
1695                if (libtrace->combiner.initialise)
1696                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1697                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1698                                   THREAD_REPORTER, reporter_entry, -1,
1699                                   "reporter_thread");
1700                if (ret != 0) {
1701                        trace_set_err(libtrace, errno, "trace_pstart "
1702                                      "failed to start reporter thread.");
1703                        goto cleanup_threads;
1704                }
1705        }
1706
1707        /* Start the keepalive thread */
1708        if (libtrace->config.tick_interval > 0) {
1709                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1710                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1711                                   "keepalive_thread");
1712                if (ret != 0) {
1713                        trace_set_err(libtrace, errno, "trace_pstart "
1714                                      "failed to start keepalive thread.");
1715                        goto cleanup_threads;
1716                }
1717        }
1718
1719        /* Init other data structures */
1720        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1721        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1722        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1723                                                 sizeof(struct  __packet_storage_magic_type));
1724        if (libtrace->first_packets.packets == NULL) {
1725                trace_set_err(libtrace, errno, "trace_pstart "
1726                              "failed to allocate memory.");
1727                goto cleanup_threads;
1728        }
1729
1730        /*trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1731                      "failed to allocate ocache.");
1732        goto cleanup_threads;*/
1733
1734        if (libtrace_ocache_init(&libtrace->packet_freelist,
1735                             (void* (*)()) trace_create_packet,
1736                             (void (*)(void *))trace_destroy_packet,
1737                             libtrace->config.packet_thread_cache_size,
1738                             libtrace->config.packet_cache_size * 4,
1739                             libtrace->config.fixed_packet_count) != 0) {
1740                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1741                              "failed to allocate ocache.");
1742                goto cleanup_threads;
1743        }
1744
1745        /* Threads don't start */
1746        libtrace->started = true;
1747        libtrace_change_state(libtrace, STATE_RUNNING, false);
1748
1749        ret = 0;
1750        goto success;
1751cleanup_threads:
1752        if (libtrace->first_packets.packets) {
1753                free(libtrace->first_packets.packets);
1754                libtrace->first_packets.packets = NULL;
1755        }
1756        libtrace_change_state(libtrace, STATE_ERROR, false);
1757        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1758        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1759                pthread_join(libtrace->hasher_thread.tid, NULL);
1760                libtrace_zero_thread(&libtrace->hasher_thread);
1761        }
1762
1763        if (libtrace->perpkt_threads) {
1764                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1765                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1766                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1767                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1768                        } else break;
1769                }
1770                free(libtrace->perpkt_threads);
1771                libtrace->perpkt_threads = NULL;
1772        }
1773
1774        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1775                pthread_join(libtrace->reporter_thread.tid, NULL);
1776                libtrace_zero_thread(&libtrace->reporter_thread);
1777        }
1778
1779        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1780                pthread_join(libtrace->keepalive_thread.tid, NULL);
1781                libtrace_zero_thread(&libtrace->keepalive_thread);
1782        }
1783        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1784        libtrace_change_state(libtrace, STATE_NEW, false);
1785        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1786        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1787cleanup_started:
1788        if (trace_supports_parallel(libtrace) &&
1789            !trace_has_dedicated_hasher(libtrace)
1790            && libtrace->perpkt_thread_count > 1) {
1791                if (libtrace->format->ppause_input)
1792                        libtrace->format->ppause_input(libtrace);
1793        } else {
1794                if (libtrace->format->pause_input)
1795                        libtrace->format->pause_input(libtrace);
1796        }
1797        ret = -1;
1798success:
1799        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1800cleanup_none:
1801        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1802        return ret;
1803}
1804
1805/**
1806 * Pauses a trace, this should only be called by the main thread
1807 * 1. Set started = false
1808 * 2. All perpkt threads are paused waiting on a condition var
1809 * 3. Then call ppause on the underlying format if found
1810 * 4. The traces state is paused
1811 *
1812 * Once done you should be able to modify the trace setup and call pstart again
1813 * TODO handle changing thread numbers
1814 */
1815DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1816{
1817        libtrace_thread_t *t;
1818        int i;
1819        assert(libtrace);
1820
1821        t = get_thread_table(libtrace);
1822        // Check state from within the lock if we are going to change it
1823        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1824        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1825                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1826                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1827                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1828                return -1;
1829        }
1830
1831        libtrace_change_state(libtrace, STATE_PAUSING, false);
1832        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1833
1834        // Special case handle the hasher thread case
1835        if (trace_has_dedicated_hasher(libtrace)) {
1836                if (libtrace->config.debug_state)
1837                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1838                libtrace_message_t message = {0};
1839                message.code = MESSAGE_DO_PAUSE;
1840                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1841                // Wait for it to pause
1842                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1843                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1844                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1845                }
1846                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1847                if (libtrace->config.debug_state)
1848                        fprintf(stderr, " DONE\n");
1849        }
1850
1851        if (libtrace->config.debug_state)
1852                fprintf(stderr, "Asking perpkt threads to pause ...");
1853        // Stop threads, skip this one if it's a perpkt
1854        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1855                if (&libtrace->perpkt_threads[i] != t) {
1856                        libtrace_message_t message = {0};
1857                        message.code = MESSAGE_DO_PAUSE;
1858                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1859                        if(trace_has_dedicated_hasher(libtrace)) {
1860                                // The hasher has stopped and other threads have messages waiting therefore
1861                                // If the queues are empty the other threads would have no data
1862                                // So send some message packets to simply ask the threads to check
1863                                // We are the only writer since hasher has paused
1864                                libtrace_packet_t *pkt;
1865                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1866                                pkt->error = READ_MESSAGE;
1867                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1868                        }
1869                } else {
1870                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1871                }
1872        }
1873
1874        if (t) {
1875                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1876                // We rely on the user to *not* return before starting the trace again
1877                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1878        }
1879
1880        // Wait for all threads to pause
1881        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1882        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1883                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1884        }
1885        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1886
1887        if (libtrace->config.debug_state)
1888                fprintf(stderr, " DONE\n");
1889
1890        // Deal with the reporter
1891        if (trace_has_dedicated_reporter(libtrace)) {
1892                if (libtrace->config.debug_state)
1893                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1894                libtrace_message_t message = {0};
1895                message.code = MESSAGE_DO_PAUSE;
1896                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
1897                // Wait for it to pause
1898                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1899                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1900                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1901                }
1902                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1903                if (libtrace->config.debug_state)
1904                        fprintf(stderr, " DONE\n");
1905        }
1906
1907        /* Cache values before we pause */
1908        libtrace->dropped_packets = trace_get_dropped_packets(libtrace);
1909        libtrace->received_packets = trace_get_received_packets(libtrace);
1910        uint64_t tmp_stats;
1911        if (libtrace->format->get_filtered_packets) {
1912                if ((tmp_stats = libtrace->format->get_filtered_packets(libtrace)) != UINT64_MAX) {
1913                        libtrace->filtered_packets += tmp_stats;
1914                }
1915        }
1916        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
1917                libtrace->started = false;
1918                if (libtrace->format->ppause_input)
1919                        libtrace->format->ppause_input(libtrace);
1920                // TODO What happens if we don't have pause input??
1921        } else {
1922                int err;
1923                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1924                err = trace_pause(libtrace);
1925                // We should handle this a bit better
1926                if (err)
1927                        return err;
1928        }
1929
1930        // Only set as paused after the pause has been called on the trace
1931        libtrace_change_state(libtrace, STATE_PAUSED, true);
1932        return 0;
1933}
1934
1935/**
1936 * Stop trace finish prematurely as though it meet an EOF
1937 * This should only be called by the main thread
1938 * 1. Calls ppause
1939 * 2. Sends a message asking for threads to finish
1940 * 3. Releases threads which will pause
1941 */
1942DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1943{
1944        int i, err;
1945        libtrace_message_t message = {0};
1946        assert(libtrace);
1947
1948        // Ensure all threads have paused and the underlying trace format has
1949        // been closed and all packets associated are cleaned up
1950        // Pause will do any state checks for us
1951        err = trace_ppause(libtrace);
1952        if (err)
1953                return err;
1954
1955        // Now send a message asking the threads to stop
1956        // This will be retrieved before trying to read another packet
1957
1958        message.code = MESSAGE_DO_STOP;
1959        trace_send_message_to_perpkts(libtrace, &message);
1960        if (trace_has_dedicated_hasher(libtrace))
1961                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1962
1963        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1964                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1965        }
1966
1967        // Now release the threads and let them stop
1968        libtrace_change_state(libtrace, STATE_FINSHED, true);
1969        return 0;
1970}
1971
1972/**
1973 * Set the hasher type along with a selected function, if hardware supports
1974 * that generic type of hashing it will be used otherwise the supplied
1975 * hasher function will be used and passed data when called.
1976 *
1977 * @return 0 if successful otherwise -1 on error
1978 */
1979DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1980        int ret = -1;
1981        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1982                return -1;
1983        }
1984
1985        // Save the requirements
1986        trace->hasher_type = type;
1987        if (hasher) {
1988                trace->hasher = hasher;
1989                trace->hasher_data = data;
1990        } else {
1991                trace->hasher = NULL;
1992                // TODO consider how to handle freeing this
1993                trace->hasher_data = NULL;
1994        }
1995
1996        // Try push this to hardware - NOTE hardware could do custom if
1997        // there is a more efficient way to apply it, in this case
1998        // it will simply grab the function out of libtrace_t
1999        if (trace->format->pconfig_input)
2000                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
2001
2002        if (ret == -1) {
2003                // We have to deal with this ourself
2004                // This most likely means single threaded reading of the trace
2005                if (!hasher) {
2006                        switch (type)
2007                        {
2008                                case HASHER_CUSTOM:
2009                                case HASHER_BALANCE:
2010                                        return 0;
2011                                case HASHER_BIDIRECTIONAL:
2012                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2013                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2014                                        toeplitz_init_config(trace->hasher_data, 1);
2015                                        return 0;
2016                                case HASHER_UNIDIRECTIONAL:
2017                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2018                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2019                                        toeplitz_init_config(trace->hasher_data, 0);
2020                                        return 0;
2021                                case HASHER_HARDWARE:
2022                                        return -1;
2023                        }
2024                        return -1;
2025                }
2026        } else {
2027                // The hardware is dealing with this yay
2028                trace->hasher_type = HASHER_HARDWARE;
2029        }
2030
2031        return 0;
2032}
2033
2034// Waits for all threads to finish
2035DLLEXPORT void trace_join(libtrace_t *libtrace) {
2036        int i;
2037
2038        /* Firstly wait for the perpkt threads to finish, since these are
2039         * user controlled */
2040        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2041                //printf("Waiting to join with perpkt #%d\n", i);
2042                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2043                //printf("Joined with perpkt #%d\n", i);
2044                // So we must do our best effort to empty the queue - so
2045                // the producer (or any other threads) don't block.
2046                libtrace_packet_t * packet;
2047                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2048                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2049                        if (packet) // This could be NULL iff the perpkt finishes early
2050                                trace_destroy_packet(packet);
2051        }
2052
2053        /* Now the hasher */
2054        if (trace_has_dedicated_hasher(libtrace)) {
2055                pthread_join(libtrace->hasher_thread.tid, NULL);
2056                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2057        }
2058
2059        // Now that everything is finished nothing can be touching our
2060        // buffers so clean them up
2061        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2062                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2063                // if they lost timeslice before-during a write
2064                libtrace_packet_t * packet;
2065                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2066                        trace_destroy_packet(packet);
2067                if (libtrace->hasher) {
2068                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2069                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2070                }
2071                // Cannot destroy vector yet, this happens with trace_destroy
2072        }
2073        // TODO consider perpkt threads marking trace as finished before join is called
2074        libtrace_change_state(libtrace, STATE_FINSHED, true);
2075
2076        if (trace_has_dedicated_reporter(libtrace)) {
2077                pthread_join(libtrace->reporter_thread.tid, NULL);
2078                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2079        }
2080
2081        // Wait for the tick (keepalive) thread if it has been started
2082        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2083                libtrace_message_t msg = {0};
2084                msg.code = MESSAGE_DO_STOP;
2085                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
2086                pthread_join(libtrace->keepalive_thread.tid, NULL);
2087        }
2088
2089        libtrace_change_state(libtrace, STATE_JOINED, true);
2090        print_memory_stats();
2091}
2092
2093DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
2094{
2095        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2096        assert(t);
2097        return libtrace_message_queue_count(&t->messages);
2098}
2099
2100DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2101{
2102        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2103        assert(t);
2104        return libtrace_message_queue_get(&t->messages, message);
2105}
2106
2107DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2108{
2109        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2110        assert(t);
2111        return libtrace_message_queue_try_get(&t->messages, message);
2112}
2113
2114/**
2115 * Return backlog indicator
2116 */
2117DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2118{
2119        libtrace_message_t message = {0};
2120        message.code = MESSAGE_POST_REPORTER;
2121        message.sender = get_thread_descriptor(libtrace);
2122        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
2123}
2124
2125/**
2126 * Return backlog indicator
2127 */
2128DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2129{
2130        //printf("Sending message code=%d to reporter\n", message->code);
2131        message->sender = get_thread_descriptor(libtrace);
2132        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
2133}
2134
2135/**
2136 *
2137 */
2138DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2139{
2140        //printf("Sending message code=%d to reporter\n", message->code);
2141        message->sender = get_thread_descriptor(libtrace);
2142        return libtrace_message_queue_put(&t->messages, message);
2143}
2144
2145DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2146{
2147        int i;
2148        message->sender = get_thread_descriptor(libtrace);
2149        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2150                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2151        }
2152        //printf("Sending message code=%d to reporter\n", message->code);
2153        return 0;
2154}
2155
2156DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
2157        result->key = key;
2158}
2159DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
2160        return result->key;
2161}
2162DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value) {
2163        result->value = value;
2164}
2165DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result) {
2166        return result->value;
2167}
2168DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value) {
2169        result->key = key;
2170        result->value = value;
2171}
2172DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
2173        free(*result);
2174        result = NULL;
2175        // TODO automatically back with a free list!!
2176}
2177
2178DLLEXPORT void * trace_get_global(libtrace_t *trace)
2179{
2180        return trace->global_blob;
2181}
2182
2183DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
2184{
2185        if (trace->global_blob && trace->global_blob != data) {
2186                void * ret = trace->global_blob;
2187                trace->global_blob = data;
2188                return ret;
2189        } else {
2190                trace->global_blob = data;
2191                return NULL;
2192        }
2193}
2194
2195DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2196{
2197        return t->user_data;
2198}
2199
2200DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2201{
2202        if(t->user_data && t->user_data != data) {
2203                void *ret = t->user_data;
2204                t->user_data = data;
2205                return ret;
2206        } else {
2207                t->user_data = data;
2208                return NULL;
2209        }
2210}
2211
2212/**
2213 * Publishes a result to the reduce queue
2214 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2215 */
2216DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type) {
2217        libtrace_result_t res;
2218        res.type = type;
2219        res.key = key;
2220        res.value = value;
2221        assert(libtrace->combiner.publish);
2222        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2223        return;
2224}
2225
2226/**
2227 * Sets a combiner function against the trace.
2228 */
2229DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config){
2230        if (combiner) {
2231                trace->combiner = *combiner;
2232                trace->combiner.configuration = config;
2233        } else {
2234                // No combiner, so don't try use it
2235                memset(&trace->combiner, 0, sizeof(trace->combiner));
2236        }
2237}
2238
2239DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2240        return packet->order;
2241}
2242
2243DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2244        return packet->hash;
2245}
2246
2247DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2248        packet->order = order;
2249}
2250
2251DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2252        packet->hash = hash;
2253}
2254
2255DLLEXPORT int trace_finished(libtrace_t * libtrace) {
2256        // TODO I don't like using this so much, we could use state!!!
2257        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
2258}
2259
2260DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2261{
2262        UNUSED int ret = -1;
2263        switch (option) {
2264                case TRACE_OPTION_TICK_INTERVAL:
2265                        libtrace->config.tick_interval = *((int *) value);
2266                        return 1;
2267                case TRACE_OPTION_SET_HASHER:
2268                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2269                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2270                        libtrace->config.perpkt_threads = *((int *) value);
2271                        return 1;
2272                case TRACE_OPTION_TRACETIME:
2273                        if(*((int *) value))
2274                                libtrace->tracetime = 1;
2275                        else
2276                                libtrace->tracetime = 0;
2277                        return 0;
2278                case TRACE_OPTION_SET_CONFIG:
2279                        libtrace->config = *((struct user_configuration *) value);
2280                case TRACE_OPTION_GET_CONFIG:
2281                        *((struct user_configuration *) value) = libtrace->config;
2282        }
2283        return 0;
2284}
2285
2286static bool config_bool_parse(char *value, size_t nvalue) {
2287        if (strncmp(value, "true", nvalue) == 0)
2288                return true;
2289        else if (strncmp(value, "false", nvalue) == 0)
2290                return false;
2291        else
2292                return strtoll(value, NULL, 10) != 0;
2293}
2294
2295static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2296        assert(key);
2297        assert(value);
2298        assert(uc);
2299        if (strncmp(key, "packet_cache_size", nkey) == 0
2300            || strncmp(key, "pcs", nkey) == 0) {
2301                uc->packet_cache_size = strtoll(value, NULL, 10);
2302        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2303                   || strncmp(key, "ptcs", nkey) == 0) {
2304                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2305        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2306                   || strncmp(key, "fpc", nkey) == 0) {
2307                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2308        } else if (strncmp(key, "burst_size", nkey) == 0
2309                   || strncmp(key, "bs", nkey) == 0) {
2310                uc->burst_size = strtoll(value, NULL, 10);
2311        } else if (strncmp(key, "tick_interval", nkey) == 0
2312                   || strncmp(key, "ti", nkey) == 0) {
2313                uc->tick_interval = strtoll(value, NULL, 10);
2314        } else if (strncmp(key, "tick_count", nkey) == 0
2315                   || strncmp(key, "tc", nkey) == 0) {
2316                uc->tick_count = strtoll(value, NULL, 10);
2317        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2318                   || strncmp(key, "pt", nkey) == 0) {
2319                uc->perpkt_threads = strtoll(value, NULL, 10);
2320        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2321                   || strncmp(key, "hqs", nkey) == 0) {
2322                uc->hasher_queue_size = strtoll(value, NULL, 10);
2323        } else if (strncmp(key, "hasher_polling", nkey) == 0
2324                   || strncmp(key, "hp", nkey) == 0) {
2325                uc->hasher_polling = config_bool_parse(value, nvalue);
2326        } else if (strncmp(key, "reporter_polling", nkey) == 0
2327                   || strncmp(key, "rp", nkey) == 0) {
2328                uc->reporter_polling = config_bool_parse(value, nvalue);
2329        } else if (strncmp(key, "reporter_thold", nkey) == 0
2330                   || strncmp(key, "rt", nkey) == 0) {
2331                uc->reporter_thold = strtoll(value, NULL, 10);
2332        } else if (strncmp(key, "debug_state", nkey) == 0
2333                   || strncmp(key, "ds", nkey) == 0) {
2334                uc->debug_state = config_bool_parse(value, nvalue);
2335        } else {
2336                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2337        }
2338}
2339
2340DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2341        char *pch;
2342        char key[100];
2343        char value[100];
2344        assert(str);
2345        assert(uc);
2346        pch = strtok (str," ,.-");
2347        while (pch != NULL)
2348        {
2349                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2350                        config_string(uc, key, sizeof(key), value, sizeof(value));
2351                } else {
2352                        fprintf(stderr, "Error parsing %s\n", pch);
2353                }
2354                pch = strtok (NULL," ,.-");
2355        }
2356}
2357
2358DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2359        char line[1024];
2360        while (fgets(line, sizeof(line), file) != NULL)
2361        {
2362                parse_user_config(uc, line);
2363        }
2364}
2365
2366DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
2367        libtrace_packet_t* result;
2368        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
2369        assert(result);
2370        swap_packets(result, packet); // Move the current packet into our copy
2371        return result;
2372}
2373
2374DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2375        // Try write back the packet
2376        assert(packet);
2377        // Always release any resources this might be holding such as a slot in a ringbuffer
2378        trace_fin_packet(packet);
2379        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2380}
2381
2382DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2383        if (libtrace->format)
2384                return &libtrace->format->info;
2385        else
2386                return NULL;
2387}
Note: See TracBrowser for help on using the repository browser.