source: lib/trace_parallel.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 76.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97#include "combiners.h"
98
99#include <pthread.h>
100#include <signal.h>
101#include <unistd.h>
102
103
104static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
105
106extern int libtrace_parallel;
107
108struct multithreading_stats {
109        uint64_t full_queue_hits;
110        uint64_t wait_for_fill_complete_hits;
111} contention_stats[1024];
112
113struct mem_stats {
114        struct memfail {
115           uint64_t cache_hit;
116           uint64_t ring_hit;
117           uint64_t miss;
118           uint64_t recycled;
119        } readbulk, read, write, writebulk;
120};
121
122// Grrr gcc wants this spelt out
123__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
124
125static void print_memory_stats() {
126        char t_name[50];
127        uint64_t total;
128        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
129
130        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
131
132        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
133        if (total) {
134                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
135                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
136                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
137                                total, (double) mem_hits.read.miss / (double) total * 100.0);
138        }
139
140        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
141        if (total) {
142                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
143                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
144
145
146                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
147                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
148        }
149
150        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
151        if (total) {
152                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
153                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
154
155                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
156                                total, (double) mem_hits.write.miss / (double) total * 100.0);
157        }
158
159        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
160        if (total) {
161                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
162                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
163
164                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
166        }
167
168}
169
170/**
171 * True if the trace has dedicated hasher thread otherwise false,
172 * to be used after the trace is running
173 */
174static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
175{
176        assert(libtrace->state != STATE_NEW);
177        return libtrace->hasher_thread.type == THREAD_HASHER;
178}
179
180/**
181 * True if the trace has dedicated hasher thread otherwise false,
182 * to be used after the trace is running
183 */
184static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
185{
186        assert(libtrace->state != STATE_NEW);
187        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
188}
189
190/**
191 * When running the number of perpkt threads in use.
192 * TODO what if the trace is not running yet, or has finished??
193 *
194 * @brief libtrace_perpkt_thread_nb
195 * @param t The trace
196 * @return
197 */
198DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
199        return t->perpkt_thread_count;
200}
201
202/**
203 * Changes a thread's state and broadcasts the condition variable. This
204 * should always be done when the lock is held.
205 *
206 * Additionally for perpkt threads the state counts are updated.
207 *
208 * @param trace A pointer to the trace
209 * @param t A pointer to the thread to modify
210 * @param new_state The new state of the thread
211 * @param need_lock Set to true if libtrace_lock is not held, otherwise
212 *        false in the case the lock is currently held by this thread.
213 */
214static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
215        const enum thread_states new_state, const bool need_lock)
216{
217        enum thread_states prev_state;
218        if (need_lock)
219                pthread_mutex_lock(&trace->libtrace_lock);
220        prev_state = t->state;
221        t->state = new_state;
222        if (t->type == THREAD_PERPKT) {
223                --trace->perpkt_thread_states[prev_state];
224                ++trace->perpkt_thread_states[new_state];
225        }
226
227        if (trace->config.debug_state)
228                fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
229                        prev_state, t->state);
230
231        if (need_lock)
232                pthread_mutex_unlock(&trace->libtrace_lock);
233        pthread_cond_broadcast(&trace->perpkt_cond);
234}
235
236/**
237 * Changes the overall traces state and signals the condition.
238 *
239 * @param trace A pointer to the trace
240 * @param new_state The new state of the trace
241 * @param need_lock Set to true if libtrace_lock is not held, otherwise
242 *        false in the case the lock is currently held by this thread.
243 */
244static inline void libtrace_change_state(libtrace_t *trace, 
245        const enum trace_state new_state, const bool need_lock)
246{
247        UNUSED enum trace_state prev_state;
248        if (need_lock)
249                pthread_mutex_lock(&trace->libtrace_lock);
250        prev_state = trace->state;
251        trace->state = new_state;
252
253        if (trace->config.debug_state)
254                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
255                        trace->uridata, get_trace_state_name(prev_state),
256                        get_trace_state_name(trace->state));
257
258        if (need_lock)
259                pthread_mutex_unlock(&trace->libtrace_lock);
260        pthread_cond_broadcast(&trace->perpkt_cond);
261}
262
263/**
264 * @return True if the format supports parallel threads.
265 */
266static inline bool trace_supports_parallel(libtrace_t *trace)
267{
268        assert(trace);
269        assert(trace->format);
270        if (trace->format->pstart_input)
271                return true;
272        else
273                return false;
274}
275
276DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
277        int i;
278        struct multithreading_stats totals = {0};
279        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
280                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
281                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
282                totals.full_queue_hits += contention_stats[i].full_queue_hits;
283                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
284                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
285        }
286        fprintf(stderr, "\nTotals for perpkt threads\n");
287        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
288        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
289
290        return;
291}
292
293void libtrace_zero_thread(libtrace_thread_t * t) {
294        t->trace = NULL;
295        t->ret = NULL;
296        t->type = THREAD_EMPTY;
297        libtrace_zero_ringbuffer(&t->rbuffer);
298        t->recorded_first = false;
299        t->perpkt_num = -1;
300        t->accepted_packets = 0;
301}
302
303// Ints are aligned int is atomic so safe to read and write at same time
304// However write must be locked, read doesn't (We never try read before written to table)
305libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
306        int i = 0;
307        pthread_t tid = pthread_self();
308
309        for (;i<libtrace->perpkt_thread_count ;++i) {
310                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
311                        return &libtrace->perpkt_threads[i];
312        }
313        return NULL;
314}
315
316int get_thread_table_num(libtrace_t *libtrace) {
317        int i = 0;
318        pthread_t tid = pthread_self();
319        for (;i<libtrace->perpkt_thread_count; ++i) {
320                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
321                        return i;
322        }
323        return -1;
324}
325
326static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
327        libtrace_thread_t *ret;
328        if (!(ret = get_thread_table(libtrace))) {
329                pthread_t tid = pthread_self();
330                // Check if we are reporter or something else
331                if (pthread_equal(tid, libtrace->reporter_thread.tid))
332                        ret = &libtrace->reporter_thread;
333                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
334                        ret = &libtrace->hasher_thread;
335                else
336                        ret = NULL;
337        }
338        return ret;
339}
340
341/** Used below in trace_make_results_packets_safe */
342static void do_copy_result_packet(void *data)
343{
344        libtrace_result_t *res = (libtrace_result_t *)data;
345        if (res->type == RESULT_PACKET) {
346                // Duplicate the packet in standard malloc'd memory and free the
347                // original, This is a 1:1 exchange so is ocache count remains unchanged.
348                libtrace_packet_t *oldpkt, *dup;
349                oldpkt = (libtrace_packet_t *) res->value;
350                dup = trace_copy_packet(oldpkt);
351                res->value = (void *)dup;
352                trace_destroy_packet(oldpkt);
353        }
354}
355
356/**
357 * Holds threads in a paused state, until released by broadcasting
358 * the condition mutex.
359 */
360static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
361        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
362        thread_change_state(trace, t, THREAD_PAUSED, false);
363        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
364                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
365        }
366        thread_change_state(trace, t, THREAD_RUNNING, false);
367        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
368}
369
370
371
372/**
373 * Dispatches packets to their correct place and applies any translations
374 * as needed
375 * @param trace
376 * @param t
377 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
378 * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal
379 */
380static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
381                                   size_t nb_packets) {
382        libtrace_message_t message;
383        size_t i;
384        for (i = 0; i < nb_packets; ++i) {
385                if (packets[i]->error > 0) {
386                        packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
387                } else if (packets[i]->error == READ_TICK) {
388                        message.code = MESSAGE_TICK;
389                        message.additional.uint64 = trace_packet_get_order(packets[i]);
390                        message.sender = t;
391                        (*trace->per_pkt)(trace, NULL, &message, t);
392                } else if (packets[i]->error != READ_MESSAGE) {
393                        // An error this should be the last packet we read
394                        size_t z;
395                        // We could have an eof or error and a message such as pause
396                        for (z = i ; z < nb_packets; ++z) {
397                                fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
398                                assert (packets[z]->error <= 0);
399                        }
400                        return -1;
401                }
402                // -2 is a message its not worth checking now just finish this lot and we'll check
403                // when we loop next
404        }
405        return 0;
406}
407
408/**
409 * The is the entry point for our packet processing threads.
410 */
411static void* perpkt_threads_entry(void *data) {
412        libtrace_t *trace = (libtrace_t *)data;
413        libtrace_thread_t * t;
414        libtrace_message_t message = {0};
415        libtrace_packet_t *packets[trace->config.burst_size];
416        size_t nb_packets;
417        size_t i;
418
419        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
420        // Force this thread to wait until trace_pstart has been completed
421        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
422        t = get_thread_table(trace);
423        assert(t);
424        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
425        if (trace->format->pregister_thread) {
426                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
427        }
428        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
429
430        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
431        // Send a message to say we've started
432
433        // Let the per_packet function know we have started
434        message.code = MESSAGE_STARTING;
435        message.sender = t;
436        (*trace->per_pkt)(trace, NULL, &message, t);
437        message.code = MESSAGE_RESUMING;
438        (*trace->per_pkt)(trace, NULL, &message, t);
439
440
441        for (;;) {
442
443                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
444                        switch (message.code) {
445                                case MESSAGE_DO_PAUSE: // This is internal
446                                        // Send message to say we are pausing, TODO consider sender
447                                        message.code = MESSAGE_PAUSING;
448                                        message.sender = t;
449                                        (*trace->per_pkt)(trace, NULL, &message, t);
450                                        // If a hasher thread is running empty input queues so we don't lose data
451                                        if (trace_has_dedicated_hasher(trace)) {
452                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
453                                                // The hasher has stopped by this point, so the queue shouldn't be filling
454                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
455                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
456                                                        if (dispatch_packets(trace, t, packets, 1) == -1) {
457                                                                // EOF or error, either way we'll stop
458                                                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
459                                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
460                                                                        // No packets after this should have any data in them
461                                                                        assert(packets[0]->error <= 0);
462                                                                }
463                                                                goto stop;
464                                                        }
465                                                }
466                                        }
467                                        // Now we do the actual pause, this returns when we are done
468                                        trace_thread_pause(trace, t);
469                                        message.code = MESSAGE_RESUMING;
470                                        (*trace->per_pkt)(trace, NULL, &message, t);
471                                        // Check for new messages as soon as we return
472                                        continue;
473                                case MESSAGE_DO_STOP: // This is internal
474                                        goto stop;
475                        }
476                        (*trace->per_pkt)(trace, NULL, &message, t);
477                        continue;
478                }
479
480                if (trace->perpkt_thread_count == 1) {
481                        if (!packets[0]) {
482                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
483                        }
484                        assert(packets[0]);
485                        packets[0]->error = trace_read_packet(trace, packets[0]);
486                        nb_packets = 1;
487                } else {
488                        nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size);
489                }
490                // Loop through the packets we just read
491                if (dispatch_packets(trace, t, packets, nb_packets) == -1)
492                        break;
493        }
494
495
496stop:
497        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
498
499        // Let the per_packet function know we have stopped
500        message.code = MESSAGE_PAUSING;
501        message.sender = t;
502        (*trace->per_pkt)(trace, NULL, &message, t);
503        message.code = MESSAGE_STOPPING;
504        message.additional.uint64 = 0;
505        (*trace->per_pkt)(trace, NULL, &message, t);
506
507        // Free any remaining packets
508        for (i = 0; i < trace->config.burst_size; i++) {
509                if (packets[i]) {
510                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
511                        packets[i] = NULL;
512                }
513        }
514
515       
516        thread_change_state(trace, t, THREAD_FINISHED, true);
517
518        // Notify only after we've defiantly set the state to finished
519        message.code = MESSAGE_PERPKT_ENDED;
520        message.additional.uint64 = 0;
521        trace_send_message_to_reporter(trace, &message);
522
523        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
524        if (trace->format->punregister_thread) {
525                trace->format->punregister_thread(trace, t);
526        }
527        print_memory_stats();
528
529        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
530
531        pthread_exit(NULL);
532};
533
534/**
535 * The start point for our single threaded hasher thread, this will read
536 * and hash a packet from a data source and queue it against the correct
537 * core to process it.
538 */
539static void* hasher_entry(void *data) {
540        libtrace_t *trace = (libtrace_t *)data;
541        libtrace_thread_t * t;
542        int i;
543        libtrace_packet_t * packet;
544        libtrace_message_t message = {0};
545
546        assert(trace_has_dedicated_hasher(trace));
547        /* Wait until all threads are started and objects are initialised (ring buffers) */
548        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
549        t = &trace->hasher_thread;
550        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
551        printf("Hasher Thread started\n");
552        if (trace->format->pregister_thread) {
553                trace->format->pregister_thread(trace, t, true);
554        }
555        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
556        int pkt_skipped = 0;
557        /* Read all packets in then hash and queue against the correct thread */
558        while (1) {
559                int thread;
560                if (!pkt_skipped)
561                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
562                assert(packet);
563
564                if (libtrace_halt) // Signal to die has been sent - TODO
565                        break;
566
567                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
568                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
569                        switch(message.code) {
570                                case MESSAGE_DO_PAUSE:
571                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
572                                        thread_change_state(trace, t, THREAD_PAUSED, false);
573                                        pthread_cond_broadcast(&trace->perpkt_cond);
574                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
575                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
576                                        }
577                                        thread_change_state(trace, t, THREAD_RUNNING, false);
578                                        pthread_cond_broadcast(&trace->perpkt_cond);
579                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
580                                        break;
581                                case MESSAGE_DO_STOP:
582                                        // Stop called after pause
583                                        assert(trace->started == false);
584                                        assert(trace->state == STATE_FINSHED);
585                                        break;
586                                default:
587                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
588                        }
589                        pkt_skipped = 1;
590                        continue;
591                }
592
593                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
594                        break; /* We are EOF or error'd either way we stop  */
595                }
596
597                /* We are guaranteed to have a hash function i.e. != NULL */
598                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
599                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
600                /* Blocking write to the correct queue - I'm the only writer */
601                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
602                        uint64_t order = trace_packet_get_order(packet);
603                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
604                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
605                                // Write ticks to everyone else
606                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
607                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
608                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
609                                for (i = 0; i < trace->perpkt_thread_count; i++) {
610                                        pkts[i]->error = READ_TICK;
611                                        trace_packet_set_order(pkts[i], order);
612                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
613                                }
614                        }
615                        pkt_skipped = 0;
616                } else {
617                        assert(!"Dropping a packet!!");
618                        pkt_skipped = 1; // Reuse that packet no one read it
619                }
620        }
621
622        /* Broadcast our last failed read to all threads */
623        for (i = 0; i < trace->perpkt_thread_count; i++) {
624                libtrace_packet_t * bcast;
625                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
626                if (i == trace->perpkt_thread_count - 1) {
627                        bcast = packet;
628                } else {
629                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
630                        bcast->error = packet->error;
631                }
632                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
633                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
634                        // Unlock early otherwise we could deadlock
635                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
636                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
637                } else {
638                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
639                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
640                }
641        }
642
643        // We don't need to free the packet
644        thread_change_state(trace, t, THREAD_FINISHED, true);
645
646        // Notify only after we've defiantly set the state to finished
647        message.code = MESSAGE_PERPKT_ENDED;
648        message.additional.uint64 = 0;
649        trace_send_message_to_reporter(trace, &message);
650        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
651        if (trace->format->punregister_thread) {
652                trace->format->punregister_thread(trace, t);
653        }
654        print_memory_stats();
655        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
656
657        // TODO remove from TTABLE t sometime
658        pthread_exit(NULL);
659};
660
661/**
662 * Moves src into dest(Complete copy) and copies the memory buffer and
663 * its flags from dest into src ready for reuse without needing extra mallocs.
664 */
665static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
666        // Save the passed in buffer status
667        assert(dest->trace == NULL); // Must be a empty packet
668        void * temp_buf = dest->buffer;
669        buf_control_t temp_buf_control = dest->buf_control;
670        // Completely copy StoredPacket into packet
671        memcpy(dest, src, sizeof(libtrace_packet_t));
672        // Set the buffer settings on the returned packet
673        src->buffer = temp_buf;
674        src->buf_control = temp_buf_control;
675        src->trace = NULL;
676}
677
678/**
679 * @brief Move NULLs to the end of an array.
680 * @param values
681 * @param len
682 * @return The location the first NULL, aka the number of non NULL elements
683 */
684static inline size_t move_nulls_back(void *arr[], size_t len) {
685        size_t fr=0, en = len-1;
686        // Shift all non NULL elements to the front of the array, and NULLs to the
687        // end, traverses every element at most once
688        for (;fr < en; ++fr) {
689                if (arr[fr] == NULL) {
690                        for (;en > fr; --en) {
691                                if(arr[en]) {
692                                        arr[fr] = arr[en];
693                                        arr[en] = NULL;
694                                        break;
695                                }
696                        }
697                }
698        }
699        // This is the index of the first NULL
700        en = MIN(fr, en);
701        // Or the end of the array if this special case
702        if (arr[en])
703                en++;
704        return en;
705}
706
707/** returns the number of packets successfully allocated in the final array
708 these will all be at the front of the array */
709inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
710        size_t nb;
711        nb = move_nulls_back((void **) packets, nb_packets);
712        mem_hits.read.recycled += nb;
713        nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);
714        assert(nb_packets == nb);
715        return nb;
716}
717
718
719inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
720        size_t nb;
721        nb = move_nulls_back((void **) packets, nb_packets);
722        mem_hits.write.recycled += nb_packets - nb;
723        nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);
724        memset(packets, 0, nb); // XXX make better, maybe do this in ocache??
725        return nb;
726}
727
728/* Our simplest case when a thread becomes ready it can obtain an exclusive
729 * lock to read packets from the underlying trace.
730 */
731inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
732{
733        size_t i = 0;
734        bool tick_hit = false;
735
736        nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
737
738        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
739        /* Read nb_packets */
740        for (i = 0; i < nb_packets; ++i) {
741                packets[i]->error = trace_read_packet(libtrace, packets[i]);
742                if (packets[i]->error <= 0) {
743                        ++i;
744                        break;
745                }
746                /*
747                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
748                        tick_hit = true;
749                }*/
750        }
751        // Doing this inside the lock ensures the first packet is always
752        // recorded first
753        if (packets[0]->error > 0) {
754                store_first_packet(libtrace, packets[0], t);
755        }
756        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
757        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
758        if (tick_hit) {
759                libtrace_message_t tick;
760                tick.additional.uint64 = trace_packet_get_order(packets[i]);
761                tick.code = MESSAGE_TICK;
762                trace_send_message_to_perpkts(libtrace, &tick);
763        } */
764        return i;
765}
766
767/**
768 * For the case that we have a dedicated hasher thread
769 * 1. We read a packet from our buffer
770 * 2. Move that into the packet provided (packet)
771 */
772inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets)
773{
774        size_t i;
775
776        // Always grab at least one
777        if (packets[0]) // Recycle the old get the new
778                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
779        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
780
781        if (packets[0]->error < 0)
782                return 1;
783
784        for (i = 1; i < nb_packets; i++) {
785                if (packets[i]) // Recycle the old get the new
786                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
787                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
788                        packets[i] = NULL;
789                        break;
790                }
791                // These are typically urgent
792                if (packets[i]->error < 0)
793                        break;
794        }
795       
796        return i;
797}
798
799/**
800 * Tries to read from our queue and returns 1 if a packet was retrieved
801 */
802static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
803{
804        libtrace_packet_t* retrived_packet;
805
806        /* Lets see if we have one waiting */
807        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
808                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
809                assert(retrived_packet);
810
811                if (*packet) // Recycle the old get the new
812                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
813                *packet = retrived_packet;
814                *ret = (*packet)->error;
815                return 1;
816        }
817        return 0;
818}
819
820/**
821 * Allows us to ensure all threads are finished writing to our threads ring_buffer
822 * before returning EOF/error.
823 */
824inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
825{
826        /* We are waiting for the condition that another thread ends to check
827         * our queue for new data, once all threads end we can go to finished */
828        bool complete = false;
829        int ret;
830
831        do {
832                // Wait for a thread to end
833                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
834
835                // Check before
836                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
837                        complete = true;
838                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
839                        continue;
840                }
841
842                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
843
844                // Check after
845                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
846                        complete = true;
847                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
848                        continue;
849                }
850
851                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
852
853                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
854                if(try_waiting_queue(libtrace, t, packet, &ret))
855                        return ret;
856        } while (!complete);
857
858        // We can only end up here once all threads complete
859        try_waiting_queue(libtrace, t, packet, &ret);
860
861        return ret;
862        // TODO rethink this logic fix bug here
863}
864
865/**
866 * Expects the libtrace_lock to not be held
867 */
868inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
869{
870        thread_change_state(libtrace, t, THREAD_FINISHING, true);
871        return trace_handle_finishing_perpkt(libtrace, packet, t);
872}
873
874/**
875 * This case is much like the dedicated hasher, except that we will become
876 * hasher if we don't have a a packet waiting.
877 *
878 * Note: This is only every used if we have are doing hashing.
879 *
880 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
881 * queue sizes in total are larger than the ring size.
882 *
883 * 1. We read a packet from our buffer
884 * 2. Move that into the packet provided (packet)
885 */
886inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
887{
888        int thread, ret/*, psize*/;
889
890        while (1) {
891                if(try_waiting_queue(libtrace, t, packet, &ret))
892                        return ret;
893                // Can still block here if another thread is writing to a full queue
894                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
895
896                // Its impossible for our own queue to overfill, because no one can write
897                // when we are in the lock
898                if(try_waiting_queue(libtrace, t, packet, &ret)) {
899                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
900                        return ret;
901                }
902
903                // Another thread cannot write a packet because a queue has filled up. Is it ours?
904                if (libtrace->perpkt_queue_full) {
905                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
906                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
907                        continue;
908                }
909
910                if (!*packet)
911                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
912                assert(*packet);
913
914                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
915                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
916                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
917                        if (libtrace_halt)
918                                return 0;
919                        else
920                                return (*packet)->error;
921                }
922
923                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
924                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
925                if (thread == t->perpkt_num) {
926                        // If it's this thread we must be in order because we checked the buffer once we got the lock
927                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
928                        return (*packet)->error;
929                }
930
931                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
932                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
933                                libtrace->perpkt_queue_full = true;
934                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
935                                contention_stats[t->perpkt_num].full_queue_hits++;
936                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
937                        }
938                        *packet = NULL;
939                        libtrace->perpkt_queue_full = false;
940                } else {
941                        /* We can get here if the user closes the thread before natural completion/or error */
942                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
943                }
944                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
945        }
946}
947
948/**
949 * This case is much like the dedicated hasher, except that we will become
950 * hasher if we don't have a packet waiting.
951 *
952 * TODO: You can lose the tail of a trace if the final thread
953 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
954 *
955 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
956 * queue sizes in total are larger than the ring size.
957 *
958 * 1. We read a packet from our buffer
959 * 2. Move that into the packet provided (packet)
960 */
961inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
962{
963        int ret, i, thread/*, psize*/;
964
965        if (t->state == THREAD_FINISHING)
966                return trace_handle_finishing_perpkt(libtrace, packet, t);
967
968        while (1) {
969                // Check if we have packets ready
970                if(try_waiting_queue(libtrace, t, packet, &ret))
971                        return ret;
972
973                // We limit the number of packets we get to the size of the sliding window
974                // such that it is impossible for any given thread to fail to store a packet
975                ASSERT_RET(sem_wait(&libtrace->sem), == 0);
976                /*~~~~Single threaded read of a packet~~~~*/
977                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
978
979                /* Re-check our queue things we might have data waiting */
980                if(try_waiting_queue(libtrace, t, packet, &ret)) {
981                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
982                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
983                        return ret;
984                }
985
986                // TODO put on *proper* condition variable
987                if (libtrace->perpkt_queue_full) {
988                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
989                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
990                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
991                        continue;
992                }
993
994                if (!*packet)
995                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
996                assert(*packet);
997
998                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
999                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1000                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1001                        // Finish this thread ensuring that any data written later by another thread is retrieved also
1002                        if (libtrace_halt)
1003                                return 0;
1004                        else
1005                                return trace_finish_perpkt(libtrace, packet, t);
1006                }
1007                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1008
1009                /* ~~~~Multiple threads can run the hasher~~~~ */
1010                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
1011
1012                /* Yes this is correct opposite read lock for a write operation */
1013                ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0);
1014                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
1015                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
1016                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1017                *packet = NULL;
1018
1019                // Always try read any data from the sliding window
1020                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
1021                        ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
1022                        if (libtrace->perpkt_queue_full) {
1023                                // I might be the holdup in which case if I can read my queue I should do that and return
1024                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
1025                                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1026                                        return ret;
1027                                }
1028                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1029                                continue;
1030                        }
1031                        // Read greedily as many as we can
1032                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
1033                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
1034                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
1035                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
1036                                                if (t->perpkt_num == thread)
1037                                                {
1038                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
1039                                                        // before EOF/error we might not have emptied the sliding window
1040                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
1041                                                        // Its our queue we must have a packet to read out
1042                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
1043                                                                // We must be able to write this now 100% without fail
1044                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
1045                                                                ASSERT_RET(sem_post(&libtrace->sem), == 0);
1046                                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1047                                                                return ret;
1048                                                        } else {
1049                                                                assert(!"Our queue is full but I cannot read from it??");
1050                                                        }
1051                                                }
1052                                                // Not us we have to give the other threads a chance to write there packets then
1053                                                libtrace->perpkt_queue_full = true;
1054                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1055                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
1056                                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1057
1058                                                contention_stats[t->perpkt_num].full_queue_hits++;
1059                                                ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
1060                                                // Grab these back
1061                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
1062                                                        ASSERT_RET(sem_wait(&libtrace->sem), == 0);
1063                                                libtrace->perpkt_queue_full = false;
1064                                        }
1065                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1066                                        *packet = NULL;
1067                                } else {
1068                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
1069                                        // in the general case (unless the user ends early without proper clean up).
1070                                        assert (!"unreachable code??");
1071                                }
1072                        }
1073                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1074                }
1075                // Now we go back to checking our queue anyways
1076        }
1077}
1078
1079
1080/**
1081 * For the first packet of each queue we keep a copy and note the system
1082 * time it was received at.
1083 *
1084 * This is used for finding the first packet when playing back a trace
1085 * in trace time. And can be used by real time applications to print
1086 * results out every XXX seconds.
1087 */
1088void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1089{
1090        if (!t->recorded_first) {
1091                struct timeval tv;
1092                libtrace_packet_t * dup;
1093                // For what it's worth we can call these outside of the lock
1094                gettimeofday(&tv, NULL);
1095                dup = trace_copy_packet(packet);
1096                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1097                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1098                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
1099                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1100                // Now update the first
1101                libtrace->first_packets.count++;
1102                if (libtrace->first_packets.count == 1) {
1103                        // We the first entry hence also the first known packet
1104                        libtrace->first_packets.first = t->perpkt_num;
1105                } else {
1106                        // Check if we are newer than the previous 'first' packet
1107                        size_t first = libtrace->first_packets.first;
1108                        if (trace_get_seconds(dup) <
1109                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1110                                libtrace->first_packets.first = t->perpkt_num;
1111                }
1112                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1113                libtrace_message_t mesg = {0};
1114                mesg.code = MESSAGE_FIRST_PACKET;
1115                trace_send_message_to_reporter(libtrace, &mesg);
1116                t->recorded_first = true;
1117        }
1118}
1119
1120/**
1121 * Returns 1 if it's certain that the first packet is truly the first packet
1122 * rather than a best guess based upon threads that have published so far.
1123 * Otherwise 0 is returned.
1124 * It's recommended that this result is stored rather than calling this
1125 * function again.
1126 */
1127DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
1128{
1129        int ret = 0;
1130        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1131        if (libtrace->first_packets.count) {
1132                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1133                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1134                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1135                        ret = 1;
1136                } else {
1137                        struct timeval curr_tv;
1138                        // If a second has passed since the first entry we will assume this is the very first packet
1139                        gettimeofday(&curr_tv, NULL);
1140                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1141                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1142                                        ret = 1;
1143                                }
1144                        }
1145                }
1146        } else {
1147                *packet = NULL;
1148                *tv = NULL;
1149        }
1150        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1151        return ret;
1152}
1153
1154
1155DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
1156{
1157        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1158}
1159
1160inline static struct timeval usec_to_tv(uint64_t usec)
1161{
1162        struct timeval tv;
1163        tv.tv_sec = usec / 1000000;
1164        tv.tv_usec = usec % 1000000;
1165        return tv;
1166}
1167
1168/** Similar to delay_tracetime but send messages to all threads periodically */
1169static void* reporter_entry(void *data) {
1170        libtrace_message_t message = {0};
1171        libtrace_t *trace = (libtrace_t *)data;
1172        libtrace_thread_t *t = &trace->reporter_thread;
1173        libtrace_vector_t results;
1174        libtrace_vector_init(&results, sizeof(libtrace_result_t));
1175        fprintf(stderr, "Reporter thread starting\n");
1176
1177        message.code = MESSAGE_STARTING;
1178        message.sender = t;
1179        (*trace->reporter)(trace, NULL, &message);
1180        message.code = MESSAGE_RESUMING;
1181        (*trace->reporter)(trace, NULL, &message);
1182
1183        while (!trace_finished(trace)) {
1184                if (trace->config.reporter_polling) {
1185                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1186                                message.code = MESSAGE_POST_REPORTER;
1187                } else {
1188                        libtrace_message_queue_get(&t->messages, &message);
1189                }
1190                switch (message.code) {
1191                        // Check for results
1192                        case MESSAGE_POST_REPORTER:
1193                                trace->combiner.read(trace, &trace->combiner);
1194                                break;
1195                        case MESSAGE_DO_PAUSE:
1196                                assert(trace->combiner.pause);
1197                                trace->combiner.pause(trace, &trace->combiner);
1198                                message.code = MESSAGE_PAUSING;
1199                                message.sender = t;
1200                                (*trace->reporter)(trace, NULL, &message);
1201                                trace_thread_pause(trace, t);
1202                                message.code = MESSAGE_RESUMING;
1203                                (*trace->reporter)(trace, NULL, &message);
1204                                break;
1205                        default:
1206                                (*trace->reporter)(trace, NULL, &message);
1207                }
1208        }
1209
1210        // Flush out whats left now all our threads have finished
1211        trace->combiner.read_final(trace, &trace->combiner);
1212
1213        // GOODBYE
1214        message.code = MESSAGE_PAUSING;
1215        message.sender = t;
1216        (*trace->reporter)(trace, NULL, &message);
1217        message.code = MESSAGE_STOPPING;
1218        (*trace->reporter)(trace, NULL, &message);
1219
1220        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1221        print_memory_stats();
1222        return NULL;
1223}
1224
1225/** Similar to delay_tracetime but send messages to all threads periodically */
1226static void* keepalive_entry(void *data) {
1227        struct timeval prev, next;
1228        libtrace_message_t message = {0};
1229        libtrace_t *trace = (libtrace_t *)data;
1230        uint64_t next_release;
1231        fprintf(stderr, "keepalive thread is starting\n");
1232
1233        gettimeofday(&prev, NULL);
1234        message.code = MESSAGE_TICK;
1235        while (trace->state != STATE_FINSHED) {
1236                fd_set rfds;
1237                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1238                gettimeofday(&next, NULL);
1239                if (next_release > tv_to_usec(&next)) {
1240                        next = usec_to_tv(next_release - tv_to_usec(&next));
1241                        // Wait for timeout or a message
1242                        FD_ZERO(&rfds);
1243                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
1244                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
1245                                libtrace_message_t msg;
1246                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
1247                                assert(msg.code == MESSAGE_DO_STOP);
1248                                goto done;
1249                        }
1250                }
1251                prev = usec_to_tv(next_release);
1252                if (trace->state == STATE_RUNNING) {
1253                        message.additional.uint64 = tv_to_usec(&prev);
1254                        trace_send_message_to_perpkts(trace, &message);
1255                }
1256        }
1257done:
1258
1259        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1260        return NULL;
1261}
1262
1263/**
1264 * Delays a packets playback so the playback will be in trace time
1265 */
1266static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1267        struct timeval curr_tv, pkt_tv;
1268        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
1269        uint64_t curr_usec;
1270        /* Tracetime we might delay releasing this packet */
1271        if (!t->tracetime_offset_usec) {
1272                libtrace_packet_t * first_pkt;
1273                struct timeval *sys_tv;
1274                int64_t initial_offset;
1275                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1276                assert(first_pkt);
1277                pkt_tv = trace_get_timeval(first_pkt);
1278                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1279                if (stable)
1280                        // 0->1 because 0 is used to mean unset
1281                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1282                next_release = initial_offset;
1283        }
1284        /* next_release == offset */
1285        pkt_tv = trace_get_timeval(packet);
1286        next_release += tv_to_usec(&pkt_tv);
1287        gettimeofday(&curr_tv, NULL);
1288        curr_usec = tv_to_usec(&curr_tv);
1289        if (next_release > curr_usec) {
1290                // We need to wait
1291                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1292                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1293                select(0, NULL, NULL, NULL, &delay_tv);
1294        }
1295}
1296
1297/* Read one packet from the trace into a buffer. Note that this function will
1298 * block until a packet is read (or EOF is reached).
1299 *
1300 * @param libtrace      the libtrace opaque pointer
1301 * @param packet        the packet opaque pointer
1302 * @returns 0 on EOF, negative value on error
1303 *
1304 * Note this is identical to read_packet but calls pread_packet instead of
1305 * read packet in the format.
1306 *
1307 */
1308static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
1309
1310        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
1311        if (trace_is_err(libtrace))
1312                return -1;
1313        if (!libtrace->started) {
1314                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1315                return -1;
1316        }
1317        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1318                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1319                return -1;
1320        }
1321        assert(packet);
1322
1323        if (libtrace->format->read_packet) {
1324                do {
1325                        size_t ret;
1326                        /* Finalise the packet, freeing any resources the format module
1327                         * may have allocated it and zeroing all data associated with it.
1328                         */
1329                        trace_fin_packet(packet);
1330                        /* Store the trace we are reading from into the packet opaque
1331                         * structure */
1332                        packet->trace = libtrace;
1333                        ret=libtrace->format->pread_packet(libtrace, t, packet);
1334                        if (ret <= 0) {
1335                                return ret;
1336                        }
1337                        if (libtrace->filter) {
1338                                /* If the filter doesn't match, read another
1339                                 * packet
1340                                 */
1341                                if (!trace_apply_filter(libtrace->filter,packet)){
1342                                        ++libtrace->filtered_packets;
1343                                        continue;
1344                                }
1345                        }
1346                        if (libtrace->snaplen>0) {
1347                                /* Snap the packet */
1348                                trace_set_capture_length(packet,
1349                                                libtrace->snaplen);
1350                        }
1351                       
1352                        ++t->accepted_packets;
1353                        // TODO look into this better
1354                        trace_packet_set_order(packet, trace_get_erf_timestamp(packet));
1355                        //trace_packet_set_order(packet, libtrace->accepted_packets);
1356                        //++libtrace->accepted_packets;
1357                        return ret;
1358                } while(1);
1359        }
1360        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1361        return ~0U;
1362}
1363
1364/**
1365 * Read packets from the parallel trace
1366 * @return the number of packets read, null packets indicate messages. Check packet->error before
1367 * assuming a packet is valid.
1368 */
1369static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
1370{
1371        size_t ret;
1372        size_t i;
1373        assert(nb_packets);
1374
1375        for (i = 0; i < nb_packets; i++) {
1376                // Cleanup the packet passed back
1377                if (packets[i])
1378                        trace_fin_packet(packets[i]);
1379        }
1380
1381        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1382                if (!packets[0])
1383                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **)packets, 1, 1);
1384                packets[0]->error = trace_pread_packet_wrapper(libtrace, t, *packets);
1385                ret = 1;
1386        } else if (trace_has_dedicated_hasher(libtrace)) {
1387                ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
1388        } else if (!trace_has_dedicated_hasher(libtrace)) {
1389                /* We don't care about which core a packet goes to */
1390                ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);
1391        } /* else {
1392                ret = trace_pread_packet_hash_locked(libtrace, packet);
1393        }*/
1394
1395        // Formats can also optionally do this internally to ensure the first
1396        // packet is always reported correctly
1397        assert(ret);
1398        assert(ret <= nb_packets);
1399        if (packets[0]->error > 0) {
1400                store_first_packet(libtrace, packets[0], t);
1401                if (libtrace->tracetime)
1402                        delay_tracetime(libtrace, packets[0], t);
1403        }
1404
1405        return ret;
1406}
1407
1408/* Starts perpkt threads
1409 * @return threads_started
1410 */
1411static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1412        int i;
1413        char name[16];
1414        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1415                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1416                ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0);
1417                snprintf(name, 16, "perpkt-%d", i);
1418                pthread_setname_np(t->tid, name);
1419        }
1420        return libtrace->perpkt_thread_count;
1421}
1422
1423/* Start an input trace in a parallel fashion, or restart a paused trace.
1424 *
1425 * NOTE: libtrace lock is held for the majority of this function
1426 *
1427 * @param libtrace the input trace to start
1428 * @param global_blob some global data you can share with the new perpkt threads
1429 * @returns 0 on success
1430 */
1431DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter)
1432{
1433        int i;
1434        char name[16];
1435        sigset_t sig_before, sig_block_all;
1436        assert(libtrace);
1437        if (trace_is_err(libtrace)) {
1438                return -1;
1439        }
1440       
1441        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
1442        if (libtrace->state != STATE_NEW) {
1443                int err = 0;
1444                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1445                if (libtrace->state != STATE_PAUSED) {
1446                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1447                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
1448                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1449                        return -1;
1450                }
1451               
1452                // Update the per_pkt function, or reuse the old one
1453                if (per_pkt)
1454                        libtrace->per_pkt = per_pkt;
1455
1456                if (reporter)
1457                        libtrace->reporter = reporter;
1458
1459                assert(libtrace_parallel);
1460                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1461                assert(libtrace->per_pkt);
1462               
1463                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1464                        fprintf(stderr, "Restarting trace pstart_input()\n");
1465                        err = libtrace->format->pstart_input(libtrace);
1466                } else {
1467                        if (libtrace->format->start_input) {
1468                                fprintf(stderr, "Restarting trace start_input()\n");
1469                                err = libtrace->format->start_input(libtrace);
1470                        }
1471                }
1472               
1473                if (err == 0) {
1474                        libtrace->started = true;
1475                        libtrace_change_state(libtrace, STATE_RUNNING, false);
1476                }
1477                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1478                return err;
1479        }
1480
1481        assert(libtrace->state == STATE_NEW);
1482        libtrace_parallel = 1;
1483
1484        // Store the user defined things against the trace
1485        libtrace->global_blob = global_blob;
1486        libtrace->per_pkt = per_pkt;
1487        libtrace->reporter = reporter;
1488
1489        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
1490        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
1491        ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0);
1492        // Grab the lock
1493        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1494
1495        // Set default buffer sizes
1496        if (libtrace->config.hasher_queue_size <= 0)
1497                libtrace->config.hasher_queue_size = 1000;
1498
1499        if (libtrace->config.perpkt_threads <= 0) {
1500                // TODO add BSD support
1501                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1502                if (libtrace->perpkt_thread_count <= 0)
1503                        // Lets just use one
1504                        libtrace->perpkt_thread_count = 1;
1505        } else {
1506                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1507        }
1508
1509        if (libtrace->config.reporter_thold <= 0)
1510                libtrace->config.reporter_thold = 100;
1511        if (libtrace->config.burst_size <= 0)
1512                libtrace->config.burst_size = 10;
1513        if (libtrace->config.packet_thread_cache_size <= 0)
1514                libtrace->config.packet_thread_cache_size = 20;
1515        if (libtrace->config.packet_cache_size <= 0)
1516                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1517
1518        if (libtrace->config.packet_cache_size <
1519                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1520                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1521
1522        libtrace->started = true; // Before we start the threads otherwise we could have issues
1523        libtrace_change_state(libtrace, STATE_RUNNING, false);
1524        /* Disable signals - Pthread signal handling */
1525
1526        sigemptyset(&sig_block_all);
1527
1528        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1529
1530        // If we are using a hasher start it
1531        // If single threaded we don't need a hasher
1532        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1533                libtrace_thread_t *t = &libtrace->hasher_thread;
1534                t->trace = libtrace;
1535                t->ret = NULL;
1536                t->type = THREAD_HASHER;
1537                t->state = THREAD_RUNNING;
1538                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1539                ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0);
1540                snprintf(name, sizeof(name), "hasher-thread");
1541                pthread_setname_np(t->tid, name);
1542        } else {
1543                libtrace->hasher_thread.type = THREAD_EMPTY;
1544        }
1545
1546        libtrace_ocache_init(&libtrace->packet_freelist,
1547                                                 (void* (*)()) trace_create_packet,
1548                                                 (void (*)(void *))trace_destroy_packet,
1549                                                 libtrace->config.packet_thread_cache_size,
1550                                                 libtrace->config.packet_cache_size * 4,
1551                                                 libtrace->config.fixed_packet_count);
1552        // Unused slidingwindow code
1553        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1554        //ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
1555
1556        // This will be applied to every new thread that starts, i.e. they will block all signals
1557        // Lets start a fixed number of reading threads
1558
1559        /* Ready some storages */
1560        libtrace->first_packets.first = 0;
1561        libtrace->first_packets.count = 0;
1562        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1563        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1564
1565
1566        /* Ready all of our perpkt threads - they are started later */
1567        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1568        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1569                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1570                t->trace = libtrace;
1571                t->ret = NULL;
1572                t->type = THREAD_PERPKT;
1573                t->state = THREAD_RUNNING;
1574                t->user_data = NULL;
1575                // t->tid DONE on create
1576                t->perpkt_num = i;
1577                if (libtrace->hasher)
1578                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
1579                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
1580                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1581                t->recorded_first = false;
1582                t->tracetime_offset_usec = 0;;
1583        }
1584
1585        int threads_started = 0;
1586        /* Setup the trace and start our threads */
1587        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1588                printf("This format has direct support for p's\n");
1589                threads_started = libtrace->format->pstart_input(libtrace);
1590        } else {
1591                if (libtrace->format->start_input) {
1592                        threads_started=libtrace->format->start_input(libtrace);
1593                }
1594        }
1595        if (threads_started == 0)
1596                threads_started = trace_start_perpkt_threads(libtrace);
1597
1598        // No combiner set, use a default to reduce the chance of this breaking
1599        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1600                libtrace->combiner = combiner_unordered;
1601
1602        if (libtrace->combiner.initialise)
1603                libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1604
1605        libtrace->reporter_thread.type = THREAD_REPORTER;
1606        libtrace->reporter_thread.state = THREAD_RUNNING;
1607        libtrace_message_queue_init(&libtrace->reporter_thread.messages, sizeof(libtrace_message_t));
1608        if (reporter) {
1609                // Got a real reporter
1610                ASSERT_RET(pthread_create(&libtrace->reporter_thread.tid, NULL, reporter_entry, (void *) libtrace), == 0);
1611        } else {
1612                // Main thread is reporter
1613                libtrace->reporter_thread.tid = pthread_self();
1614        }
1615
1616        if (libtrace->config.tick_interval > 0) {
1617                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1618                libtrace->keepalive_thread.state = THREAD_RUNNING;
1619                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1620                ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
1621        }
1622
1623        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1624                libtrace->perpkt_thread_states[i] = 0;
1625        }
1626        libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started;
1627
1628        // Revert back - Allow signals again
1629        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1630        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1631
1632        if (threads_started < 0)
1633                // Error
1634                return threads_started;
1635
1636        // TODO fix these leaks etc
1637        if (libtrace->perpkt_thread_count != threads_started)
1638                fprintf(stderr, "Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1639
1640
1641        return 0;
1642}
1643
1644/**
1645 * Pauses a trace, this should only be called by the main thread
1646 * 1. Set started = false
1647 * 2. All perpkt threads are paused waiting on a condition var
1648 * 3. Then call ppause on the underlying format if found
1649 * 4. The traces state is paused
1650 *
1651 * Once done you should be able to modify the trace setup and call pstart again
1652 * TODO handle changing thread numbers
1653 */
1654DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1655{
1656        libtrace_thread_t *t;
1657        int i;
1658        assert(libtrace);
1659       
1660        t = get_thread_table(libtrace);
1661        // Check state from within the lock if we are going to change it
1662        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1663        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1664                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1665                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1666                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1667                return -1;
1668        }
1669
1670        libtrace_change_state(libtrace, STATE_PAUSING, false);
1671        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1672
1673        // Special case handle the hasher thread case
1674        if (trace_has_dedicated_hasher(libtrace)) {
1675                if (libtrace->config.debug_state)
1676                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1677                libtrace_message_t message = {0};
1678                message.code = MESSAGE_DO_PAUSE;
1679                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1680                // Wait for it to pause
1681                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1682                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1683                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1684                }
1685                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1686                if (libtrace->config.debug_state)
1687                        fprintf(stderr, " DONE\n");
1688        }
1689
1690        if (libtrace->config.debug_state)
1691                fprintf(stderr, "Asking perpkt threads to pause ...");
1692        // Stop threads, skip this one if it's a perpkt
1693        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1694                if (&libtrace->perpkt_threads[i] != t) {
1695                        libtrace_message_t message = {0};
1696                        message.code = MESSAGE_DO_PAUSE;
1697                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1698                        if(trace_has_dedicated_hasher(libtrace)) {
1699                                // The hasher has stopped and other threads have messages waiting therefore
1700                                // If the queues are empty the other threads would have no data
1701                                // So send some message packets to simply ask the threads to check
1702                                // We are the only writer since hasher has paused
1703                                libtrace_packet_t *pkt;
1704                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1705                                pkt->error = READ_MESSAGE;
1706                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1707                        }
1708                } else {
1709                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1710                }
1711        }
1712
1713        if (t) {
1714                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1715                // We rely on the user to *not* return before starting the trace again
1716                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1717        }
1718
1719        // Wait for all threads to pause
1720        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1721        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1722                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1723        }
1724        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1725
1726        if (libtrace->config.debug_state)
1727                fprintf(stderr, " DONE\n");
1728
1729        // Deal with the reporter
1730        if (trace_has_dedicated_reporter(libtrace)) {
1731                if (libtrace->config.debug_state)
1732                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1733                libtrace_message_t message = {0};
1734                message.code = MESSAGE_DO_PAUSE;
1735                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
1736                // Wait for it to pause
1737                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1738                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1739                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1740                }
1741                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1742                if (libtrace->config.debug_state)
1743                        fprintf(stderr, " DONE\n");
1744        }
1745
1746        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1747                uint64_t tmp_stats;
1748                libtrace->dropped_packets = trace_get_dropped_packets(libtrace);
1749                libtrace->received_packets = trace_get_received_packets(libtrace);
1750                if (libtrace->format->get_filtered_packets) {
1751                        if ((tmp_stats = libtrace->format->get_filtered_packets(libtrace)) != UINT64_MAX) {
1752                                libtrace->filtered_packets += tmp_stats;
1753                        }
1754                }
1755                libtrace->started = false;
1756                if (libtrace->format->ppause_input)
1757                        libtrace->format->ppause_input(libtrace);
1758                // TODO What happens if we don't have pause input??
1759        } else {
1760                int err;
1761                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1762                err = trace_pause(libtrace);
1763                // We should handle this a bit better
1764                if (err)
1765                        return err;
1766        }
1767
1768        // Only set as paused after the pause has been called on the trace
1769        libtrace_change_state(libtrace, STATE_PAUSED, true);
1770        return 0;
1771}
1772
1773/**
1774 * Stop trace finish prematurely as though it meet an EOF
1775 * This should only be called by the main thread
1776 * 1. Calls ppause
1777 * 2. Sends a message asking for threads to finish
1778 * 3. Releases threads which will pause
1779 */
1780DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1781{
1782        int i, err;
1783        libtrace_message_t message = {0};
1784        assert(libtrace);
1785
1786        // Ensure all threads have paused and the underlying trace format has
1787        // been closed and all packets associated are cleaned up
1788        // Pause will do any state checks for us
1789        err = trace_ppause(libtrace);
1790        if (err)
1791                return err;
1792
1793        // Now send a message asking the threads to stop
1794        // This will be retrieved before trying to read another packet
1795       
1796        message.code = MESSAGE_DO_STOP;
1797        trace_send_message_to_perpkts(libtrace, &message);
1798        if (trace_has_dedicated_hasher(libtrace))
1799                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1800       
1801        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1802                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1803        }
1804
1805        // Now release the threads and let them stop
1806        libtrace_change_state(libtrace, STATE_FINSHED, true);
1807        return 0;
1808}
1809
1810/**
1811 * Set the hasher type along with a selected function, if hardware supports
1812 * that generic type of hashing it will be used otherwise the supplied
1813 * hasher function will be used and passed data when called.
1814 *
1815 * @return 0 if successful otherwise -1 on error
1816 */
1817DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1818        int ret = -1;
1819        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1820                return -1;
1821        }
1822
1823        // Save the requirements
1824        trace->hasher_type = type;
1825        if (hasher) {
1826                trace->hasher = hasher;
1827                trace->hasher_data = data;
1828        } else {
1829                trace->hasher = NULL;
1830                // TODO consider how to handle freeing this
1831                trace->hasher_data = NULL;
1832        }
1833
1834        // Try push this to hardware - NOTE hardware could do custom if
1835        // there is a more efficient way to apply it, in this case
1836        // it will simply grab the function out of libtrace_t
1837        if (trace->format->pconfig_input)
1838                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1839
1840        if (ret == -1) {
1841                // We have to deal with this ourself
1842                // This most likely means single threaded reading of the trace
1843                if (!hasher) {
1844                        switch (type)
1845                        {
1846                                case HASHER_CUSTOM:
1847                                case HASHER_BALANCE:
1848                                        return 0;
1849                                case HASHER_BIDIRECTIONAL:
1850                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1851                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1852                                        toeplitz_init_config(trace->hasher_data, 1);
1853                                        return 0;
1854                                case HASHER_UNIDIRECTIONAL:
1855                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1856                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1857                                        toeplitz_init_config(trace->hasher_data, 0);
1858                                        return 0;
1859                                case HASHER_HARDWARE:
1860                                        return -1;
1861                        }
1862                        return -1;
1863                }
1864        } else {
1865                // The hardware is dealing with this yay
1866                trace->hasher_type = HASHER_HARDWARE;
1867        }
1868
1869        return 0;
1870}
1871
1872// Waits for all threads to finish
1873DLLEXPORT void trace_join(libtrace_t *libtrace) {
1874        int i;
1875
1876        /* Firstly wait for the perpkt threads to finish, since these are
1877         * user controlled */
1878        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1879                //printf("Waiting to join with perpkt #%d\n", i);
1880                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
1881                //printf("Joined with perpkt #%d\n", i);
1882                // So we must do our best effort to empty the queue - so
1883                // the producer (or any other threads) don't block.
1884                libtrace_packet_t * packet;
1885                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
1886                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1887                        if (packet) // This could be NULL iff the perpkt finishes early
1888                                trace_destroy_packet(packet);
1889        }
1890
1891        /* Now the hasher */
1892        if (trace_has_dedicated_hasher(libtrace)) {
1893                pthread_join(libtrace->hasher_thread.tid, NULL);
1894                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
1895        }
1896
1897        // Now that everything is finished nothing can be touching our
1898        // buffers so clean them up
1899        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1900                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
1901                // if they lost timeslice before-during a write
1902                libtrace_packet_t * packet;
1903                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1904                        trace_destroy_packet(packet);
1905                if (libtrace->hasher) {
1906                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1907                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1908                }
1909                // Cannot destroy vector yet, this happens with trace_destroy
1910        }
1911        // TODO consider perpkt threads marking trace as finished before join is called
1912        libtrace_change_state(libtrace, STATE_FINSHED, true);
1913
1914        if (trace_has_dedicated_reporter(libtrace)) {
1915                pthread_join(libtrace->reporter_thread.tid, NULL);
1916                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
1917        }
1918       
1919        // Wait for the tick (keepalive) thread if it has been started
1920        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1921                libtrace_message_t msg = {0};
1922                msg.code = MESSAGE_DO_STOP;
1923                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1924                pthread_join(libtrace->keepalive_thread.tid, NULL);
1925        }
1926       
1927        libtrace_change_state(libtrace, STATE_JOINED, true);
1928        print_memory_stats();
1929}
1930
1931DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1932{
1933        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1934        assert(t);
1935        return libtrace_message_queue_count(&t->messages);
1936}
1937
1938DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1939{
1940        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1941        assert(t);
1942        return libtrace_message_queue_get(&t->messages, message);
1943}
1944
1945DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1946{
1947        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1948        assert(t);
1949        return libtrace_message_queue_try_get(&t->messages, message);
1950}
1951
1952/**
1953 * Return backlog indicator
1954 */
1955DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
1956{
1957        libtrace_message_t message = {0};
1958        message.code = MESSAGE_POST_REPORTER;
1959        message.sender = get_thread_descriptor(libtrace);
1960        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
1961}
1962
1963/**
1964 * Return backlog indicator
1965 */
1966DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
1967{
1968        //printf("Sending message code=%d to reporter\n", message->code);
1969        message->sender = get_thread_descriptor(libtrace);
1970        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
1971}
1972
1973/**
1974 *
1975 */
1976DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1977{
1978        //printf("Sending message code=%d to reporter\n", message->code);
1979        message->sender = get_thread_descriptor(libtrace);
1980        return libtrace_message_queue_put(&t->messages, message);
1981}
1982
1983DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1984{
1985        int i;
1986        message->sender = get_thread_descriptor(libtrace);
1987        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1988                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1989        }
1990        //printf("Sending message code=%d to reporter\n", message->code);
1991        return 0;
1992}
1993
1994DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1995        result->key = key;
1996}
1997DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1998        return result->key;
1999}
2000DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
2001        result->value = value;
2002}
2003DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
2004        return result->value;
2005}
2006DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
2007        result->key = key;
2008        result->value = value;
2009}
2010DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
2011        free(*result);
2012        result = NULL;
2013        // TODO automatically back with a free list!!
2014}
2015
2016DLLEXPORT void * trace_get_global(libtrace_t *trace)
2017{
2018        return trace->global_blob;
2019}
2020
2021DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
2022{
2023        if (trace->global_blob && trace->global_blob != data) {
2024                void * ret = trace->global_blob;
2025                trace->global_blob = data;
2026                return ret;
2027        } else {
2028                trace->global_blob = data;
2029                return NULL;
2030        }
2031}
2032
2033DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2034{
2035        return t->user_data;
2036}
2037
2038DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2039{
2040        if(t->user_data && t->user_data != data) {
2041                void *ret = t->user_data;
2042                t->user_data = data;
2043                return ret;
2044        } else {
2045                t->user_data = data;
2046                return NULL;
2047        }
2048}
2049
2050/**
2051 * Publishes a result to the reduce queue
2052 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2053 */
2054DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
2055        libtrace_result_t res;
2056        res.type = type;
2057        res.key = key;
2058        res.value = value;
2059        assert(libtrace->combiner.publish);
2060        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2061        return;
2062}
2063
2064DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2065        return packet->order;
2066}
2067
2068DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2069        return packet->hash;
2070}
2071
2072DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2073        packet->order = order;
2074}
2075
2076DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2077        packet->hash = hash;
2078}
2079
2080DLLEXPORT int trace_finished(libtrace_t * libtrace) {
2081        // TODO I don't like using this so much, we could use state!!!
2082        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
2083}
2084
2085DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2086{
2087        UNUSED int ret = -1;
2088        switch (option) {
2089                case TRACE_OPTION_TICK_INTERVAL:
2090                        libtrace->config.tick_interval = *((int *) value);
2091                        return 1;
2092                case TRACE_OPTION_SET_HASHER:
2093                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2094                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2095                        libtrace->config.perpkt_threads = *((int *) value);
2096                        return 1;
2097                case TRACE_DROP_OUT_OF_ORDER:
2098                        if (*((int *) value))
2099                                libtrace->reporter_flags |= REDUCE_DROP_OOO;
2100                        else
2101                                libtrace->reporter_flags &= ~REDUCE_DROP_OOO;
2102                        return 1;
2103                case TRACE_OPTION_SEQUENTIAL:
2104                        libtrace->combiner = combiner_ordered;
2105                        if (*((int *) value))
2106                                libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
2107                        else
2108                                libtrace->reporter_flags &= ~REDUCE_SEQUENTIAL;
2109                        return 1;
2110                case TRACE_OPTION_ORDERED:
2111                        libtrace->combiner = combiner_ordered;
2112                        if (*((int *) value))
2113                                libtrace->reporter_flags |= REDUCE_ORDERED;
2114                        else
2115                                libtrace->reporter_flags &= ~REDUCE_ORDERED;
2116                        return 1;
2117                case TRACE_OPTION_TRACETIME:
2118                        if(*((int *) value))
2119                                libtrace->tracetime = 1;
2120                        else
2121                                libtrace->tracetime = 0;
2122                        return 0;
2123                case TRACE_OPTION_SET_CONFIG:
2124                        libtrace->config = *((struct user_configuration *) value);
2125                case TRACE_OPTION_GET_CONFIG:
2126                        *((struct user_configuration *) value) = libtrace->config;
2127        }
2128        return 0;
2129}
2130
2131static bool config_bool_parse(char *value, size_t nvalue) {
2132        if (strncmp(value, "true", nvalue) == 0)
2133                return true;
2134        else if (strncmp(value, "false", nvalue) == 0)
2135                return false;
2136        else
2137                return strtoll(value, NULL, 10) != 0;
2138}
2139
2140static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2141        assert(key);
2142        assert(value);
2143        assert(uc);
2144        if (strncmp(key, "packet_cache_size", nkey) == 0
2145                || strncmp(key, "pcs", nkey) == 0) {
2146                uc->packet_cache_size = strtoll(value, NULL, 10);
2147        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2148                           || strncmp(key, "ptcs", nkey) == 0) {
2149                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2150        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2151                  || strncmp(key, "fpc", nkey) == 0) {
2152                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2153        } else if (strncmp(key, "burst_size", nkey) == 0
2154                   || strncmp(key, "bs", nkey) == 0) {
2155                uc->burst_size = strtoll(value, NULL, 10);
2156        } else if (strncmp(key, "tick_interval", nkey) == 0
2157                   || strncmp(key, "ti", nkey) == 0) {
2158                uc->tick_interval = strtoll(value, NULL, 10);
2159        } else if (strncmp(key, "tick_count", nkey) == 0
2160                   || strncmp(key, "tc", nkey) == 0) {
2161                uc->tick_count = strtoll(value, NULL, 10);
2162        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2163                   || strncmp(key, "pt", nkey) == 0) {
2164                uc->perpkt_threads = strtoll(value, NULL, 10);
2165        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2166                   || strncmp(key, "hqs", nkey) == 0) {
2167                uc->hasher_queue_size = strtoll(value, NULL, 10);
2168        } else if (strncmp(key, "hasher_polling", nkey) == 0
2169                   || strncmp(key, "hp", nkey) == 0) {
2170                uc->hasher_polling = config_bool_parse(value, nvalue);
2171        } else if (strncmp(key, "reporter_polling", nkey) == 0
2172                   || strncmp(key, "rp", nkey) == 0) {
2173                uc->reporter_polling = config_bool_parse(value, nvalue);
2174        } else if (strncmp(key, "reporter_thold", nkey) == 0
2175                   || strncmp(key, "rt", nkey) == 0) {
2176                uc->reporter_thold = strtoll(value, NULL, 10);
2177        } else if (strncmp(key, "debug_state", nkey) == 0
2178                   || strncmp(key, "ds", nkey) == 0) {
2179                uc->debug_state = config_bool_parse(value, nvalue);
2180        } else {
2181                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2182        }
2183}
2184
2185DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2186        char *pch;
2187        char key[100];
2188        char value[100];
2189        assert(str);
2190        assert(uc);
2191        printf ("Splitting string \"%s\" into tokens:\n",str);
2192        pch = strtok (str," ,.-");
2193        while (pch != NULL)
2194        {
2195                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2196                        config_string(uc, key, sizeof(key), value, sizeof(value));
2197                } else {
2198                        fprintf(stderr, "Error parsing %s\n", pch);
2199                }
2200                pch = strtok (NULL," ,.-");
2201        }
2202}
2203
2204DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2205        char line[1024];
2206        while (fgets(line, sizeof(line), file) != NULL)
2207        {
2208                parse_user_config(uc, line);
2209        }
2210}
2211
2212DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
2213        libtrace_packet_t* result;
2214        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
2215        assert(result);
2216        swap_packets(result, packet); // Move the current packet into our copy
2217        return result;
2218}
2219
2220DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2221        // Try write back the packet
2222        assert(packet);
2223        // Always release any resources this might be holding such as a slot in a ringbuffer
2224        trace_fin_packet(packet);
2225        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2226}
2227
2228DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2229        if (libtrace->format)
2230                return &libtrace->format->info;
2231        else
2232                return NULL;
2233}
Note: See TracBrowser for help on using the repository browser.