source: lib/trace_parallel.c @ 10c47a0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 10c47a0 was 10c47a0, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fixes DAG DUCK reporting for parallel libtrace.
In parallel libtrace DUCK is only ever sent to the first thread.

It is now up each formats pread_packet to tag the trace along with
the error (AKA bytes read) to each packet.

Change logic in parallel libtrace to alwaus prefer pread over read if
it exists.

Fix some unresolved conflict in DPDK that I missed, that was ifdef'd out.

  • Property mode set to 100644
File size: 79.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97#include "combiners.h"
98
99#include <pthread.h>
100#include <signal.h>
101#include <unistd.h>
102
103static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
104extern int libtrace_parallel;
105
106struct multithreading_stats {
107        uint64_t full_queue_hits;
108        uint64_t wait_for_fill_complete_hits;
109} contention_stats[1024];
110
111struct mem_stats {
112        struct memfail {
113           uint64_t cache_hit;
114           uint64_t ring_hit;
115           uint64_t miss;
116           uint64_t recycled;
117        } readbulk, read, write, writebulk;
118};
119
120// Grrr gcc wants this spelt out
121__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
122
123static void print_memory_stats() {
124#if 0
125        char t_name[50];
126        uint64_t total;
127        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
128
129        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
130
131        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
132        if (total) {
133                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
134                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
135                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
136                                total, (double) mem_hits.read.miss / (double) total * 100.0);
137        }
138
139        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
140        if (total) {
141                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
142                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
143
144
145                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
146                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
147        }
148
149        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
150        if (total) {
151                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
152                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
153
154                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
155                                total, (double) mem_hits.write.miss / (double) total * 100.0);
156        }
157
158        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
159        if (total) {
160                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
161                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
162
163                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
164                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
165        }
166#endif
167}
168
169/**
170 * This can be used once the hasher thread has been started and internally after
171 * verfiy_configuration.
172 *
173 * @return true if the trace has dedicated hasher thread otherwise false.
174 */
175static inline bool trace_has_dedicated_hasher(libtrace_t * libtrace)
176{
177        return libtrace->hasher_thread.type == THREAD_HASHER;
178}
179
180/**
181 * True if the trace has dedicated hasher thread otherwise false,
182 * to be used after the trace is running
183 */
184static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
185{
186        assert(libtrace->state != STATE_NEW);
187        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
188}
189
190/**
191 * When running the number of perpkt threads in use.
192 * TODO what if the trace is not running yet, or has finished??
193 *
194 * @brief libtrace_perpkt_thread_nb
195 * @param t The trace
196 * @return
197 */
198DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
199        return t->perpkt_thread_count;
200}
201
202/**
203 * Changes a thread's state and broadcasts the condition variable. This
204 * should always be done when the lock is held.
205 *
206 * Additionally for perpkt threads the state counts are updated.
207 *
208 * @param trace A pointer to the trace
209 * @param t A pointer to the thread to modify
210 * @param new_state The new state of the thread
211 * @param need_lock Set to true if libtrace_lock is not held, otherwise
212 *        false in the case the lock is currently held by this thread.
213 */
214static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
215        const enum thread_states new_state, const bool need_lock)
216{
217        enum thread_states prev_state;
218        if (need_lock)
219                pthread_mutex_lock(&trace->libtrace_lock);
220        prev_state = t->state;
221        t->state = new_state;
222        if (t->type == THREAD_PERPKT) {
223                --trace->perpkt_thread_states[prev_state];
224                ++trace->perpkt_thread_states[new_state];
225        }
226
227        if (trace->config.debug_state)
228                fprintf(stderr, "Thread %d state changed from %d to %d\n",
229                        (int) t->tid, prev_state, t->state);
230
231        pthread_cond_broadcast(&trace->perpkt_cond);
232        if (need_lock)
233                pthread_mutex_unlock(&trace->libtrace_lock);
234}
235
236/**
237 * Changes the overall traces state and signals the condition.
238 *
239 * @param trace A pointer to the trace
240 * @param new_state The new state of the trace
241 * @param need_lock Set to true if libtrace_lock is not held, otherwise
242 *        false in the case the lock is currently held by this thread.
243 */
244static inline void libtrace_change_state(libtrace_t *trace,
245        const enum trace_state new_state, const bool need_lock)
246{
247        UNUSED enum trace_state prev_state;
248        if (need_lock)
249                pthread_mutex_lock(&trace->libtrace_lock);
250        prev_state = trace->state;
251        trace->state = new_state;
252
253        if (trace->config.debug_state)
254                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
255                        trace->uridata, get_trace_state_name(prev_state),
256                        get_trace_state_name(trace->state));
257
258        pthread_cond_broadcast(&trace->perpkt_cond);
259        if (need_lock)
260                pthread_mutex_unlock(&trace->libtrace_lock);
261}
262
263/**
264 * This is valid once a trace is initialised
265 *
266 * @return True if the format supports parallel threads.
267 */
268static inline bool trace_supports_parallel(libtrace_t *trace)
269{
270        assert(trace);
271        assert(trace->format);
272        if (trace->format->pstart_input)
273                return true;
274        else
275                return false;
276}
277
278DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
279        int i;
280        struct multithreading_stats totals = {0};
281        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
282                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
283                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
284                totals.full_queue_hits += contention_stats[i].full_queue_hits;
285                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
286                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
287        }
288        fprintf(stderr, "\nTotals for perpkt threads\n");
289        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
290        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
291
292        return;
293}
294
295void libtrace_zero_thread(libtrace_thread_t * t) {
296        t->accepted_packets = 0;
297        t->filtered_packets = 0;
298        t->recorded_first = false;
299        t->tracetime_offset_usec = 0;
300        t->user_data = 0;
301        t->format_data = 0;
302        libtrace_zero_ringbuffer(&t->rbuffer);
303        t->trace = NULL;
304        t->ret = NULL;
305        t->type = THREAD_EMPTY;
306        t->perpkt_num = -1;
307}
308
309// Ints are aligned int is atomic so safe to read and write at same time
310// However write must be locked, read doesn't (We never try read before written to table)
311libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
312        int i = 0;
313        pthread_t tid = pthread_self();
314
315        for (;i<libtrace->perpkt_thread_count ;++i) {
316                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
317                        return &libtrace->perpkt_threads[i];
318        }
319        return NULL;
320}
321
322int get_thread_table_num(libtrace_t *libtrace) {
323        int i = 0;
324        pthread_t tid = pthread_self();
325        for (;i<libtrace->perpkt_thread_count; ++i) {
326                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
327                        return i;
328        }
329        return -1;
330}
331
332static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
333        libtrace_thread_t *ret;
334        if (!(ret = get_thread_table(libtrace))) {
335                pthread_t tid = pthread_self();
336                // Check if we are reporter or something else
337                if (pthread_equal(tid, libtrace->reporter_thread.tid))
338                        ret = &libtrace->reporter_thread;
339                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
340                        ret = &libtrace->hasher_thread;
341                else
342                        ret = NULL;
343        }
344        return ret;
345}
346
347/** Makes a packet safe, a packet may become invaild after a
348 * pause (or stop/destroy) of a trace. This copies a packet
349 * in such a way that it will be able to survive a pause.
350 *
351 * However this will not allow the packet to be used after
352 * the format is destroyed. Or while the trace is still paused.
353 */
354DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
355        // Duplicate the packet in standard malloc'd memory and free the
356        // original, This is a 1:1 exchange so is ocache count remains unchanged.
357        if (pkt->buf_control != TRACE_CTRL_PACKET) {
358                libtrace_packet_t *dup;
359                dup = trace_copy_packet(pkt);
360                /* Release the external buffer */
361                trace_fin_packet(pkt);
362                /* Copy the duplicated packet over the existing */
363                memcpy(pkt, dup, sizeof(libtrace_packet_t));
364        }
365}
366
367/**
368 * Makes a libtrace_result_t safe, used when pausing a trace.
369 * This will call libtrace_make_packet_safe if the result is
370 * a packet.
371 */
372DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
373        if (res->type == RESULT_PACKET) {
374                libtrace_make_packet_safe(res->value.pkt);
375        }
376}
377
378/**
379 * Holds threads in a paused state, until released by broadcasting
380 * the condition mutex.
381 */
382static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
383        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
384        thread_change_state(trace, t, THREAD_PAUSED, false);
385        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
386                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
387        }
388        thread_change_state(trace, t, THREAD_RUNNING, false);
389        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
390}
391
392/**
393 * Sends a packet to the user, expects either a valid packet or a TICK packet.
394 *
395 * Note READ_MESSAGE will only be returned if tracetime is true.
396 *
397 * @brief dispatch_packet
398 * @param trace
399 * @param t
400 * @param packet A pointer to the packet storage, which may be set to null upon
401 *               return, or a packet to be finished.
402 * @return 0 is successful, otherwise if playing back in tracetime
403 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
404 */
405static inline int dispatch_packet(libtrace_t *trace,
406                                                 libtrace_thread_t *t,
407                                                 libtrace_packet_t **packet,
408                                                 bool tracetime) {
409        if ((*packet)->error > 0) {
410                if (tracetime) {
411                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
412                                return READ_MESSAGE;
413                }
414                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
415                trace_fin_packet(*packet);
416        } else {
417                libtrace_message_t message;
418                assert((*packet)->error == READ_TICK);
419                message.code = MESSAGE_TICK;
420                message.additional.uint64 = trace_packet_get_order(*packet);
421                message.sender = t;
422                (*trace->per_pkt)(trace, NULL, &message, t);
423        }
424        return 0;
425}
426
427/**
428 * Pauses a per packet thread, messages will not be processed when the thread
429 * is paused.
430 *
431 * This process involves reading packets if a hasher thread is used. As such
432 * this function can fail to pause due to errors when reading in which case
433 * the thread should be stopped instead.
434 *
435 *
436 * @brief trace_perpkt_thread_pause
437 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
438 */
439static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
440                                     libtrace_packet_t *packets[],
441                                     int *nb_packets, int *empty, int *offset) {
442        libtrace_message_t message = {0};
443        libtrace_packet_t * packet = NULL;
444
445        /* Let the user thread know we are going to pause */
446        message.code = MESSAGE_PAUSING;
447        message.sender = t;
448        (*trace->per_pkt)(trace, NULL, &message, t);
449
450        /* Send through any remaining packets (or messages) without delay */
451
452        /* First send those packets already read, as fast as possible
453         * This should never fail or check for messages etc. */
454        for (;*offset < *nb_packets; ++*offset) {
455                ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0);
456                /* Move full slots to front as we go */
457                if (packets[*offset]) {
458                        if (*empty != *offset) {
459                                packets[*empty] = packets[*offset];
460                                packets[*offset] = NULL;
461                        }
462                        ++*empty;
463                }
464        }
465
466        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
467        /* If a hasher thread is running, empty input queues so we don't lose data */
468        if (trace_has_dedicated_hasher(trace)) {
469                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
470                // The hasher has stopped by this point, so the queue shouldn't be filling
471                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
472                        int ret = trace->pread(trace, t, &packet, 1);
473                        if (ret == 1) {
474                                if (packet->error > 0) {
475                                        store_first_packet(trace, packet, t);
476                                }
477                                ASSERT_RET(dispatch_packet(trace, t, &packet, 1), == 0);
478                                if (packet == NULL)
479                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
480                        } else if (ret != READ_MESSAGE) {
481                                /* Ignore messages we pick these up next loop */
482                                assert (ret == READ_EOF || ret == READ_ERROR);
483                                /* Verify no packets are remaining */
484                                /* TODO refactor this sanity check out!! */
485                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
486                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
487                                        // No packets after this should have any data in them
488                                        assert(packet->error <= 0);
489                                }
490                                fprintf(stderr, "PREAD_FAILED %d\n", ret);
491                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
492                                return -1;
493                        }
494                }
495        }
496        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
497
498        /* Now we do the actual pause, this returns when we resumed */
499        trace_thread_pause(trace, t);
500        message.code = MESSAGE_RESUMING;
501        (*trace->per_pkt)(trace, NULL, &message, t);
502        return 1;
503}
504
505/**
506 * The is the entry point for our packet processing threads.
507 */
508static void* perpkt_threads_entry(void *data) {
509        libtrace_t *trace = (libtrace_t *)data;
510        libtrace_thread_t *t;
511        libtrace_message_t message = {0};
512        libtrace_packet_t *packets[trace->config.burst_size];
513        size_t i;
514        //int ret;
515        /* The current reading position into the packets */
516        int offset = 0;
517        /* The number of packets last read */
518        int nb_packets = 0;
519        /* The offset to the first NULL packet upto offset */
520        int empty = 0;
521
522        /* Wait until trace_pstart has been completed */
523        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
524        t = get_thread_table(trace);
525        assert(t);
526        if (trace->state == STATE_ERROR) {
527                thread_change_state(trace, t, THREAD_FINISHED, false);
528                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
529                pthread_exit(NULL);
530        }
531        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
532        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
533
534        if (trace->format->pregister_thread) {
535                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
536        }
537
538        /* Fill our buffer with empty packets */
539        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
540        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
541                              trace->config.burst_size,
542                              trace->config.burst_size);
543
544        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
545        // Send a message to say we've started
546
547        // Let the per_packet function know we have started
548        message.code = MESSAGE_STARTING;
549        message.sender = t;
550        (*trace->per_pkt)(trace, NULL, &message, t);
551        message.code = MESSAGE_RESUMING;
552        (*trace->per_pkt)(trace, NULL, &message, t);
553
554
555        for (;;) {
556
557                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
558                        int ret;
559                        switch (message.code) {
560                                case MESSAGE_DO_PAUSE: // This is internal
561                                        ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset);
562                                        if (ret == READ_EOF) {
563                                                fprintf(stderr, "PAUSE stop eof!!\n");
564                                                goto eof;
565                                        } else if (ret == READ_ERROR) {
566                                                fprintf(stderr, "PAUSE stop error!!\n");
567                                                goto error;
568                                        }
569                                        assert(ret == 1);
570                                        continue;
571                                case MESSAGE_DO_STOP: // This is internal
572                                        fprintf(stderr, "DO_STOP stop!!\n");
573                                        goto eof;
574                        }
575                        (*trace->per_pkt)(trace, NULL, &message, t);
576                        /* Continue and the empty messages out before packets */
577                        continue;
578                }
579
580
581                /* Do we need to read a new set of packets MOST LIKELY we do */
582                if (offset == nb_packets) {
583                        /* Refill the packet buffer */
584                        if (empty != nb_packets) {
585                                // Refill the empty packets
586                                libtrace_ocache_alloc(&trace->packet_freelist,
587                                                      (void **) &packets[empty],
588                                                      nb_packets - empty,
589                                                      nb_packets - empty);
590                        }
591                        if (!trace->pread) {
592                                assert(packets[0]);
593                                nb_packets = trace_read_packet(trace, packets[0]);
594                                packets[0]->error = nb_packets;
595                                if (nb_packets > 0)
596                                        nb_packets = 1;
597                        } else {
598                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
599                        }
600                        offset = 0;
601                        empty = 0;
602                }
603
604                /* Handle error/message cases */
605                if (nb_packets > 0) {
606                        /* Store the first packet */
607                        if (packets[0]->error > 0) {
608                                store_first_packet(trace, packets[0], t);
609                        }
610                        for (;offset < nb_packets; ++offset) {
611                                int ret;
612                                ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime);
613                                if (ret == 0) {
614                                        /* Move full slots to front as we go */
615                                        if (packets[offset]) {
616                                                if (empty != offset) {
617                                                        packets[empty] = packets[offset];
618                                                        packets[offset] = NULL;
619                                                }
620                                                ++empty;
621                                        }
622                                } else {
623                                        assert(ret == READ_MESSAGE);
624                                        /* Loop around and process the message, note */
625                                        continue;
626                                }
627                        }
628                } else {
629                        switch (nb_packets) {
630                        case READ_EOF:
631                                fprintf(stderr, "EOF stop %d!!\n", nb_packets);
632                                goto eof;
633                        case READ_ERROR:
634                                fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
635                                goto error;
636                        case READ_MESSAGE:
637                                nb_packets = 0;
638                                continue;
639                        default:
640                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
641                                goto error;
642                        }
643                }
644
645        }
646
647error:
648        fprintf(stderr, "An error occured in trace\n");
649        message.code = MESSAGE_DO_STOP;
650        message.sender = t;
651        message.additional.uint64 = 0;
652        trace_send_message_to_perpkts(trace, &message);
653eof:
654        fprintf(stderr, "An eof occured in trace\n");
655        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
656
657        // Let the per_packet function know we have stopped
658        message.code = MESSAGE_PAUSING;
659        message.sender = t;
660        (*trace->per_pkt)(trace, NULL, &message, t);
661        message.code = MESSAGE_STOPPING;
662        message.additional.uint64 = 0;
663        (*trace->per_pkt)(trace, NULL, &message, t);
664
665        // Free any remaining packets
666        for (i = 0; i < trace->config.burst_size; i++) {
667                if (packets[i]) {
668                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
669                        packets[i] = NULL;
670                }
671        }
672
673
674        thread_change_state(trace, t, THREAD_FINISHED, true);
675
676        // Notify only after we've defiantly set the state to finished
677        message.code = MESSAGE_PERPKT_ENDED;
678        message.additional.uint64 = 0;
679        trace_send_message_to_reporter(trace, &message);
680
681        // Release all ocache memory before unregistering with the format
682        // because this might(it does in DPDK) unlink the formats mempool
683        // causing destroy/finish packet to fail.
684        libtrace_ocache_unregister_thread(&trace->packet_freelist);
685        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
686        if (trace->format->punregister_thread) {
687                trace->format->punregister_thread(trace, t);
688        }
689        print_memory_stats();
690
691        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
692
693        pthread_exit(NULL);
694};
695
696/**
697 * The start point for our single threaded hasher thread, this will read
698 * and hash a packet from a data source and queue it against the correct
699 * core to process it.
700 */
701static void* hasher_entry(void *data) {
702        libtrace_t *trace = (libtrace_t *)data;
703        libtrace_thread_t * t;
704        int i;
705        libtrace_packet_t * packet;
706        libtrace_message_t message = {0};
707        int pkt_skipped = 0;
708
709        assert(trace_has_dedicated_hasher(trace));
710        /* Wait until all threads are started and objects are initialised (ring buffers) */
711        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
712        t = &trace->hasher_thread;
713        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
714        if (trace->state == STATE_ERROR) {
715                thread_change_state(trace, t, THREAD_FINISHED, false);
716                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
717                pthread_exit(NULL);
718        }
719
720        printf("Hasher Thread started\n");
721        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
722
723        if (trace->format->pregister_thread) {
724                trace->format->pregister_thread(trace, t, true);
725        }
726
727        /* Read all packets in then hash and queue against the correct thread */
728        while (1) {
729                int thread;
730                if (!pkt_skipped)
731                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
732                assert(packet);
733
734                if (libtrace_halt) // Signal to die has been sent - TODO
735                        break;
736
737                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
738                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
739                        switch(message.code) {
740                                case MESSAGE_DO_PAUSE:
741                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
742                                        thread_change_state(trace, t, THREAD_PAUSED, false);
743                                        pthread_cond_broadcast(&trace->perpkt_cond);
744                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
745                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
746                                        }
747                                        thread_change_state(trace, t, THREAD_RUNNING, false);
748                                        pthread_cond_broadcast(&trace->perpkt_cond);
749                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
750                                        break;
751                                case MESSAGE_DO_STOP:
752                                        // Stop called after pause
753                                        assert(trace->started == false);
754                                        assert(trace->state == STATE_FINSHED);
755                                        break;
756                                default:
757                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
758                        }
759                        pkt_skipped = 1;
760                        continue;
761                }
762
763                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
764                        break; /* We are EOF or error'd either way we stop  */
765                }
766
767                /* We are guaranteed to have a hash function i.e. != NULL */
768                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
769                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
770                /* Blocking write to the correct queue - I'm the only writer */
771                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
772                        uint64_t order = trace_packet_get_order(packet);
773                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
774                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
775                                // Write ticks to everyone else
776                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
777                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
778                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
779                                for (i = 0; i < trace->perpkt_thread_count; i++) {
780                                        pkts[i]->error = READ_TICK;
781                                        trace_packet_set_order(pkts[i], order);
782                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
783                                }
784                        }
785                        pkt_skipped = 0;
786                } else {
787                        assert(!"Dropping a packet!!");
788                        pkt_skipped = 1; // Reuse that packet no one read it
789                }
790        }
791
792        /* Broadcast our last failed read to all threads */
793        for (i = 0; i < trace->perpkt_thread_count; i++) {
794                libtrace_packet_t * bcast;
795                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
796                if (i == trace->perpkt_thread_count - 1) {
797                        bcast = packet;
798                } else {
799                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
800                        bcast->error = packet->error;
801                }
802                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
803                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
804                        // Unlock early otherwise we could deadlock
805                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
806                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
807                } else {
808                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
809                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
810                }
811        }
812
813        // We don't need to free the packet
814        thread_change_state(trace, t, THREAD_FINISHED, true);
815
816        // Notify only after we've defiantly set the state to finished
817        message.code = MESSAGE_PERPKT_ENDED;
818        message.additional.uint64 = 0;
819        trace_send_message_to_reporter(trace, &message);
820        libtrace_ocache_unregister_thread(&trace->packet_freelist);
821        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
822        if (trace->format->punregister_thread) {
823                trace->format->punregister_thread(trace, t);
824        }
825        print_memory_stats();
826        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
827
828        // TODO remove from TTABLE t sometime
829        pthread_exit(NULL);
830};
831
832/**
833 * Moves src into dest(Complete copy) and copies the memory buffer and
834 * its flags from dest into src ready for reuse without needing extra mallocs.
835 */
836static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
837        // Save the passed in buffer status
838        assert(dest->trace == NULL); // Must be a empty packet
839        void * temp_buf = dest->buffer;
840        buf_control_t temp_buf_control = dest->buf_control;
841        // Completely copy StoredPacket into packet
842        memcpy(dest, src, sizeof(libtrace_packet_t));
843        // Set the buffer settings on the returned packet
844        src->buffer = temp_buf;
845        src->buf_control = temp_buf_control;
846        src->trace = NULL;
847}
848
849/* Our simplest case when a thread becomes ready it can obtain an exclusive
850 * lock to read packets from the underlying trace.
851 */
852static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
853                                                    libtrace_thread_t *t,
854                                                    libtrace_packet_t *packets[],
855                                                    size_t nb_packets) {
856        size_t i = 0;
857        //bool tick_hit = false;
858
859        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
860        /* Read nb_packets */
861        for (i = 0; i < nb_packets; ++i) {
862                packets[i]->error = trace_read_packet(libtrace, packets[i]);
863
864                if (packets[i]->error <= 0) {
865                        /* We'll catch this next time if we have already got packets */
866                        if ( i==0 ) {
867                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
868                                return packets[i]->error;
869                        } else {
870                                break;
871                        }
872                }
873                /*
874                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
875                        tick_hit = true;
876                }*/
877        }
878        // Doing this inside the lock ensures the first packet is always
879        // recorded first
880        if (packets[0]->error > 0) {
881                store_first_packet(libtrace, packets[0], t);
882        }
883        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
884        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
885        if (tick_hit) {
886                libtrace_message_t tick;
887                tick.additional.uint64 = trace_packet_get_order(packets[i]);
888                tick.code = MESSAGE_TICK;
889                trace_send_message_to_perpkts(libtrace, &tick);
890        } */
891        return i;
892}
893
894/**
895 * For the case that we have a dedicated hasher thread
896 * 1. We read a packet from our buffer
897 * 2. Move that into the packet provided (packet)
898 */
899inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
900                                                      libtrace_thread_t *t,
901                                                      libtrace_packet_t *packets[],
902                                                      size_t nb_packets) {
903        size_t i;
904
905        /* We store the last error message here */
906        if (t->format_data) {
907                fprintf(stderr, "Hit me, ohh yeah got error %d\n",
908                        ((libtrace_packet_t *)t->format_data)->error);
909                return ((libtrace_packet_t *)t->format_data)->error;
910        }
911
912        // Always grab at least one
913        if (packets[0]) // Recycle the old get the new
914                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
915        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
916
917        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
918                fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);
919                return packets[0]->error;
920        }
921
922        for (i = 1; i < nb_packets; i++) {
923                if (packets[i]) // Recycle the old get the new
924                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
925                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
926                        packets[i] = NULL;
927                        break;
928                }
929
930                /* We will return an error or EOF the next time around */
931                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
932                        /* The message case will be checked automatically -
933                           However other cases like EOF and error will only be
934                           sent once*/
935                        if (packets[i]->error != READ_MESSAGE) {
936                                assert(t->format_data == NULL);
937                                t->format_data = packets[i];
938                                fprintf(stderr, "Hit me, ohh yeah set error %d\n",
939                                        ((libtrace_packet_t *)t->format_data)->error);
940                        }
941                        break;
942                }
943        }
944
945        return i;
946}
947
948/**
949 * Tries to read from our queue and returns 1 if a packet was retrieved
950 */
951static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
952{
953        libtrace_packet_t* retrived_packet;
954
955        /* Lets see if we have one waiting */
956        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
957                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
958                assert(retrived_packet);
959
960                if (*packet) // Recycle the old get the new
961                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
962                *packet = retrived_packet;
963                *ret = (*packet)->error;
964                return 1;
965        }
966        return 0;
967}
968
969/**
970 * Allows us to ensure all threads are finished writing to our threads ring_buffer
971 * before returning EOF/error.
972 */
973inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
974{
975        /* We are waiting for the condition that another thread ends to check
976         * our queue for new data, once all threads end we can go to finished */
977        bool complete = false;
978        int ret;
979
980        do {
981                // Wait for a thread to end
982                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
983
984                // Check before
985                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
986                        complete = true;
987                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
988                        continue;
989                }
990
991                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
992
993                // Check after
994                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
995                        complete = true;
996                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
997                        continue;
998                }
999
1000                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1001
1002                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
1003                if(try_waiting_queue(libtrace, t, packet, &ret))
1004                        return ret;
1005        } while (!complete);
1006
1007        // We can only end up here once all threads complete
1008        try_waiting_queue(libtrace, t, packet, &ret);
1009
1010        return ret;
1011        // TODO rethink this logic fix bug here
1012}
1013
1014/**
1015 * Expects the libtrace_lock to not be held
1016 */
1017inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
1018{
1019        thread_change_state(libtrace, t, THREAD_FINISHING, true);
1020        return trace_handle_finishing_perpkt(libtrace, packet, t);
1021}
1022
1023/**
1024 * This case is much like the dedicated hasher, except that we will become
1025 * hasher if we don't have a a packet waiting.
1026 *
1027 * Note: This is only every used if we have are doing hashing.
1028 *
1029 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
1030 * queue sizes in total are larger than the ring size.
1031 *
1032 * 1. We read a packet from our buffer
1033 * 2. Move that into the packet provided (packet)
1034 */
1035inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
1036{
1037        int thread, ret/*, psize*/;
1038
1039        while (1) {
1040                if(try_waiting_queue(libtrace, t, packet, &ret))
1041                        return ret;
1042                // Can still block here if another thread is writing to a full queue
1043                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1044
1045                // Its impossible for our own queue to overfill, because no one can write
1046                // when we are in the lock
1047                if(try_waiting_queue(libtrace, t, packet, &ret)) {
1048                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1049                        return ret;
1050                }
1051
1052                // Another thread cannot write a packet because a queue has filled up. Is it ours?
1053                if (libtrace->perpkt_queue_full) {
1054                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
1055                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1056                        continue;
1057                }
1058
1059                if (!*packet)
1060                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
1061                assert(*packet);
1062
1063                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
1064                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
1065                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1066                        if (libtrace_halt)
1067                                return 0;
1068                        else
1069                                return (*packet)->error;
1070                }
1071
1072                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
1073                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
1074                if (thread == t->perpkt_num) {
1075                        // If it's this thread we must be in order because we checked the buffer once we got the lock
1076                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1077                        return (*packet)->error;
1078                }
1079
1080                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
1081                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
1082                                libtrace->perpkt_queue_full = true;
1083                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1084                                contention_stats[t->perpkt_num].full_queue_hits++;
1085                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1086                        }
1087                        *packet = NULL;
1088                        libtrace->perpkt_queue_full = false;
1089                } else {
1090                        /* We can get here if the user closes the thread before natural completion/or error */
1091                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
1092                }
1093                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1094        }
1095}
1096
1097/**
1098 * For the first packet of each queue we keep a copy and note the system
1099 * time it was received at.
1100 *
1101 * This is used for finding the first packet when playing back a trace
1102 * in trace time. And can be used by real time applications to print
1103 * results out every XXX seconds.
1104 */
1105void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1106{
1107        if (!t->recorded_first) {
1108                struct timeval tv;
1109                libtrace_packet_t * dup;
1110                // For what it's worth we can call these outside of the lock
1111                gettimeofday(&tv, NULL);
1112                dup = trace_copy_packet(packet);
1113                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1114                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1115                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
1116                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1117                // Now update the first
1118                libtrace->first_packets.count++;
1119                if (libtrace->first_packets.count == 1) {
1120                        // We the first entry hence also the first known packet
1121                        libtrace->first_packets.first = t->perpkt_num;
1122                } else {
1123                        // Check if we are newer than the previous 'first' packet
1124                        size_t first = libtrace->first_packets.first;
1125                        if (trace_get_seconds(dup) <
1126                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1127                                libtrace->first_packets.first = t->perpkt_num;
1128                }
1129                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1130                libtrace_message_t mesg = {0};
1131                mesg.code = MESSAGE_FIRST_PACKET;
1132                trace_send_message_to_reporter(libtrace, &mesg);
1133                t->recorded_first = true;
1134        }
1135}
1136
1137/**
1138 * Returns 1 if it's certain that the first packet is truly the first packet
1139 * rather than a best guess based upon threads that have published so far.
1140 * Otherwise 0 is returned.
1141 * It's recommended that this result is stored rather than calling this
1142 * function again.
1143 */
1144DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
1145{
1146        int ret = 0;
1147        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1148        if (libtrace->first_packets.count) {
1149                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1150                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1151                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1152                        ret = 1;
1153                } else {
1154                        struct timeval curr_tv;
1155                        // If a second has passed since the first entry we will assume this is the very first packet
1156                        gettimeofday(&curr_tv, NULL);
1157                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1158                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1159                                        ret = 1;
1160                                }
1161                        }
1162                }
1163        } else {
1164                *packet = NULL;
1165                *tv = NULL;
1166        }
1167        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1168        return ret;
1169}
1170
1171
1172DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
1173{
1174        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1175}
1176
1177inline static struct timeval usec_to_tv(uint64_t usec)
1178{
1179        struct timeval tv;
1180        tv.tv_sec = usec / 1000000;
1181        tv.tv_usec = usec % 1000000;
1182        return tv;
1183}
1184
1185/** Similar to delay_tracetime but send messages to all threads periodically */
1186static void* reporter_entry(void *data) {
1187        libtrace_message_t message = {0};
1188        libtrace_t *trace = (libtrace_t *)data;
1189        libtrace_thread_t *t = &trace->reporter_thread;
1190
1191        fprintf(stderr, "Reporter thread starting\n");
1192
1193        /* Wait until all threads are started */
1194        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1195        if (trace->state == STATE_ERROR) {
1196                thread_change_state(trace, t, THREAD_FINISHED, false);
1197                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1198                pthread_exit(NULL);
1199        }
1200        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1201
1202        message.code = MESSAGE_STARTING;
1203        message.sender = t;
1204        (*trace->reporter)(trace, NULL, &message);
1205        message.code = MESSAGE_RESUMING;
1206        (*trace->reporter)(trace, NULL, &message);
1207
1208        while (!trace_finished(trace)) {
1209                if (trace->config.reporter_polling) {
1210                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1211                                message.code = MESSAGE_POST_REPORTER;
1212                } else {
1213                        libtrace_message_queue_get(&t->messages, &message);
1214                }
1215                switch (message.code) {
1216                        // Check for results
1217                        case MESSAGE_POST_REPORTER:
1218                                trace->combiner.read(trace, &trace->combiner);
1219                                break;
1220                        case MESSAGE_DO_PAUSE:
1221                                assert(trace->combiner.pause);
1222                                trace->combiner.pause(trace, &trace->combiner);
1223                                message.code = MESSAGE_PAUSING;
1224                                message.sender = t;
1225                                (*trace->reporter)(trace, NULL, &message);
1226                                trace_thread_pause(trace, t);
1227                                message.code = MESSAGE_RESUMING;
1228                                (*trace->reporter)(trace, NULL, &message);
1229                                break;
1230                        default:
1231                                (*trace->reporter)(trace, NULL, &message);
1232                }
1233        }
1234
1235        // Flush out whats left now all our threads have finished
1236        trace->combiner.read_final(trace, &trace->combiner);
1237
1238        // GOODBYE
1239        message.code = MESSAGE_PAUSING;
1240        message.sender = t;
1241        (*trace->reporter)(trace, NULL, &message);
1242        message.code = MESSAGE_STOPPING;
1243        (*trace->reporter)(trace, NULL, &message);
1244
1245        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1246        print_memory_stats();
1247        return NULL;
1248}
1249
1250/** Similar to delay_tracetime but send messages to all threads periodically */
1251static void* keepalive_entry(void *data) {
1252        struct timeval prev, next;
1253        libtrace_message_t message = {0};
1254        libtrace_t *trace = (libtrace_t *)data;
1255        uint64_t next_release;
1256        fprintf(stderr, "keepalive thread is starting\n");
1257
1258        /* Wait until all threads are started */
1259        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1260        if (trace->state == STATE_ERROR) {
1261                thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false);
1262                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1263                pthread_exit(NULL);
1264        }
1265        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1266
1267        gettimeofday(&prev, NULL);
1268        message.code = MESSAGE_TICK;
1269        while (trace->state != STATE_FINSHED) {
1270                fd_set rfds;
1271                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1272                gettimeofday(&next, NULL);
1273                if (next_release > tv_to_usec(&next)) {
1274                        next = usec_to_tv(next_release - tv_to_usec(&next));
1275                        // Wait for timeout or a message
1276                        FD_ZERO(&rfds);
1277                        FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
1278                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
1279                                libtrace_message_t msg;
1280                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
1281                                assert(msg.code == MESSAGE_DO_STOP);
1282                                goto done;
1283                        }
1284                }
1285                prev = usec_to_tv(next_release);
1286                if (trace->state == STATE_RUNNING) {
1287                        message.additional.uint64 = tv_to_usec(&prev);
1288                        trace_send_message_to_perpkts(trace, &message);
1289                }
1290        }
1291done:
1292
1293        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1294        return NULL;
1295}
1296
1297/**
1298 * Delays a packets playback so the playback will be in trace time.
1299 * This may break early if a message becomes available.
1300 *
1301 * Requires the first packet for this thread to be received.
1302 * @param libtrace  The trace
1303 * @param packet    The packet to delay
1304 * @param t         The current thread
1305 * @return Either READ_MESSAGE(-2) or 0 is successful
1306 */
1307static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1308        struct timeval curr_tv, pkt_tv;
1309        uint64_t next_release = t->tracetime_offset_usec;
1310        uint64_t curr_usec;
1311
1312        if (!t->tracetime_offset_usec) {
1313                libtrace_packet_t *first_pkt;
1314                struct timeval *sys_tv;
1315                int64_t initial_offset;
1316                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1317                assert(first_pkt);
1318                pkt_tv = trace_get_timeval(first_pkt);
1319                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1320                /* In the unlikely case offset is 0, change it to 1 */
1321                if (stable)
1322                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1323                next_release = initial_offset;
1324        }
1325        /* next_release == offset */
1326        pkt_tv = trace_get_timeval(packet);
1327        next_release += tv_to_usec(&pkt_tv);
1328        gettimeofday(&curr_tv, NULL);
1329        curr_usec = tv_to_usec(&curr_tv);
1330        if (next_release > curr_usec) {
1331                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1332                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1333                fd_set rfds;
1334                FD_ZERO(&rfds);
1335                FD_SET(mesg_fd, &rfds);
1336                // We need to wait
1337
1338                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1339                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1340                if (ret == 0) {
1341                        return 0;
1342                } else if (ret > 0) {
1343                        return READ_MESSAGE;
1344                } else {
1345                        fprintf(stderr, "I thnik we broke select\n");
1346                }
1347        }
1348        return 0;
1349}
1350
1351/* Discards packets that don't match the filter.
1352 * Discarded packets are emptied and then moved to the end of the packet list.
1353 *
1354 * @param trace       The trace format, containing the filter
1355 * @param packets     An array of packets
1356 * @param nb_packets  The number of valid items in packets
1357 *
1358 * @return The number of packets that passed the filter, which are moved to
1359 *          the start of the packets array
1360 */
1361static inline size_t filter_packets(libtrace_t *trace,
1362                                    libtrace_packet_t **packets,
1363                                    size_t nb_packets) {
1364        size_t offset = 0;
1365        size_t i;
1366
1367        for (i = 0; i < nb_packets; ++i) {
1368                // The filter needs the trace attached to receive the link type
1369                packets[i]->trace = trace;
1370                if (trace_apply_filter(trace->filter, packets[i])) {
1371                        libtrace_packet_t *tmp;
1372                        tmp = packets[offset];
1373                        packets[offset++] = packets[i];
1374                        packets[i] = tmp;
1375                } else {
1376                        trace_fin_packet(packets[i]);
1377                }
1378        }
1379
1380        return offset;
1381}
1382
1383/* Read a batch of packets from the trace into a buffer.
1384 * Note that this function will block until a packet is read (or EOF is reached)
1385 *
1386 * @param libtrace    The trace
1387 * @param t           The thread
1388 * @param packets     An array of packets
1389 * @param nb_packets  The number of empty packets in packets
1390 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1391 */
1392static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1393                                      libtrace_thread_t *t,
1394                                      libtrace_packet_t *packets[],
1395                                      size_t nb_packets) {
1396        int i;
1397        assert(nb_packets);
1398        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1399        if (trace_is_err(libtrace))
1400                return -1;
1401        if (!libtrace->started) {
1402                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1403                              "You must call libtrace_start() before trace_read_packet()\n");
1404                return -1;
1405        }
1406
1407        if (libtrace->format->pread_packets) {
1408                int ret;
1409                for (i = 0; i < (int) nb_packets; ++i) {
1410                        assert(i[packets]);
1411                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1412                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1413                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1414                                              "Packet passed to trace_read_packet() is invalid\n");
1415                                return -1;
1416                        }
1417                }
1418                do {
1419                        ret=libtrace->format->pread_packets(libtrace, t,
1420                                                            packets,
1421                                                            nb_packets);
1422                        /* Error, EOF or message? */
1423                        if (ret <= 0) {
1424                                return ret;
1425                        }
1426
1427                        if (libtrace->filter) {
1428                                int remaining;
1429                                remaining = filter_packets(libtrace,
1430                                                           packets, ret);
1431                                t->filtered_packets += ret - remaining;
1432                                ret = remaining;
1433                        }
1434                        for (i = 0; i < ret; ++i) {
1435                                /* We do not mark the packet against the trace,
1436                                 * before hand or after. After breaks DAG meta
1437                                 * packets and before is inefficient */
1438                                //packets[i]->trace = libtrace;
1439                                /* TODO IN FORMAT?? Like traditional libtrace */
1440                                if (libtrace->snaplen>0)
1441                                        trace_set_capture_length(packets[i],
1442                                                        libtrace->snaplen);
1443                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1444                        }
1445                        t->accepted_packets += ret;
1446                } while(ret == 0);
1447                return ret;
1448        }
1449        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1450                      "This format does not support reading packets\n");
1451        return ~0U;
1452}
1453
1454/* Restarts a parallel trace, this is called from trace_pstart.
1455 * The libtrace lock is held upon calling this function.
1456 * Typically with a parallel trace the threads are not
1457 * killed rather.
1458 */
1459static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1460                          fn_per_pkt per_pkt, fn_reporter reporter) {
1461        int err = 0;
1462        if (libtrace->state != STATE_PAUSED) {
1463                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1464                        "trace(%s) is not currently paused",
1465                              libtrace->uridata);
1466                return -1;
1467        }
1468
1469        /* Update functions if requested */
1470        if (per_pkt)
1471                libtrace->per_pkt = per_pkt;
1472        if (reporter)
1473                libtrace->reporter = reporter;
1474        if(global_blob)
1475                libtrace->global_blob = global_blob;
1476
1477        assert(libtrace_parallel);
1478        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1479        assert(libtrace->per_pkt);
1480
1481        if (libtrace->perpkt_thread_count > 1 &&
1482            trace_supports_parallel(libtrace) &&
1483            !trace_has_dedicated_hasher(libtrace)) {
1484                fprintf(stderr, "Restarting trace pstart_input()\n");
1485                err = libtrace->format->pstart_input(libtrace);
1486        } else {
1487                if (libtrace->format->start_input) {
1488                        fprintf(stderr, "Restarting trace start_input()\n");
1489                        err = libtrace->format->start_input(libtrace);
1490                }
1491        }
1492
1493        if (err == 0) {
1494                libtrace->started = true;
1495                libtrace_change_state(libtrace, STATE_RUNNING, false);
1496        }
1497        return err;
1498}
1499
1500/**
1501 * Verifies the configuration and sets default values for any values not
1502 * specified by the user.
1503 * @return
1504 */
1505static void verify_configuration(libtrace_t *libtrace) {
1506        bool require_hasher = false;
1507
1508        /* Might we need a dedicated hasher thread? */
1509        if (libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1510                require_hasher = true;
1511        }
1512
1513        if (libtrace->config.hasher_queue_size <= 0)
1514                libtrace->config.hasher_queue_size = 1000;
1515
1516        if (libtrace->config.perpkt_threads <= 0) {
1517                // TODO add BSD support
1518                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1519                if (libtrace->perpkt_thread_count <= 0)
1520                        // Lets just use one
1521                        libtrace->perpkt_thread_count = 1;
1522        } else {
1523                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1524        }
1525
1526        if (libtrace->config.reporter_thold <= 0)
1527                libtrace->config.reporter_thold = 100;
1528        if (libtrace->config.burst_size <= 0)
1529                libtrace->config.burst_size = 10;
1530        if (libtrace->config.packet_thread_cache_size <= 0)
1531                libtrace->config.packet_thread_cache_size = 20;
1532        if (libtrace->config.packet_cache_size <= 0)
1533                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1534
1535        if (libtrace->config.packet_cache_size <
1536                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1537                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1538
1539        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1540                libtrace->combiner = combiner_unordered;
1541
1542
1543        /* Figure out if we are using a dedicated hasher thread? */
1544        if (require_hasher && libtrace->config.perpkt_threads > 1) {
1545                libtrace->hasher_thread.type = THREAD_HASHER;
1546        }
1547}
1548
1549/**
1550 * Starts a libtrace_thread, including allocating memory for messaging.
1551 * Threads are expected to wait until the libtrace look is released.
1552 * Hence why we don't init structures until later.
1553 *
1554 * @param trace The trace the thread is associated with
1555 * @param t The thread that is filled when the thread is started
1556 * @param type The type of thread
1557 * @param start_routine The entry location of the thread
1558 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1559 * @param name For debugging purposes set the threads name (Optional)
1560 *
1561 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1562 *         In this situation the thread structure is zeroed.
1563 */
1564static int trace_start_thread(libtrace_t *trace,
1565                       libtrace_thread_t *t,
1566                       enum thread_types type,
1567                       void *(*start_routine) (void *),
1568                       int perpkt_num,
1569                       const char *name) {
1570        int ret;
1571        assert(t->type == THREAD_EMPTY);
1572        t->trace = trace;
1573        t->ret = NULL;
1574        t->user_data = NULL;
1575        t->type = type;
1576        t->state = THREAD_RUNNING;
1577        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1578        if (ret != 0) {
1579                libtrace_zero_thread(t);
1580                trace_set_err(trace, ret, "Failed to create a thread");
1581                return -1;
1582        }
1583        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1584        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1585                libtrace_ringbuffer_init(&t->rbuffer,
1586                                         trace->config.hasher_queue_size,
1587                                         trace->config.hasher_polling?
1588                                                 LIBTRACE_RINGBUFFER_POLLING:
1589                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1590        }
1591        if(name)
1592                pthread_setname_np(t->tid, name);
1593        t->perpkt_num = perpkt_num;
1594        return 0;
1595}
1596
1597/* Start an input trace in the parallel libtrace framework.
1598 * This can also be used to restart an existing parallel.
1599 *
1600 * NOTE: libtrace lock is held for the majority of this function
1601 *
1602 * @param libtrace the input trace to start
1603 * @param global_blob some global data you can share with the new perpkt threads
1604 * @returns 0 on success, otherwise -1 to indicate an error has occured
1605 */
1606DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1607                           fn_per_pkt per_pkt, fn_reporter reporter) {
1608        int i;
1609        int ret = -1;
1610        char name[16];
1611        sigset_t sig_before, sig_block_all;
1612        assert(libtrace);
1613
1614        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1615        if (trace_is_err(libtrace)) {
1616                goto cleanup_none;
1617        }
1618
1619        if (libtrace->state == STATE_PAUSED) {
1620                ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
1621                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1622                return ret;
1623        }
1624
1625        if (libtrace->state != STATE_NEW) {
1626                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1627                              "should be called on a NEW or PAUSED trace but "
1628                              "instead was called from %s",
1629                              get_trace_state_name(libtrace->state));
1630                goto cleanup_none;
1631        }
1632
1633        /* Store the user defined things against the trace */
1634        libtrace->global_blob = global_blob;
1635        libtrace->per_pkt = per_pkt;
1636        libtrace->reporter = reporter;
1637        /* And zero other fields */
1638        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1639                libtrace->perpkt_thread_states[i] = 0;
1640        }
1641        libtrace->first_packets.first = 0;
1642        libtrace->first_packets.count = 0;
1643        libtrace->first_packets.packets = NULL;
1644        libtrace->perpkt_threads = NULL;
1645        /* Set a global which says we are using a parallel trace. This is
1646         * for backwards compatability due to changes when destroying packets */
1647        libtrace_parallel = 1;
1648
1649        verify_configuration(libtrace);
1650
1651        /* Try start the format - we prefer parallel over single threaded, as
1652         * these formats should support messages better */
1653        if (trace_supports_parallel(libtrace) &&
1654            !trace_has_dedicated_hasher(libtrace)) {
1655                printf("Using the parallel trace format\n");
1656                ret = libtrace->format->pstart_input(libtrace);
1657                libtrace->pread = trace_pread_packet_wrapper;
1658        } else {
1659                printf("Using single threaded interface\n");
1660                if (libtrace->format->start_input) {
1661                        ret = libtrace->format->start_input(libtrace);
1662                }
1663                if (libtrace->perpkt_thread_count > 1)
1664                        libtrace->pread = trace_pread_packet_first_in_first_served;
1665                else
1666                        /* Use standard read_packet */
1667                        libtrace->pread = NULL;
1668        }
1669
1670        if (ret != 0) {
1671                goto cleanup_none;
1672        }
1673
1674        /* --- Start all the threads we need --- */
1675        /* Disable signals because it is inherited by the threads we start */
1676        sigemptyset(&sig_block_all);
1677        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1678
1679        /* If we need a hasher thread start it
1680         * Special Case: If single threaded we don't need a hasher
1681         */
1682        if (trace_has_dedicated_hasher(libtrace)) {
1683                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1684                                   THREAD_HASHER, hasher_entry, -1,
1685                                   "hasher-thread");
1686                if (ret != 0) {
1687                        trace_set_err(libtrace, errno, "trace_pstart "
1688                                      "failed to start a hasher thread.");
1689                        goto cleanup_started;
1690                }
1691                libtrace->pread = trace_pread_packet_hasher_thread;
1692        } else {
1693                libtrace->hasher_thread.type = THREAD_EMPTY;
1694        }
1695
1696        /* Start up our perpkt threads */
1697        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1698                                          libtrace->perpkt_thread_count);
1699        if (!libtrace->perpkt_threads) {
1700                trace_set_err(libtrace, errno, "trace_pstart "
1701                              "failed to allocate memory.");
1702                goto cleanup_threads;
1703        }
1704        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1705                snprintf(name, sizeof(name), "perpkt-%d", i);
1706                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1707                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1708                                   THREAD_PERPKT, perpkt_threads_entry, i,
1709                                   name);
1710                if (ret != 0) {
1711                        trace_set_err(libtrace, errno, "trace_pstart "
1712                                      "failed to start a perpkt thread.");
1713                        goto cleanup_threads;
1714                }
1715        }
1716
1717        /* Start the reporter thread */
1718        if (reporter) {
1719                if (libtrace->combiner.initialise)
1720                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1721                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1722                                   THREAD_REPORTER, reporter_entry, -1,
1723                                   "reporter_thread");
1724                if (ret != 0) {
1725                        trace_set_err(libtrace, errno, "trace_pstart "
1726                                      "failed to start reporter thread.");
1727                        goto cleanup_threads;
1728                }
1729        }
1730
1731        /* Start the keepalive thread */
1732        if (libtrace->config.tick_interval > 0) {
1733                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1734                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1735                                   "keepalive_thread");
1736                if (ret != 0) {
1737                        trace_set_err(libtrace, errno, "trace_pstart "
1738                                      "failed to start keepalive thread.");
1739                        goto cleanup_threads;
1740                }
1741        }
1742
1743        /* Init other data structures */
1744        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1745        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1746        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1747                                                 sizeof(struct  __packet_storage_magic_type));
1748        if (libtrace->first_packets.packets == NULL) {
1749                trace_set_err(libtrace, errno, "trace_pstart "
1750                              "failed to allocate memory.");
1751                goto cleanup_threads;
1752        }
1753
1754        if (libtrace_ocache_init(&libtrace->packet_freelist,
1755                             (void* (*)()) trace_create_packet,
1756                             (void (*)(void *))trace_destroy_packet,
1757                             libtrace->config.packet_thread_cache_size,
1758                             libtrace->config.packet_cache_size * 4,
1759                             libtrace->config.fixed_packet_count) != 0) {
1760                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1761                              "failed to allocate ocache.");
1762                goto cleanup_threads;
1763        }
1764
1765        /* Threads don't start */
1766        libtrace->started = true;
1767        libtrace_change_state(libtrace, STATE_RUNNING, false);
1768
1769        ret = 0;
1770        goto success;
1771cleanup_threads:
1772        if (libtrace->first_packets.packets) {
1773                free(libtrace->first_packets.packets);
1774                libtrace->first_packets.packets = NULL;
1775        }
1776        libtrace_change_state(libtrace, STATE_ERROR, false);
1777        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1778        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1779                pthread_join(libtrace->hasher_thread.tid, NULL);
1780                libtrace_zero_thread(&libtrace->hasher_thread);
1781        }
1782
1783        if (libtrace->perpkt_threads) {
1784                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1785                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1786                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1787                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1788                        } else break;
1789                }
1790                free(libtrace->perpkt_threads);
1791                libtrace->perpkt_threads = NULL;
1792        }
1793
1794        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1795                pthread_join(libtrace->reporter_thread.tid, NULL);
1796                libtrace_zero_thread(&libtrace->reporter_thread);
1797        }
1798
1799        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1800                pthread_join(libtrace->keepalive_thread.tid, NULL);
1801                libtrace_zero_thread(&libtrace->keepalive_thread);
1802        }
1803        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1804        libtrace_change_state(libtrace, STATE_NEW, false);
1805        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1806        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1807cleanup_started:
1808        if (trace_supports_parallel(libtrace) &&
1809            !trace_has_dedicated_hasher(libtrace)
1810            && libtrace->perpkt_thread_count > 1) {
1811                if (libtrace->format->ppause_input)
1812                        libtrace->format->ppause_input(libtrace);
1813        } else {
1814                if (libtrace->format->pause_input)
1815                        libtrace->format->pause_input(libtrace);
1816        }
1817        ret = -1;
1818success:
1819        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1820cleanup_none:
1821        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1822        return ret;
1823}
1824
1825/**
1826 * Pauses a trace, this should only be called by the main thread
1827 * 1. Set started = false
1828 * 2. All perpkt threads are paused waiting on a condition var
1829 * 3. Then call ppause on the underlying format if found
1830 * 4. The traces state is paused
1831 *
1832 * Once done you should be able to modify the trace setup and call pstart again
1833 * TODO handle changing thread numbers
1834 */
1835DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1836{
1837        libtrace_thread_t *t;
1838        int i;
1839        assert(libtrace);
1840
1841        t = get_thread_table(libtrace);
1842        // Check state from within the lock if we are going to change it
1843        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1844        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1845                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1846                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1847                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1848                return -1;
1849        }
1850
1851        libtrace_change_state(libtrace, STATE_PAUSING, false);
1852        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1853
1854        // Special case handle the hasher thread case
1855        if (trace_has_dedicated_hasher(libtrace)) {
1856                if (libtrace->config.debug_state)
1857                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1858                libtrace_message_t message = {0};
1859                message.code = MESSAGE_DO_PAUSE;
1860                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1861                // Wait for it to pause
1862                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1863                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1864                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1865                }
1866                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1867                if (libtrace->config.debug_state)
1868                        fprintf(stderr, " DONE\n");
1869        }
1870
1871        if (libtrace->config.debug_state)
1872                fprintf(stderr, "Asking perpkt threads to pause ...");
1873        // Stop threads, skip this one if it's a perpkt
1874        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1875                if (&libtrace->perpkt_threads[i] != t) {
1876                        libtrace_message_t message = {0};
1877                        message.code = MESSAGE_DO_PAUSE;
1878                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1879                        if(trace_has_dedicated_hasher(libtrace)) {
1880                                // The hasher has stopped and other threads have messages waiting therefore
1881                                // If the queues are empty the other threads would have no data
1882                                // So send some message packets to simply ask the threads to check
1883                                // We are the only writer since hasher has paused
1884                                libtrace_packet_t *pkt;
1885                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1886                                pkt->error = READ_MESSAGE;
1887                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1888                        }
1889                } else {
1890                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1891                }
1892        }
1893
1894        if (t) {
1895                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1896                // We rely on the user to *not* return before starting the trace again
1897                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1898        }
1899
1900        // Wait for all threads to pause
1901        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1902        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1903                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1904        }
1905        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1906
1907        if (libtrace->config.debug_state)
1908                fprintf(stderr, " DONE\n");
1909
1910        // Deal with the reporter
1911        if (trace_has_dedicated_reporter(libtrace)) {
1912                if (libtrace->config.debug_state)
1913                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1914                libtrace_message_t message = {0};
1915                message.code = MESSAGE_DO_PAUSE;
1916                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
1917                // Wait for it to pause
1918                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1919                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1920                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1921                }
1922                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1923                if (libtrace->config.debug_state)
1924                        fprintf(stderr, " DONE\n");
1925        }
1926
1927        /* Cache values before we pause */
1928        if (libtrace->stats == NULL)
1929                libtrace->stats = trace_create_statistics();
1930        // Save the statistics against the trace
1931        trace_get_statistics(libtrace, NULL);
1932        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
1933                libtrace->started = false;
1934                if (libtrace->format->ppause_input)
1935                        libtrace->format->ppause_input(libtrace);
1936                // TODO What happens if we don't have pause input??
1937        } else {
1938                int err;
1939                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1940                err = trace_pause(libtrace);
1941                // We should handle this a bit better
1942                if (err)
1943                        return err;
1944        }
1945
1946        // Only set as paused after the pause has been called on the trace
1947        libtrace_change_state(libtrace, STATE_PAUSED, true);
1948        return 0;
1949}
1950
1951/**
1952 * Stop trace finish prematurely as though it meet an EOF
1953 * This should only be called by the main thread
1954 * 1. Calls ppause
1955 * 2. Sends a message asking for threads to finish
1956 * 3. Releases threads which will pause
1957 */
1958DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1959{
1960        int i, err;
1961        libtrace_message_t message = {0};
1962        assert(libtrace);
1963
1964        // Ensure all threads have paused and the underlying trace format has
1965        // been closed and all packets associated are cleaned up
1966        // Pause will do any state checks for us
1967        err = trace_ppause(libtrace);
1968        if (err)
1969                return err;
1970
1971        // Now send a message asking the threads to stop
1972        // This will be retrieved before trying to read another packet
1973
1974        message.code = MESSAGE_DO_STOP;
1975        trace_send_message_to_perpkts(libtrace, &message);
1976        if (trace_has_dedicated_hasher(libtrace))
1977                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1978
1979        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1980                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1981        }
1982
1983        // Now release the threads and let them stop
1984        libtrace_change_state(libtrace, STATE_FINSHED, true);
1985        return 0;
1986}
1987
1988/**
1989 * Set the hasher type along with a selected function, if hardware supports
1990 * that generic type of hashing it will be used otherwise the supplied
1991 * hasher function will be used and passed data when called.
1992 *
1993 * @return 0 if successful otherwise -1 on error
1994 */
1995DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1996        int ret = -1;
1997        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1998                return -1;
1999        }
2000
2001        // Save the requirements
2002        trace->hasher_type = type;
2003        if (hasher) {
2004                trace->hasher = hasher;
2005                trace->hasher_data = data;
2006        } else {
2007                trace->hasher = NULL;
2008                trace->hasher_data = NULL;
2009        }
2010
2011        // Try push this to hardware - NOTE hardware could do custom if
2012        // there is a more efficient way to apply it, in this case
2013        // it will simply grab the function out of libtrace_t
2014        if (trace->format->pconfig_input)
2015                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
2016
2017        if (ret == -1) {
2018                // We have to deal with this ourself
2019                // This most likely means single threaded reading of the trace
2020                if (!hasher) {
2021                        switch (type)
2022                        {
2023                                case HASHER_CUSTOM:
2024                                case HASHER_BALANCE:
2025                                        return 0;
2026                                case HASHER_BIDIRECTIONAL:
2027                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2028                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2029                                        toeplitz_init_config(trace->hasher_data, 1);
2030                                        return 0;
2031                                case HASHER_UNIDIRECTIONAL:
2032                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2033                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2034                                        toeplitz_init_config(trace->hasher_data, 0);
2035                                        return 0;
2036                                case HASHER_HARDWARE:
2037                                        return -1;
2038                        }
2039                        return -1;
2040                }
2041        } else {
2042                // The hardware is dealing with this yay
2043                trace->hasher_type = HASHER_HARDWARE;
2044        }
2045
2046        return 0;
2047}
2048
2049// Waits for all threads to finish
2050DLLEXPORT void trace_join(libtrace_t *libtrace) {
2051        int i;
2052
2053        /* Firstly wait for the perpkt threads to finish, since these are
2054         * user controlled */
2055        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2056                //printf("Waiting to join with perpkt #%d\n", i);
2057                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2058                //printf("Joined with perpkt #%d\n", i);
2059                // So we must do our best effort to empty the queue - so
2060                // the producer (or any other threads) don't block.
2061                libtrace_packet_t * packet;
2062                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2063                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2064                        if (packet) // This could be NULL iff the perpkt finishes early
2065                                trace_destroy_packet(packet);
2066        }
2067
2068        /* Now the hasher */
2069        if (trace_has_dedicated_hasher(libtrace)) {
2070                pthread_join(libtrace->hasher_thread.tid, NULL);
2071                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2072        }
2073
2074        // Now that everything is finished nothing can be touching our
2075        // buffers so clean them up
2076        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2077                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2078                // if they lost timeslice before-during a write
2079                libtrace_packet_t * packet;
2080                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2081                        trace_destroy_packet(packet);
2082                if (libtrace->hasher) {
2083                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2084                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2085                }
2086                // Cannot destroy vector yet, this happens with trace_destroy
2087        }
2088        // TODO consider perpkt threads marking trace as finished before join is called
2089        libtrace_change_state(libtrace, STATE_FINSHED, true);
2090
2091        if (trace_has_dedicated_reporter(libtrace)) {
2092                pthread_join(libtrace->reporter_thread.tid, NULL);
2093                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2094        }
2095
2096        // Wait for the tick (keepalive) thread if it has been started
2097        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2098                libtrace_message_t msg = {0};
2099                msg.code = MESSAGE_DO_STOP;
2100                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
2101                pthread_join(libtrace->keepalive_thread.tid, NULL);
2102        }
2103
2104        libtrace_change_state(libtrace, STATE_JOINED, true);
2105        print_memory_stats();
2106}
2107
2108DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
2109{
2110        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2111        assert(t);
2112        return libtrace_message_queue_count(&t->messages);
2113}
2114
2115DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2116{
2117        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2118        assert(t);
2119        return libtrace_message_queue_get(&t->messages, message);
2120}
2121
2122DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2123{
2124        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2125        assert(t);
2126        return libtrace_message_queue_try_get(&t->messages, message);
2127}
2128
2129/**
2130 * Return backlog indicator
2131 */
2132DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2133{
2134        libtrace_message_t message = {0};
2135        message.code = MESSAGE_POST_REPORTER;
2136        message.sender = get_thread_descriptor(libtrace);
2137        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
2138}
2139
2140/**
2141 * Return backlog indicator
2142 */
2143DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2144{
2145        //printf("Sending message code=%d to reporter\n", message->code);
2146        message->sender = get_thread_descriptor(libtrace);
2147        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
2148}
2149
2150/**
2151 *
2152 */
2153DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2154{
2155        //printf("Sending message code=%d to reporter\n", message->code);
2156        message->sender = get_thread_descriptor(libtrace);
2157        return libtrace_message_queue_put(&t->messages, message);
2158}
2159
2160DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2161{
2162        int i;
2163        message->sender = get_thread_descriptor(libtrace);
2164        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2165                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2166        }
2167        //printf("Sending message code=%d to reporter\n", message->code);
2168        return 0;
2169}
2170
2171DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
2172        result->key = key;
2173}
2174DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
2175        return result->key;
2176}
2177DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value) {
2178        result->value = value;
2179}
2180DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result) {
2181        return result->value;
2182}
2183DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value) {
2184        result->key = key;
2185        result->value = value;
2186}
2187DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
2188        free(*result);
2189        result = NULL;
2190        // TODO automatically back with a free list!!
2191}
2192
2193DLLEXPORT void * trace_get_global(libtrace_t *trace)
2194{
2195        return trace->global_blob;
2196}
2197
2198DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
2199{
2200        if (trace->global_blob && trace->global_blob != data) {
2201                void * ret = trace->global_blob;
2202                trace->global_blob = data;
2203                return ret;
2204        } else {
2205                trace->global_blob = data;
2206                return NULL;
2207        }
2208}
2209
2210DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2211{
2212        return t->user_data;
2213}
2214
2215DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2216{
2217        if(t->user_data && t->user_data != data) {
2218                void *ret = t->user_data;
2219                t->user_data = data;
2220                return ret;
2221        } else {
2222                t->user_data = data;
2223                return NULL;
2224        }
2225}
2226
2227/**
2228 * Publishes a result to the reduce queue
2229 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2230 */
2231DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type) {
2232        libtrace_result_t res;
2233        res.type = type;
2234        res.key = key;
2235        res.value = value;
2236        assert(libtrace->combiner.publish);
2237        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2238        return;
2239}
2240
2241/**
2242 * Sets a combiner function against the trace.
2243 */
2244DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config){
2245        if (combiner) {
2246                trace->combiner = *combiner;
2247                trace->combiner.configuration = config;
2248        } else {
2249                // No combiner, so don't try use it
2250                memset(&trace->combiner, 0, sizeof(trace->combiner));
2251        }
2252}
2253
2254DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2255        return packet->order;
2256}
2257
2258DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2259        return packet->hash;
2260}
2261
2262DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2263        packet->order = order;
2264}
2265
2266DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2267        packet->hash = hash;
2268}
2269
2270DLLEXPORT int trace_finished(libtrace_t * libtrace) {
2271        // TODO I don't like using this so much, we could use state!!!
2272        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
2273}
2274
2275DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2276{
2277        UNUSED int ret = -1;
2278        switch (option) {
2279                case TRACE_OPTION_TICK_INTERVAL:
2280                        libtrace->config.tick_interval = *((int *) value);
2281                        return 1;
2282                case TRACE_OPTION_SET_HASHER:
2283                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2284                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2285                        libtrace->config.perpkt_threads = *((int *) value);
2286                        return 1;
2287                case TRACE_OPTION_TRACETIME:
2288                        if(*((int *) value))
2289                                libtrace->tracetime = 1;
2290                        else
2291                                libtrace->tracetime = 0;
2292                        return 0;
2293                case TRACE_OPTION_SET_CONFIG:
2294                        libtrace->config = *((struct user_configuration *) value);
2295                case TRACE_OPTION_GET_CONFIG:
2296                        *((struct user_configuration *) value) = libtrace->config;
2297        }
2298        return 0;
2299}
2300
2301static bool config_bool_parse(char *value, size_t nvalue) {
2302        if (strncmp(value, "true", nvalue) == 0)
2303                return true;
2304        else if (strncmp(value, "false", nvalue) == 0)
2305                return false;
2306        else
2307                return strtoll(value, NULL, 10) != 0;
2308}
2309
2310static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2311        assert(key);
2312        assert(value);
2313        assert(uc);
2314        if (strncmp(key, "packet_cache_size", nkey) == 0
2315            || strncmp(key, "pcs", nkey) == 0) {
2316                uc->packet_cache_size = strtoll(value, NULL, 10);
2317        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2318                   || strncmp(key, "ptcs", nkey) == 0) {
2319                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2320        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2321                   || strncmp(key, "fpc", nkey) == 0) {
2322                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2323        } else if (strncmp(key, "burst_size", nkey) == 0
2324                   || strncmp(key, "bs", nkey) == 0) {
2325                uc->burst_size = strtoll(value, NULL, 10);
2326        } else if (strncmp(key, "tick_interval", nkey) == 0
2327                   || strncmp(key, "ti", nkey) == 0) {
2328                uc->tick_interval = strtoll(value, NULL, 10);
2329        } else if (strncmp(key, "tick_count", nkey) == 0
2330                   || strncmp(key, "tc", nkey) == 0) {
2331                uc->tick_count = strtoll(value, NULL, 10);
2332        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2333                   || strncmp(key, "pt", nkey) == 0) {
2334                uc->perpkt_threads = strtoll(value, NULL, 10);
2335        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2336                   || strncmp(key, "hqs", nkey) == 0) {
2337                uc->hasher_queue_size = strtoll(value, NULL, 10);
2338        } else if (strncmp(key, "hasher_polling", nkey) == 0
2339                   || strncmp(key, "hp", nkey) == 0) {
2340                uc->hasher_polling = config_bool_parse(value, nvalue);
2341        } else if (strncmp(key, "reporter_polling", nkey) == 0
2342                   || strncmp(key, "rp", nkey) == 0) {
2343                uc->reporter_polling = config_bool_parse(value, nvalue);
2344        } else if (strncmp(key, "reporter_thold", nkey) == 0
2345                   || strncmp(key, "rt", nkey) == 0) {
2346                uc->reporter_thold = strtoll(value, NULL, 10);
2347        } else if (strncmp(key, "debug_state", nkey) == 0
2348                   || strncmp(key, "ds", nkey) == 0) {
2349                uc->debug_state = config_bool_parse(value, nvalue);
2350        } else {
2351                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2352        }
2353}
2354
2355DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2356        char *pch;
2357        char key[100];
2358        char value[100];
2359        assert(str);
2360        assert(uc);
2361        pch = strtok (str," ,.-");
2362        while (pch != NULL)
2363        {
2364                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2365                        config_string(uc, key, sizeof(key), value, sizeof(value));
2366                } else {
2367                        fprintf(stderr, "Error parsing %s\n", pch);
2368                }
2369                pch = strtok (NULL," ,.-");
2370        }
2371}
2372
2373DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2374        char line[1024];
2375        while (fgets(line, sizeof(line), file) != NULL)
2376        {
2377                parse_user_config(uc, line);
2378        }
2379}
2380
2381DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
2382        libtrace_packet_t* result;
2383        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
2384        assert(result);
2385        swap_packets(result, packet); // Move the current packet into our copy
2386        return result;
2387}
2388
2389DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2390        // Try write back the packet
2391        assert(packet);
2392        // Always release any resources this might be holding such as a slot in a ringbuffer
2393        trace_fin_packet(packet);
2394        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2395}
2396
2397DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2398        if (libtrace->format)
2399                return &libtrace->format->info;
2400        else
2401                return NULL;
2402}
Note: See TracBrowser for help on using the repository browser.