source: lib/trace_parallel.c @ 5b4d121

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5b4d121 was 5b4d121, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Adds a configuration parser to make it easy to change the parallel configuration.
Adds more configuration options (Tidies some verbose debugging output).
Implements tick packets for the hasher thread case.
Some other minor bug fixes

  • Property mode set to 100644
File size: 80.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101
102
103static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
104
105extern int libtrace_parallel;
106
107struct multithreading_stats {
108        uint64_t full_queue_hits;
109        uint64_t wait_for_fill_complete_hits;
110} contention_stats[1024];
111
112struct mem_stats {
113        struct memfail {
114           uint64_t cache_hit;
115           uint64_t ring_hit;
116           uint64_t miss;
117           uint64_t recycled;
118        } readbulk, read, write, writebulk;
119};
120
121// Grrr gcc wants this spelt out
122__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
123
124static void print_memory_stats() {
125        char t_name[50];
126        uint64_t total;
127        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
128
129        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
130
131        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
132        if (total) {
133                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
134                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
135                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
136                                total, (double) mem_hits.read.miss / (double) total * 100.0);
137        }
138
139        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
140        if (total) {
141                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
142                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
143
144
145                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
146                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
147        }
148
149        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
150        if (total) {
151                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
152                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
153
154                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
155                                total, (double) mem_hits.write.miss / (double) total * 100.0);
156        }
157
158        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
159        if (total) {
160                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
161                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
162
163                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
164                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
165        }
166
167}
168
169/**
170 * True if the trace has dedicated hasher thread otherwise false,
171 * to be used after the trace is running
172 */
173static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
174{
175        assert(libtrace->state != STATE_NEW);
176        return libtrace->hasher_thread.type == THREAD_HASHER;
177}
178
179/**
180 * True if the trace has dedicated hasher thread otherwise false,
181 * to be used after the trace is running
182 */
183static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
184{
185        assert(libtrace->state != STATE_NEW);
186        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
187}
188
189/**
190 * Changes a thread's state and broadcasts the condition variable. This
191 * should always be done when the lock is held.
192 *
193 * Additionally for perpkt threads the state counts are updated.
194 *
195 * @param trace A pointer to the trace
196 * @param t A pointer to the thread to modify
197 * @param new_state The new state of the thread
198 * @param need_lock Set to true if libtrace_lock is not held, otherwise
199 *        false in the case the lock is currently held by this thread.
200 */
201static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
202        const enum thread_states new_state, const bool need_lock)
203{
204        enum thread_states prev_state;
205        if (need_lock)
206                pthread_mutex_lock(&trace->libtrace_lock);
207        prev_state = t->state;
208        t->state = new_state;
209        if (t->type == THREAD_PERPKT) {
210                --trace->perpkt_thread_states[prev_state];
211                ++trace->perpkt_thread_states[new_state];
212        }
213
214        if (trace->config.debug_state)
215                fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
216                        prev_state, t->state);
217
218        if (need_lock)
219                pthread_mutex_unlock(&trace->libtrace_lock);
220        pthread_cond_broadcast(&trace->perpkt_cond);
221}
222
223/**
224 * Changes the overall traces state and signals the condition.
225 *
226 * @param trace A pointer to the trace
227 * @param new_state The new state of the trace
228 * @param need_lock Set to true if libtrace_lock is not held, otherwise
229 *        false in the case the lock is currently held by this thread.
230 */
231static inline void libtrace_change_state(libtrace_t *trace, 
232        const enum trace_state new_state, const bool need_lock)
233{
234        UNUSED enum trace_state prev_state;
235        if (need_lock)
236                pthread_mutex_lock(&trace->libtrace_lock);
237        prev_state = trace->state;
238        trace->state = new_state;
239
240        if (trace->config.debug_state)
241                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
242                        trace->uridata, get_trace_state_name(prev_state),
243                        get_trace_state_name(trace->state));
244
245        if (need_lock)
246                pthread_mutex_unlock(&trace->libtrace_lock);
247        pthread_cond_broadcast(&trace->perpkt_cond);
248}
249
250/**
251 * @return True if the format supports parallel threads.
252 */
253static inline bool trace_supports_parallel(libtrace_t *trace)
254{
255        assert(trace);
256        assert(trace->format);
257        if (trace->format->pstart_input)
258                return true;
259        else
260                return false;
261}
262
263DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
264        int i;
265        struct multithreading_stats totals = {0};
266        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
267                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
268                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
269                totals.full_queue_hits += contention_stats[i].full_queue_hits;
270                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
271                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
272        }
273        fprintf(stderr, "\nTotals for perpkt threads\n");
274        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
275        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
276
277        return;
278}
279
280void libtrace_zero_thread(libtrace_thread_t * t) {
281        t->trace = NULL;
282        t->ret = NULL;
283        t->type = THREAD_EMPTY;
284        libtrace_zero_ringbuffer(&t->rbuffer);
285        libtrace_zero_vector(&t->vector);
286        libtrace_zero_deque(&t->deque);
287        t->recorded_first = false;
288        t->perpkt_num = -1;
289        t->accepted_packets = 0;
290}
291
292// Ints are aligned int is atomic so safe to read and write at same time
293// However write must be locked, read doesn't (We never try read before written to table)
294libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
295        int i = 0;
296        pthread_t tid = pthread_self();
297
298        for (;i<libtrace->perpkt_thread_count ;++i) {
299                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
300                        return &libtrace->perpkt_threads[i];
301        }
302        return NULL;
303}
304
305int get_thread_table_num(libtrace_t *libtrace) {
306        int i = 0;
307        pthread_t tid = pthread_self();
308        for (;i<libtrace->perpkt_thread_count; ++i) {
309                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
310                        return i;
311        }
312        return -1;
313}
314
315static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
316        libtrace_thread_t *ret;
317        if (!(ret = get_thread_table(libtrace))) {
318                pthread_t tid = pthread_self();
319                // Check if we are reporter or something else
320                if (pthread_equal(tid, libtrace->reporter_thread.tid))
321                        ret = &libtrace->reporter_thread;
322                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
323                        ret = &libtrace->hasher_thread;
324                else
325                        ret = NULL;
326        }
327        return ret;
328}
329
330/** Used below in trace_make_results_packets_safe*/
331static void do_copy_result_packet(void *data)
332{
333        libtrace_result_t *res = (libtrace_result_t *)data;
334        if (res->type == RESULT_PACKET) {
335                // Duplicate the packet in standard malloc'd memory and free the
336                // original, This is a 1:1 exchange so is ocache count remains unchanged.
337                libtrace_packet_t *oldpkt, *dup;
338                oldpkt = (libtrace_packet_t *) res->value;
339                dup = trace_copy_packet(oldpkt);
340                res->value = (void *)dup;
341                trace_destroy_packet(oldpkt);
342        }
343}
344
345/**
346 * Make a safe replacement copy of any result packets that are owned
347 * by the format in the result queue. Used when pausing traces.
348 */ 
349static void trace_make_results_packets_safe(libtrace_t *trace) {
350        libtrace_thread_t *t = get_thread_descriptor(trace);
351        if (trace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
352                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
353        else 
354                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
355}
356
357/**
358 * Holds threads in a paused state, until released by broadcasting
359 * the condition mutex.
360 */
361static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
362        if (t->type == THREAD_PERPKT)
363                trace_make_results_packets_safe(trace);
364        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
365        thread_change_state(trace, t, THREAD_PAUSED, false);
366        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
367                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
368        }
369        thread_change_state(trace, t, THREAD_RUNNING, false);
370        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
371}
372
373
374
375/**
376 * Dispatches packets to their correct place and applies any translations
377 * as needed
378 * @param trace
379 * @param t
380 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
381 * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal
382 */
383static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
384                                   size_t nb_packets) {
385        libtrace_message_t message;
386        size_t i;
387        for (i = 0; i < nb_packets; ++i) {
388                if (packets[i]->error > 0) {
389                        packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
390                } else if (packets[i]->error == READ_TICK) {
391                        message.code = MESSAGE_TICK;
392                        message.additional.uint64 = trace_packet_get_order(packets[i]);
393                        message.sender = t;
394                        (*trace->per_pkt)(trace, NULL, &message, t);
395                } else if (packets[i]->error != READ_MESSAGE) {
396                        // An error this should be the last packet we read
397                        size_t z;
398                        // We could have an eof or error and a message such as pause
399                        for (z = i ; z < nb_packets; ++z) {
400                                fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
401                                assert (packets[z]->error <= 0);
402                        }
403                        return -1;
404                }
405                // -2 is a message its not worth checking now just finish this lot and we'll check
406                // when we loop next
407        }
408        return 0;
409}
410
411/**
412 * The is the entry point for our packet processing threads.
413 */
414static void* perpkt_threads_entry(void *data) {
415        libtrace_t *trace = (libtrace_t *)data;
416        libtrace_thread_t * t;
417        libtrace_message_t message = {0};
418        libtrace_packet_t *packets[trace->config.burst_size];
419        size_t nb_packets;
420        size_t i;
421
422        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
423        // Force this thread to wait until trace_pstart has been completed
424        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
425        t = get_thread_table(trace);
426        assert(t);
427        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
428        if (trace->format->pregister_thread) {
429                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
430        }
431        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
432
433        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
434        // Send a message to say we've started
435
436        // Let the per_packet function know we have started
437        message.code = MESSAGE_STARTING;
438        message.sender = t;
439        (*trace->per_pkt)(trace, NULL, &message, t);
440        message.code = MESSAGE_RESUMING;
441        (*trace->per_pkt)(trace, NULL, &message, t);
442
443
444        for (;;) {
445
446                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
447                        switch (message.code) {
448                                case MESSAGE_DO_PAUSE: // This is internal
449                                        // Send message to say we are pausing, TODO consider sender
450                                        message.code = MESSAGE_PAUSING;
451                                        message.sender = t;
452                                        (*trace->per_pkt)(trace, NULL, &message, t);
453                                        // If a hasher thread is running empty input queues so we don't lose data
454                                        if (trace_has_dedicated_hasher(trace)) {
455                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
456                                                // The hasher has stopped by this point, so the queue shouldn't be filling
457                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
458                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
459                                                        if (dispatch_packets(trace, t, packets, 1) == -1) {
460                                                                // EOF or error, either way we'll stop
461                                                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
462                                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
463                                                                        // No packets after this should have any data in them
464                                                                        assert(packets[0]->error <= 0);
465                                                                }
466                                                                goto stop;
467                                                        }
468                                                }
469                                        }
470                                        // Now we do the actual pause, this returns when we are done
471                                        trace_thread_pause(trace, t);
472                                        message.code = MESSAGE_RESUMING;
473                                        (*trace->per_pkt)(trace, NULL, &message, t);
474                                        // Check for new messages as soon as we return
475                                        continue;
476                                case MESSAGE_DO_STOP: // This is internal
477                                        goto stop;
478                        }
479                        (*trace->per_pkt)(trace, NULL, &message, t);
480                        continue;
481                }
482
483                if (trace->perpkt_thread_count == 1) {
484                        if (!packets[0]) {
485                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
486                        }
487                        assert(packets[0]);
488                        packets[0]->error = trace_read_packet(trace, packets[0]);
489                        nb_packets = 1;
490                } else {
491                        nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size);
492                }
493                // Loop through the packets we just read
494                if (dispatch_packets(trace, t, packets, nb_packets) == -1)
495                        break;
496        }
497
498
499stop:
500        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
501
502        // Let the per_packet function know we have stopped
503        message.code = MESSAGE_PAUSING;
504        message.sender = t;
505        (*trace->per_pkt)(trace, NULL, &message, t);
506        message.code = MESSAGE_STOPPING;
507        message.additional.uint64 = 0;
508        (*trace->per_pkt)(trace, NULL, &message, t);
509
510        // Free any remaining packets
511        for (i = 0; i < trace->config.burst_size; i++) {
512                if (packets[i]) {
513                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
514                        packets[i] = NULL;
515                }
516        }
517
518       
519        thread_change_state(trace, t, THREAD_FINISHED, true);
520
521        // Notify only after we've defiantly set the state to finished
522        message.code = MESSAGE_PERPKT_ENDED;
523        message.additional.uint64 = 0;
524        trace_send_message_to_reporter(trace, &message);
525
526        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
527        if (trace->format->punregister_thread) {
528                trace->format->punregister_thread(trace, t);
529        }
530        print_memory_stats();
531
532        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
533
534        pthread_exit(NULL);
535};
536
537/**
538 * The start point for our single threaded hasher thread, this will read
539 * and hash a packet from a data source and queue it against the correct
540 * core to process it.
541 */
542static void* hasher_entry(void *data) {
543        libtrace_t *trace = (libtrace_t *)data;
544        libtrace_thread_t * t;
545        int i;
546        libtrace_packet_t * packet;
547        libtrace_message_t message = {0};
548
549        assert(trace_has_dedicated_hasher(trace));
550        /* Wait until all threads are started and objects are initialised (ring buffers) */
551        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
552        t = &trace->hasher_thread;
553        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
554        printf("Hasher Thread started\n");
555        if (trace->format->pregister_thread) {
556                trace->format->pregister_thread(trace, t, true);
557        }
558        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
559        int pkt_skipped = 0;
560        /* Read all packets in then hash and queue against the correct thread */
561        while (1) {
562                int thread;
563                if (!pkt_skipped)
564                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
565                assert(packet);
566
567                if (libtrace_halt) // Signal to die has been sent - TODO
568                        break;
569
570                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
571                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
572                        switch(message.code) {
573                                case MESSAGE_DO_PAUSE:
574                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
575                                        thread_change_state(trace, t, THREAD_PAUSED, false);
576                                        pthread_cond_broadcast(&trace->perpkt_cond);
577                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
578                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
579                                        }
580                                        thread_change_state(trace, t, THREAD_RUNNING, false);
581                                        pthread_cond_broadcast(&trace->perpkt_cond);
582                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
583                                        break;
584                                case MESSAGE_DO_STOP:
585                                        // Stop called after pause
586                                        assert(trace->started == false);
587                                        assert(trace->state == STATE_FINSHED);
588                                        break;
589                                default:
590                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
591                        }
592                        pkt_skipped = 1;
593                        continue;
594                }
595
596                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
597                        break; /* We are EOF or error'd either way we stop  */
598                }
599
600                /* We are guaranteed to have a hash function i.e. != NULL */
601                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
602                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
603                /* Blocking write to the correct queue - I'm the only writer */
604                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
605                        uint64_t order = trace_packet_get_order(packet);
606                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
607                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
608                                // Write ticks to everyone else
609                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
610                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
611                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
612                                for (i = 0; i < trace->perpkt_thread_count; i++) {
613                                        pkts[i]->error = READ_TICK;
614                                        trace_packet_set_order(pkts[i], order);
615                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
616                                }
617                        }
618                        pkt_skipped = 0;
619                } else {
620                        assert(!"Dropping a packet!!");
621                        pkt_skipped = 1; // Reuse that packet no one read it
622                }
623        }
624
625        /* Broadcast our last failed read to all threads */
626        for (i = 0; i < trace->perpkt_thread_count; i++) {
627                libtrace_packet_t * bcast;
628                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
629                if (i == trace->perpkt_thread_count - 1) {
630                        bcast = packet;
631                } else {
632                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
633                        bcast->error = packet->error;
634                }
635                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
636                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
637                        // Unlock early otherwise we could deadlock
638                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
639                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
640                } else {
641                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
642                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
643                }
644        }
645
646        // We don't need to free the packet
647        thread_change_state(trace, t, THREAD_FINISHED, true);
648
649        // Notify only after we've defiantly set the state to finished
650        message.code = MESSAGE_PERPKT_ENDED;
651        message.additional.uint64 = 0;
652        trace_send_message_to_reporter(trace, &message);
653        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
654        if (trace->format->punregister_thread) {
655                trace->format->punregister_thread(trace, t);
656        }
657        print_memory_stats();
658        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
659
660        // TODO remove from TTABLE t sometime
661        pthread_exit(NULL);
662};
663
664/**
665 * Moves src into dest(Complete copy) and copies the memory buffer and
666 * its flags from dest into src ready for reuse without needing extra mallocs.
667 */
668static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
669        // Save the passed in buffer status
670        assert(dest->trace == NULL); // Must be a empty packet
671        void * temp_buf = dest->buffer;
672        buf_control_t temp_buf_control = dest->buf_control;
673        // Completely copy StoredPacket into packet
674        memcpy(dest, src, sizeof(libtrace_packet_t));
675        // Set the buffer settings on the returned packet
676        src->buffer = temp_buf;
677        src->buf_control = temp_buf_control;
678        src->trace = NULL;
679}
680
681/**
682 * @brief Move NULLs to the end of an array.
683 * @param values
684 * @param len
685 * @return The location the first NULL, aka the number of non NULL elements
686 */
687static inline size_t move_nulls_back(void *arr[], size_t len) {
688        size_t fr=0, en = len-1;
689        // Shift all non NULL elements to the front of the array, and NULLs to the
690        // end, traverses every element at most once
691        for (;fr < en; ++fr) {
692                if (arr[fr] == NULL) {
693                        for (;en > fr; --en) {
694                                if(arr[en]) {
695                                        arr[fr] = arr[en];
696                                        arr[en] = NULL;
697                                        break;
698                                }
699                        }
700                }
701        }
702        // This is the index of the first NULL
703        en = MIN(fr, en);
704        // Or the end of the array if this special case
705        if (arr[en])
706                en++;
707        return en;
708}
709
710/** returns the number of packets successfully allocated in the final array
711 these will all be at the front of the array */
712inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
713        size_t nb;
714        nb = move_nulls_back((void **) packets, nb_packets);
715        mem_hits.read.recycled += nb;
716        nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);
717        assert(nb_packets == nb);
718        return nb;
719}
720
721
722inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
723        size_t nb;
724        nb = move_nulls_back((void **) packets, nb_packets);
725        mem_hits.write.recycled += nb_packets - nb;
726        nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);
727        memset(packets, 0, nb); // XXX make better, maybe do this in ocache??
728        return nb;
729}
730
731/* Our simplest case when a thread becomes ready it can obtain an exclusive
732 * lock to read packets from the underlying trace.
733 */
734inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
735{
736        size_t i = 0;
737        bool tick_hit = false;
738
739        nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
740
741        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
742        /* Read nb_packets */
743        for (i = 0; i < nb_packets; ++i) {
744                packets[i]->error = trace_read_packet(libtrace, packets[i]);
745                if (packets[i]->error <= 0) {
746                        ++i;
747                        break;
748                }
749                /*
750                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
751                        tick_hit = true;
752                }*/
753        }
754        // Doing this inside the lock ensures the first packet is always
755        // recorded first
756        if (packets[0]->error > 0) {
757                store_first_packet(libtrace, packets[0], t);
758        }
759        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
760        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
761        if (tick_hit) {
762                libtrace_message_t tick;
763                tick.additional.uint64 = trace_packet_get_order(packets[i]);
764                tick.code = MESSAGE_TICK;
765                trace_send_message_to_perpkts(libtrace, &tick);
766        } */
767        return i;
768}
769
770/**
771 * For the case that we have a dedicated hasher thread
772 * 1. We read a packet from our buffer
773 * 2. Move that into the packet provided (packet)
774 */
775inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets)
776{
777        size_t i;
778
779        // Always grab at least one
780        if (packets[0]) // Recycle the old get the new
781                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
782        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
783
784        if (packets[0]->error < 0)
785                return 1;
786
787        for (i = 1; i < nb_packets; i++) {
788                if (packets[i]) // Recycle the old get the new
789                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
790                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
791                        packets[i] = NULL;
792                        break;
793                }
794                // These are typically urgent
795                if (packets[i]->error < 0)
796                        break;
797        }
798       
799        return i;
800}
801
802/**
803 * Tries to read from our queue and returns 1 if a packet was retrieved
804 */
805static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
806{
807        libtrace_packet_t* retrived_packet;
808
809        /* Lets see if we have one waiting */
810        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
811                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
812                assert(retrived_packet);
813
814                if (*packet) // Recycle the old get the new
815                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
816                *packet = retrived_packet;
817                *ret = (*packet)->error;
818                return 1;
819        }
820        return 0;
821}
822
823/**
824 * Allows us to ensure all threads are finished writing to our threads ring_buffer
825 * before returning EOF/error.
826 */
827inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
828{
829        /* We are waiting for the condition that another thread ends to check
830         * our queue for new data, once all threads end we can go to finished */
831        bool complete = false;
832        int ret;
833
834        do {
835                // Wait for a thread to end
836                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
837
838                // Check before
839                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
840                        complete = true;
841                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
842                        continue;
843                }
844
845                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
846
847                // Check after
848                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
849                        complete = true;
850                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
851                        continue;
852                }
853
854                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
855
856                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
857                if(try_waiting_queue(libtrace, t, packet, &ret))
858                        return ret;
859        } while (!complete);
860
861        // We can only end up here once all threads complete
862        try_waiting_queue(libtrace, t, packet, &ret);
863
864        return ret;
865        // TODO rethink this logic fix bug here
866}
867
868/**
869 * Expects the libtrace_lock to not be held
870 */
871inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
872{
873        thread_change_state(libtrace, t, THREAD_FINISHING, true);
874        return trace_handle_finishing_perpkt(libtrace, packet, t);
875}
876
877/**
878 * This case is much like the dedicated hasher, except that we will become
879 * hasher if we don't have a a packet waiting.
880 *
881 * Note: This is only every used if we have are doing hashing.
882 *
883 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
884 * queue sizes in total are larger than the ring size.
885 *
886 * 1. We read a packet from our buffer
887 * 2. Move that into the packet provided (packet)
888 */
889inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
890{
891        int thread, ret/*, psize*/;
892
893        while (1) {
894                if(try_waiting_queue(libtrace, t, packet, &ret))
895                        return ret;
896                // Can still block here if another thread is writing to a full queue
897                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
898
899                // Its impossible for our own queue to overfill, because no one can write
900                // when we are in the lock
901                if(try_waiting_queue(libtrace, t, packet, &ret)) {
902                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
903                        return ret;
904                }
905
906                // Another thread cannot write a packet because a queue has filled up. Is it ours?
907                if (libtrace->perpkt_queue_full) {
908                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
909                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
910                        continue;
911                }
912
913                if (!*packet)
914                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
915                assert(*packet);
916
917                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
918                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
919                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
920                        if (libtrace_halt)
921                                return 0;
922                        else
923                                return (*packet)->error;
924                }
925
926                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
927                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
928                if (thread == t->perpkt_num) {
929                        // If it's this thread we must be in order because we checked the buffer once we got the lock
930                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
931                        return (*packet)->error;
932                }
933
934                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
935                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
936                                libtrace->perpkt_queue_full = true;
937                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
938                                contention_stats[t->perpkt_num].full_queue_hits++;
939                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
940                        }
941                        *packet = NULL;
942                        libtrace->perpkt_queue_full = false;
943                } else {
944                        /* We can get here if the user closes the thread before natural completion/or error */
945                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
946                }
947                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
948        }
949}
950
951/**
952 * This case is much like the dedicated hasher, except that we will become
953 * hasher if we don't have a packet waiting.
954 *
955 * TODO: You can lose the tail of a trace if the final thread
956 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
957 *
958 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
959 * queue sizes in total are larger than the ring size.
960 *
961 * 1. We read a packet from our buffer
962 * 2. Move that into the packet provided (packet)
963 */
964inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
965{
966        int ret, i, thread/*, psize*/;
967
968        if (t->state == THREAD_FINISHING)
969                return trace_handle_finishing_perpkt(libtrace, packet, t);
970
971        while (1) {
972                // Check if we have packets ready
973                if(try_waiting_queue(libtrace, t, packet, &ret))
974                        return ret;
975
976                // We limit the number of packets we get to the size of the sliding window
977                // such that it is impossible for any given thread to fail to store a packet
978                ASSERT_RET(sem_wait(&libtrace->sem), == 0);
979                /*~~~~Single threaded read of a packet~~~~*/
980                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
981
982                /* Re-check our queue things we might have data waiting */
983                if(try_waiting_queue(libtrace, t, packet, &ret)) {
984                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
985                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
986                        return ret;
987                }
988
989                // TODO put on *proper* condition variable
990                if (libtrace->perpkt_queue_full) {
991                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
992                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
993                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
994                        continue;
995                }
996
997                if (!*packet)
998                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
999                assert(*packet);
1000
1001                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
1002                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1003                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1004                        // Finish this thread ensuring that any data written later by another thread is retrieved also
1005                        if (libtrace_halt)
1006                                return 0;
1007                        else
1008                                return trace_finish_perpkt(libtrace, packet, t);
1009                }
1010                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1011
1012                /* ~~~~Multiple threads can run the hasher~~~~ */
1013                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
1014
1015                /* Yes this is correct opposite read lock for a write operation */
1016                ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0);
1017                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
1018                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
1019                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1020                *packet = NULL;
1021
1022                // Always try read any data from the sliding window
1023                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
1024                        ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
1025                        if (libtrace->perpkt_queue_full) {
1026                                // I might be the holdup in which case if I can read my queue I should do that and return
1027                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
1028                                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1029                                        return ret;
1030                                }
1031                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1032                                continue;
1033                        }
1034                        // Read greedily as many as we can
1035                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
1036                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
1037                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
1038                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
1039                                                if (t->perpkt_num == thread)
1040                                                {
1041                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
1042                                                        // before EOF/error we might not have emptied the sliding window
1043                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
1044                                                        // Its our queue we must have a packet to read out
1045                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
1046                                                                // We must be able to write this now 100% without fail
1047                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
1048                                                                ASSERT_RET(sem_post(&libtrace->sem), == 0);
1049                                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1050                                                                return ret;
1051                                                        } else {
1052                                                                assert(!"Our queue is full but I cannot read from it??");
1053                                                        }
1054                                                }
1055                                                // Not us we have to give the other threads a chance to write there packets then
1056                                                libtrace->perpkt_queue_full = true;
1057                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1058                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
1059                                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1060
1061                                                contention_stats[t->perpkt_num].full_queue_hits++;
1062                                                ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
1063                                                // Grab these back
1064                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
1065                                                        ASSERT_RET(sem_wait(&libtrace->sem), == 0);
1066                                                libtrace->perpkt_queue_full = false;
1067                                        }
1068                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1069                                        *packet = NULL;
1070                                } else {
1071                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
1072                                        // in the general case (unless the user ends early without proper clean up).
1073                                        assert (!"unreachable code??");
1074                                }
1075                        }
1076                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1077                }
1078                // Now we go back to checking our queue anyways
1079        }
1080}
1081
1082
1083/**
1084 * For the first packet of each queue we keep a copy and note the system
1085 * time it was received at.
1086 *
1087 * This is used for finding the first packet when playing back a trace
1088 * in trace time. And can be used by real time applications to print
1089 * results out every XXX seconds.
1090 */
1091void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1092{
1093        if (!t->recorded_first) {
1094                struct timeval tv;
1095                libtrace_packet_t * dup;
1096                // For what it's worth we can call these outside of the lock
1097                gettimeofday(&tv, NULL);
1098                dup = trace_copy_packet(packet);
1099                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1100                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1101                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
1102                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1103                // Now update the first
1104                libtrace->first_packets.count++;
1105                if (libtrace->first_packets.count == 1) {
1106                        // We the first entry hence also the first known packet
1107                        libtrace->first_packets.first = t->perpkt_num;
1108                } else {
1109                        // Check if we are newer than the previous 'first' packet
1110                        size_t first = libtrace->first_packets.first;
1111                        if (trace_get_seconds(dup) <
1112                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1113                                libtrace->first_packets.first = t->perpkt_num;
1114                }
1115                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1116                libtrace_message_t mesg = {0};
1117                mesg.code = MESSAGE_FIRST_PACKET;
1118                trace_send_message_to_reporter(libtrace, &mesg);
1119                t->recorded_first = true;
1120        }
1121}
1122
1123/**
1124 * Returns 1 if it's certain that the first packet is truly the first packet
1125 * rather than a best guess based upon threads that have published so far.
1126 * Otherwise 0 is returned.
1127 * It's recommended that this result is stored rather than calling this
1128 * function again.
1129 */
1130DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
1131{
1132        int ret = 0;
1133        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1134        if (libtrace->first_packets.count) {
1135                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1136                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1137                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1138                        ret = 1;
1139                } else {
1140                        struct timeval curr_tv;
1141                        // If a second has passed since the first entry we will assume this is the very first packet
1142                        gettimeofday(&curr_tv, NULL);
1143                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1144                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1145                                        ret = 1;
1146                                }
1147                        }
1148                }
1149        } else {
1150                *packet = NULL;
1151                *tv = NULL;
1152        }
1153        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1154        return ret;
1155}
1156
1157
1158DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
1159{
1160        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1161}
1162
1163inline static struct timeval usec_to_tv(uint64_t usec)
1164{
1165        struct timeval tv;
1166        tv.tv_sec = usec / 1000000;
1167        tv.tv_usec = usec % 1000000;
1168        return tv;
1169}
1170
1171/** Similar to delay_tracetime but send messages to all threads periodically */
1172static void* reporter_entry(void *data) {
1173        libtrace_message_t message = {0};
1174        libtrace_t *trace = (libtrace_t *)data;
1175        libtrace_thread_t *t = &trace->reporter_thread;
1176        size_t res_size;
1177        libtrace_vector_t results;
1178        libtrace_vector_init(&results, sizeof(libtrace_result_t));
1179        fprintf(stderr, "Reporter thread starting\n");
1180        libtrace_result_t result;
1181        size_t i;
1182
1183        message.code = MESSAGE_STARTING;
1184        message.sender = t;
1185        (*trace->reporter)(trace, NULL, &message);
1186        message.code = MESSAGE_RESUMING;
1187        (*trace->reporter)(trace, NULL, &message);
1188
1189        while (!trace_finished(trace)) {
1190                if (trace->config.reporter_polling) {
1191                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1192                                message.code = MESSAGE_POST_REPORTER;
1193                } else {
1194                        libtrace_message_queue_get(&t->messages, &message);
1195                }
1196                switch (message.code) {
1197                        // Check for results
1198                        case MESSAGE_POST_REPORTER:
1199                                res_size = trace_get_results(trace, &results);
1200                                for (i = 0; i < res_size; i++) {
1201                                        ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
1202                                        (*trace->reporter)(trace, &result, NULL);
1203                                }
1204                                break;
1205                        case MESSAGE_DO_PAUSE:
1206                                message.code = MESSAGE_PAUSING;
1207                                message.sender = t;
1208                                (*trace->reporter)(trace, NULL, &message);
1209                                trace_thread_pause(trace, t);
1210                                message.code = MESSAGE_RESUMING;
1211                                (*trace->reporter)(trace, NULL, &message);
1212                                break;
1213                        default:
1214                                (*trace->reporter)(trace, NULL, &message);
1215                }
1216        }
1217
1218        // Flush out whats left now all our threads have finished
1219        res_size = trace_get_results(trace, &results);
1220        for (i = 0; i < res_size; i++) {
1221                ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
1222                (*trace->reporter)(trace, &result, NULL);
1223        }
1224        libtrace_vector_destroy(&results);
1225
1226        // GOODBYE
1227        message.code = MESSAGE_PAUSING;
1228        message.sender = t;
1229        (*trace->reporter)(trace, NULL, &message);
1230        message.code = MESSAGE_STOPPING;
1231        (*trace->reporter)(trace, NULL, &message);
1232
1233        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1234        print_memory_stats();
1235        return NULL;
1236}
1237
1238/** Similar to delay_tracetime but send messages to all threads periodically */
1239static void* keepalive_entry(void *data) {
1240        struct timeval prev, next;
1241        libtrace_message_t message = {0};
1242        libtrace_t *trace = (libtrace_t *)data;
1243        uint64_t next_release;
1244        fprintf(stderr, "keepalive thread is starting\n");
1245
1246        gettimeofday(&prev, NULL);
1247        message.code = MESSAGE_TICK;
1248        while (trace->state != STATE_FINSHED) {
1249                fd_set rfds;
1250                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1251                gettimeofday(&next, NULL);
1252                if (next_release > tv_to_usec(&next)) {
1253                        next = usec_to_tv(next_release - tv_to_usec(&next));
1254                        // Wait for timeout or a message
1255                        FD_ZERO(&rfds);
1256                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
1257                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
1258                                libtrace_message_t msg;
1259                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
1260                                assert(msg.code == MESSAGE_DO_STOP);
1261                                goto done;
1262                        }
1263                }
1264                prev = usec_to_tv(next_release);
1265                if (trace->state == STATE_RUNNING) {
1266                        message.additional.uint64 = tv_to_usec(&prev);
1267                        trace_send_message_to_perpkts(trace, &message);
1268                }
1269        }
1270done:
1271
1272        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1273        return NULL;
1274}
1275
1276/**
1277 * Delays a packets playback so the playback will be in trace time
1278 */
1279static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1280        struct timeval curr_tv, pkt_tv;
1281        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
1282        uint64_t curr_usec;
1283        /* Tracetime we might delay releasing this packet */
1284        if (!t->tracetime_offset_usec) {
1285                libtrace_packet_t * first_pkt;
1286                struct timeval *sys_tv;
1287                int64_t initial_offset;
1288                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1289                assert(first_pkt);
1290                pkt_tv = trace_get_timeval(first_pkt);
1291                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1292                if (stable)
1293                        // 0->1 because 0 is used to mean unset
1294                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1295                next_release = initial_offset;
1296        }
1297        /* next_release == offset */
1298        pkt_tv = trace_get_timeval(packet);
1299        next_release += tv_to_usec(&pkt_tv);
1300        gettimeofday(&curr_tv, NULL);
1301        curr_usec = tv_to_usec(&curr_tv);
1302        if (next_release > curr_usec) {
1303                // We need to wait
1304                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1305                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1306                select(0, NULL, NULL, NULL, &delay_tv);
1307        }
1308}
1309
1310/* Read one packet from the trace into a buffer. Note that this function will
1311 * block until a packet is read (or EOF is reached).
1312 *
1313 * @param libtrace      the libtrace opaque pointer
1314 * @param packet        the packet opaque pointer
1315 * @returns 0 on EOF, negative value on error
1316 *
1317 * Note this is identical to read_packet but calls pread_packet instead of
1318 * read packet in the format.
1319 *
1320 */
1321static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
1322
1323        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
1324        if (trace_is_err(libtrace))
1325                return -1;
1326        if (!libtrace->started) {
1327                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1328                return -1;
1329        }
1330        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1331                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1332                return -1;
1333        }
1334        assert(packet);
1335
1336        if (libtrace->format->read_packet) {
1337                do {
1338                        size_t ret;
1339                        /* Finalise the packet, freeing any resources the format module
1340                         * may have allocated it and zeroing all data associated with it.
1341                         */
1342                        trace_fin_packet(packet);
1343                        /* Store the trace we are reading from into the packet opaque
1344                         * structure */
1345                        packet->trace = libtrace;
1346                        ret=libtrace->format->pread_packet(libtrace, t, packet);
1347                        if (ret <= 0) {
1348                                return ret;
1349                        }
1350                        if (libtrace->filter) {
1351                                /* If the filter doesn't match, read another
1352                                 * packet
1353                                 */
1354                                if (!trace_apply_filter(libtrace->filter,packet)){
1355                                        ++libtrace->filtered_packets;
1356                                        continue;
1357                                }
1358                        }
1359                        if (libtrace->snaplen>0) {
1360                                /* Snap the packet */
1361                                trace_set_capture_length(packet,
1362                                                libtrace->snaplen);
1363                        }
1364                       
1365                        ++t->accepted_packets;
1366                        // TODO look into this better
1367                        trace_packet_set_order(packet, trace_get_erf_timestamp(packet));
1368                        //trace_packet_set_order(packet, libtrace->accepted_packets);
1369                        //++libtrace->accepted_packets;
1370                        return ret;
1371                } while(1);
1372        }
1373        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1374        return ~0U;
1375}
1376
1377/**
1378 * Read packets from the parallel trace
1379 * @return the number of packets read, null packets indicate messages. Check packet->error before
1380 * assuming a packet is valid.
1381 */
1382static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
1383{
1384        size_t ret;
1385        size_t i;
1386        assert(nb_packets);
1387
1388        for (i = 0; i < nb_packets; i++) {
1389                // Cleanup the packet passed back
1390                if (packets[i])
1391                        trace_fin_packet(packets[i]);
1392        }
1393
1394        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1395                if (!packets[0])
1396                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **)packets, 1, 1);
1397                packets[0]->error = trace_pread_packet_wrapper(libtrace, t, *packets);
1398                ret = 1;
1399        } else if (trace_has_dedicated_hasher(libtrace)) {
1400                ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
1401        } else if (!trace_has_dedicated_hasher(libtrace)) {
1402                /* We don't care about which core a packet goes to */
1403                ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);
1404        } /* else {
1405                ret = trace_pread_packet_hash_locked(libtrace, packet);
1406        }*/
1407
1408        // Formats can also optionally do this internally to ensure the first
1409        // packet is always reported correctly
1410        assert(ret);
1411        assert(ret <= nb_packets);
1412        if (packets[0]->error > 0) {
1413                store_first_packet(libtrace, packets[0], t);
1414                if (libtrace->tracetime)
1415                        delay_tracetime(libtrace, packets[0], t);
1416        }
1417
1418        return ret;
1419}
1420
1421/* Starts perpkt threads
1422 * @return threads_started
1423 */
1424static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1425        int i;
1426        char name[16];
1427        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1428                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1429                ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0);
1430                snprintf(name, 16, "perpkt-%d", i);
1431                pthread_setname_np(t->tid, name);
1432        }
1433        return libtrace->perpkt_thread_count;
1434}
1435
1436/* Start an input trace in a parallel fashion, or restart a paused trace.
1437 *
1438 * NOTE: libtrace lock is held for the majority of this function
1439 *
1440 * @param libtrace the input trace to start
1441 * @param global_blob some global data you can share with the new perpkt threads
1442 * @returns 0 on success
1443 */
1444DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter)
1445{
1446        int i;
1447        char name[16];
1448        sigset_t sig_before, sig_block_all;
1449        assert(libtrace);
1450        if (trace_is_err(libtrace)) {
1451                return -1;
1452        }
1453       
1454        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
1455        if (libtrace->state != STATE_NEW) {
1456                int err = 0;
1457                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1458                if (libtrace->state != STATE_PAUSED) {
1459                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1460                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
1461                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1462                        return -1;
1463                }
1464               
1465                // Update the per_pkt function, or reuse the old one
1466                if (per_pkt)
1467                        libtrace->per_pkt = per_pkt;
1468
1469                if (reporter)
1470                        libtrace->reporter = reporter;
1471
1472                assert(libtrace_parallel);
1473                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1474                assert(libtrace->per_pkt);
1475               
1476                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1477                        fprintf(stderr, "Restarting trace pstart_input()\n");
1478                        err = libtrace->format->pstart_input(libtrace);
1479                } else {
1480                        if (libtrace->format->start_input) {
1481                                fprintf(stderr, "Restarting trace start_input()\n");
1482                                err = libtrace->format->start_input(libtrace);
1483                        }
1484                }
1485               
1486                if (err == 0) {
1487                        libtrace->started = true;
1488                        libtrace_change_state(libtrace, STATE_RUNNING, false);
1489                }
1490                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1491                return err;
1492        }
1493
1494        assert(libtrace->state == STATE_NEW);
1495        libtrace_parallel = 1;
1496
1497        // Store the user defined things against the trace
1498        libtrace->global_blob = global_blob;
1499        libtrace->per_pkt = per_pkt;
1500        libtrace->reporter = reporter;
1501
1502        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
1503        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
1504        ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0);
1505        // Grab the lock
1506        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1507
1508        // Set default buffer sizes
1509        if (libtrace->config.hasher_queue_size <= 0)
1510                libtrace->config.hasher_queue_size = 1000;
1511
1512        if (libtrace->perpkt_thread_count <= 0) {
1513                // TODO add BSD support
1514                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1515                if (libtrace->perpkt_thread_count <= 0)
1516                        // Lets just use one
1517                        libtrace->perpkt_thread_count = 1;
1518        }
1519
1520        if (libtrace->config.reporter_thold <= 0)
1521                libtrace->config.reporter_thold = 100;
1522        if (libtrace->config.burst_size <= 0)
1523                libtrace->config.burst_size = 10;
1524        if (libtrace->config.packet_thread_cache_size <= 0)
1525                libtrace->config.packet_thread_cache_size = 20;
1526        if (libtrace->config.packet_cache_size <= 0)
1527                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1528
1529        if (libtrace->config.packet_cache_size <
1530                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1531                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1532
1533        libtrace->started = true; // Before we start the threads otherwise we could have issues
1534        libtrace_change_state(libtrace, STATE_RUNNING, false);
1535        /* Disable signals - Pthread signal handling */
1536
1537        sigemptyset(&sig_block_all);
1538
1539        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1540
1541        // If we are using a hasher start it
1542        // If single threaded we don't need a hasher
1543        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1544                libtrace_thread_t *t = &libtrace->hasher_thread;
1545                t->trace = libtrace;
1546                t->ret = NULL;
1547                t->type = THREAD_HASHER;
1548                t->state = THREAD_RUNNING;
1549                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1550                ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0);
1551                snprintf(name, sizeof(name), "hasher-thread");
1552                pthread_setname_np(t->tid, name);
1553        } else {
1554                libtrace->hasher_thread.type = THREAD_EMPTY;
1555        }
1556
1557        libtrace_ocache_init(&libtrace->packet_freelist,
1558                                                 (void* (*)()) trace_create_packet,
1559                                                 (void (*)(void *))trace_destroy_packet,
1560                                                 libtrace->config.packet_thread_cache_size,
1561                                                 libtrace->config.packet_cache_size * 4,
1562                                                 libtrace->config.fixed_packet_count);
1563        // Unused slidingwindow code
1564        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1565        //ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
1566
1567        // This will be applied to every new thread that starts, i.e. they will block all signals
1568        // Lets start a fixed number of reading threads
1569
1570        /* Ready some storages */
1571        libtrace->first_packets.first = 0;
1572        libtrace->first_packets.count = 0;
1573        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1574        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1575
1576
1577        /* Ready all of our perpkt threads - they are started later */
1578        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1579        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1580                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1581                t->trace = libtrace;
1582                t->ret = NULL;
1583                t->type = THREAD_PERPKT;
1584                t->state = THREAD_RUNNING;
1585                t->user_data = NULL;
1586                // t->tid DONE on create
1587                t->perpkt_num = i;
1588                if (libtrace->hasher)
1589                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
1590                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
1591                // Depending on the mode vector or deque might be chosen
1592                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1593                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1594                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1595                t->recorded_first = false;
1596                t->tracetime_offset_usec = 0;;
1597        }
1598
1599        int threads_started = 0;
1600        /* Setup the trace and start our threads */
1601        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1602                printf("This format has direct support for p's\n");
1603                threads_started = libtrace->format->pstart_input(libtrace);
1604        } else {
1605                if (libtrace->format->start_input) {
1606                        threads_started=libtrace->format->start_input(libtrace);
1607                }
1608        }
1609        if (threads_started == 0)
1610                threads_started = trace_start_perpkt_threads(libtrace);
1611
1612        libtrace->reporter_thread.type = THREAD_REPORTER;
1613        libtrace->reporter_thread.state = THREAD_RUNNING;
1614        libtrace_message_queue_init(&libtrace->reporter_thread.messages, sizeof(libtrace_message_t));
1615        if (reporter) {
1616                // Got a real reporter
1617                ASSERT_RET(pthread_create(&libtrace->reporter_thread.tid, NULL, reporter_entry, (void *) libtrace), == 0);
1618        } else {
1619                // Main thread is reporter
1620                libtrace->reporter_thread.tid = pthread_self();
1621        }
1622
1623        if (libtrace->config.tick_interval > 0) {
1624                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1625                libtrace->keepalive_thread.state = THREAD_RUNNING;
1626                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1627                ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
1628        }
1629
1630        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1631                libtrace->perpkt_thread_states[i] = 0;
1632        }
1633        libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started;
1634
1635        // Revert back - Allow signals again
1636        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1637        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1638
1639        if (threads_started < 0)
1640                // Error
1641                return threads_started;
1642
1643        // TODO fix these leaks etc
1644        if (libtrace->perpkt_thread_count != threads_started)
1645                fprintf(stderr, "Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1646
1647
1648        return 0;
1649}
1650
1651/**
1652 * Pauses a trace, this should only be called by the main thread
1653 * 1. Set started = false
1654 * 2. All perpkt threads are paused waiting on a condition var
1655 * 3. Then call ppause on the underlying format if found
1656 * 4. The traces state is paused
1657 *
1658 * Once done you should be able to modify the trace setup and call pstart again
1659 * TODO handle changing thread numbers
1660 */
1661DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1662{
1663        libtrace_thread_t *t;
1664        int i;
1665        assert(libtrace);
1666       
1667        t = get_thread_table(libtrace);
1668        // Check state from within the lock if we are going to change it
1669        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1670        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1671                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1672                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1673                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1674                return -1;
1675        }
1676
1677        libtrace_change_state(libtrace, STATE_PAUSING, false);
1678        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1679
1680        // Special case handle the hasher thread case
1681        if (trace_has_dedicated_hasher(libtrace)) {
1682                if (libtrace->config.debug_state)
1683                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1684                libtrace_message_t message = {0};
1685                message.code = MESSAGE_DO_PAUSE;
1686                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1687                // Wait for it to pause
1688                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1689                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1690                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1691                }
1692                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1693                if (libtrace->config.debug_state)
1694                        fprintf(stderr, " DONE\n");
1695        }
1696
1697        if (libtrace->config.debug_state)
1698                fprintf(stderr, "Asking perpkt threads to pause ...");
1699        // Stop threads, skip this one if it's a perpkt
1700        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1701                if (&libtrace->perpkt_threads[i] != t) {
1702                        libtrace_message_t message = {0};
1703                        message.code = MESSAGE_DO_PAUSE;
1704                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1705                        if(trace_has_dedicated_hasher(libtrace)) {
1706                                // The hasher has stopped and other threads have messages waiting therefore
1707                                // If the queues are empty the other threads would have no data
1708                                // So send some message packets to simply ask the threads to check
1709                                // We are the only writer since hasher has paused
1710                                libtrace_packet_t *pkt;
1711                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1712                                pkt->error = READ_MESSAGE;
1713                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1714                        }
1715                } else {
1716                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1717                }
1718        }
1719
1720        if (t) {
1721                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1722                // We rely on the user to *not* return before starting the trace again
1723                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1724        }
1725
1726        // Wait for all threads to pause
1727        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1728        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1729                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1730        }
1731        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1732
1733        if (libtrace->config.debug_state)
1734                fprintf(stderr, " DONE\n");
1735
1736        // Deal with the reporter
1737        if (trace_has_dedicated_reporter(libtrace)) {
1738                if (libtrace->config.debug_state)
1739                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1740                libtrace_message_t message = {0};
1741                message.code = MESSAGE_DO_PAUSE;
1742                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
1743                // Wait for it to pause
1744                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1745                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1746                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1747                }
1748                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1749                if (libtrace->config.debug_state)
1750                        fprintf(stderr, " DONE\n");
1751        }
1752
1753        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1754                uint64_t tmp_stats;
1755                libtrace->dropped_packets = trace_get_dropped_packets(libtrace);
1756                libtrace->received_packets = trace_get_received_packets(libtrace);
1757                if (libtrace->format->get_filtered_packets) {
1758                        if ((tmp_stats = libtrace->format->get_filtered_packets(libtrace)) != UINT64_MAX) {
1759                                libtrace->filtered_packets += tmp_stats;
1760                        }
1761                }
1762                libtrace->started = false;
1763                if (libtrace->format->ppause_input)
1764                        libtrace->format->ppause_input(libtrace);
1765                // TODO What happens if we don't have pause input??
1766        } else {
1767                int err;
1768                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1769                err = trace_pause(libtrace);
1770                // We should handle this a bit better
1771                if (err)
1772                        return err;
1773        }
1774
1775        // Only set as paused after the pause has been called on the trace
1776        libtrace_change_state(libtrace, STATE_PAUSED, true);
1777        return 0;
1778}
1779
1780/**
1781 * Stop trace finish prematurely as though it meet an EOF
1782 * This should only be called by the main thread
1783 * 1. Calls ppause
1784 * 2. Sends a message asking for threads to finish
1785 * 3. Releases threads which will pause
1786 */
1787DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1788{
1789        int i, err;
1790        libtrace_message_t message = {0};
1791        assert(libtrace);
1792
1793        // Ensure all threads have paused and the underlying trace format has
1794        // been closed and all packets associated are cleaned up
1795        // Pause will do any state checks for us
1796        err = trace_ppause(libtrace);
1797        if (err)
1798                return err;
1799
1800        // Now send a message asking the threads to stop
1801        // This will be retrieved before trying to read another packet
1802       
1803        message.code = MESSAGE_DO_STOP;
1804        trace_send_message_to_perpkts(libtrace, &message);
1805        if (trace_has_dedicated_hasher(libtrace))
1806                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1807       
1808        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1809                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1810        }
1811
1812        // Now release the threads and let them stop
1813        libtrace_change_state(libtrace, STATE_FINSHED, true);
1814        return 0;
1815}
1816
1817/**
1818 * Set the hasher type along with a selected function, if hardware supports
1819 * that generic type of hashing it will be used otherwise the supplied
1820 * hasher function will be used and passed data when called.
1821 *
1822 * @return 0 if successful otherwise -1 on error
1823 */
1824DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1825        int ret = -1;
1826        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1827                return -1;
1828        }
1829
1830        // Save the requirements
1831        trace->hasher_type = type;
1832        if (hasher) {
1833                trace->hasher = hasher;
1834                trace->hasher_data = data;
1835        } else {
1836                trace->hasher = NULL;
1837                // TODO consider how to handle freeing this
1838                trace->hasher_data = NULL;
1839        }
1840
1841        // Try push this to hardware - NOTE hardware could do custom if
1842        // there is a more efficient way to apply it, in this case
1843        // it will simply grab the function out of libtrace_t
1844        if (trace->format->pconfig_input)
1845                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1846
1847        if (ret == -1) {
1848                // We have to deal with this ourself
1849                // This most likely means single threaded reading of the trace
1850                if (!hasher) {
1851                        switch (type)
1852                        {
1853                                case HASHER_CUSTOM:
1854                                case HASHER_BALANCE:
1855                                        return 0;
1856                                case HASHER_BIDIRECTIONAL:
1857                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1858                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1859                                        toeplitz_init_config(trace->hasher_data, 1);
1860                                        return 0;
1861                                case HASHER_UNIDIRECTIONAL:
1862                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1863                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1864                                        toeplitz_init_config(trace->hasher_data, 0);
1865                                        return 0;
1866                                case HASHER_HARDWARE:
1867                                        return -1;
1868                        }
1869                        return -1;
1870                }
1871        } else {
1872                // The hardware is dealing with this yay
1873                trace->hasher_type = HASHER_HARDWARE;
1874        }
1875
1876        return 0;
1877}
1878
1879// Waits for all threads to finish
1880DLLEXPORT void trace_join(libtrace_t *libtrace) {
1881        int i;
1882
1883        /* Firstly wait for the perpkt threads to finish, since these are
1884         * user controlled */
1885        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1886                //printf("Waiting to join with perpkt #%d\n", i);
1887                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
1888                //printf("Joined with perpkt #%d\n", i);
1889                // So we must do our best effort to empty the queue - so
1890                // the producer (or any other threads) don't block.
1891                libtrace_packet_t * packet;
1892                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
1893                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1894                        if (packet) // This could be NULL iff the perpkt finishes early
1895                                trace_destroy_packet(packet);
1896        }
1897
1898        /* Now the hasher */
1899        if (trace_has_dedicated_hasher(libtrace)) {
1900                pthread_join(libtrace->hasher_thread.tid, NULL);
1901                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
1902        }
1903
1904        // Now that everything is finished nothing can be touching our
1905        // buffers so clean them up
1906        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1907                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
1908                // if they lost timeslice before-during a write
1909                libtrace_packet_t * packet;
1910                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1911                        trace_destroy_packet(packet);
1912                if (libtrace->hasher) {
1913                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1914                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1915                }
1916                // Cannot destroy vector yet, this happens with trace_destroy
1917        }
1918        // TODO consider perpkt threads marking trace as finished before join is called
1919        libtrace_change_state(libtrace, STATE_FINSHED, true);
1920
1921        if (trace_has_dedicated_reporter(libtrace)) {
1922                pthread_join(libtrace->reporter_thread.tid, NULL);
1923                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
1924        }
1925       
1926        // Wait for the tick (keepalive) thread if it has been started
1927        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1928                libtrace_message_t msg = {0};
1929                msg.code = MESSAGE_DO_STOP;
1930                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1931                pthread_join(libtrace->keepalive_thread.tid, NULL);
1932        }
1933       
1934        libtrace_change_state(libtrace, STATE_JOINED, true);
1935        print_memory_stats();
1936}
1937
1938DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1939{
1940        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1941        assert(t);
1942        return libtrace_message_queue_count(&t->messages);
1943}
1944
1945DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1946{
1947        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1948        assert(t);
1949        return libtrace_message_queue_get(&t->messages, message);
1950}
1951
1952DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1953{
1954        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1955        assert(t);
1956        return libtrace_message_queue_try_get(&t->messages, message);
1957}
1958
1959/**
1960 * Return backlog indicator
1961 */
1962DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
1963{
1964        libtrace_message_t message = {0};
1965        message.code = MESSAGE_POST_REPORTER;
1966        message.sender = get_thread_descriptor(libtrace);
1967        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
1968}
1969
1970/**
1971 * Return backlog indicator
1972 */
1973DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
1974{
1975        //printf("Sending message code=%d to reporter\n", message->code);
1976        message->sender = get_thread_descriptor(libtrace);
1977        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
1978}
1979
1980/**
1981 *
1982 */
1983DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1984{
1985        //printf("Sending message code=%d to reporter\n", message->code);
1986        message->sender = get_thread_descriptor(libtrace);
1987        return libtrace_message_queue_put(&t->messages, message);
1988}
1989
1990DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1991{
1992        int i;
1993        message->sender = get_thread_descriptor(libtrace);
1994        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1995                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1996        }
1997        //printf("Sending message code=%d to reporter\n", message->code);
1998        return 0;
1999}
2000
2001DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
2002        result->key = key;
2003}
2004DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
2005        return result->key;
2006}
2007DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
2008        result->value = value;
2009}
2010DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
2011        return result->value;
2012}
2013DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
2014        result->key = key;
2015        result->value = value;
2016}
2017DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
2018        free(*result);
2019        result = NULL;
2020        // TODO automatically back with a free list!!
2021}
2022
2023DLLEXPORT void * trace_get_global(libtrace_t *trace)
2024{
2025        return trace->global_blob;
2026}
2027
2028DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
2029{
2030        if (trace->global_blob && trace->global_blob != data) {
2031                void * ret = trace->global_blob;
2032                trace->global_blob = data;
2033                return ret;
2034        } else {
2035                trace->global_blob = data;
2036                return NULL;
2037        }
2038}
2039
2040DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2041{
2042        return t->user_data;
2043}
2044
2045DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2046{
2047        if(t->user_data && t->user_data != data) {
2048                void *ret = t->user_data;
2049                t->user_data = data;
2050                return ret;
2051        } else {
2052                t->user_data = data;
2053                return NULL;
2054        }
2055}
2056
2057/**
2058 * Publish to the reduce queue, return
2059 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2060 */
2061DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
2062        libtrace_result_t res;
2063        UNUSED static __thread int count = 0;
2064        res.type = type;
2065
2066        libtrace_result_set_key_value(&res, key, value);
2067        /*
2068        if (count == 1)
2069                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
2070        count = (count+1) %1000;
2071        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
2072        */
2073        /*if (count == 1)
2074                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
2075        count = (count+1)%1000;*/
2076        if (libtrace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
2077                if (libtrace_deque_get_size(&t->deque) >= libtrace->config.reporter_thold) {
2078                        trace_post_reporter(libtrace);
2079                }
2080                //while (libtrace_deque_get_size(&t->deque) >= 1000)
2081                //      sched_yield();
2082                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
2083        } else {
2084                //while (libtrace_vector_get_size(&t->vector) >= 1000)
2085                //      sched_yield();
2086
2087                if (libtrace_vector_get_size(&t->vector) >= libtrace->config.reporter_thold) {
2088                        trace_post_reporter(libtrace);
2089                }
2090                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
2091        }
2092}
2093
2094static int compareres(const void* p1, const void* p2)
2095{
2096        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
2097                return -1;
2098        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
2099                return 0;
2100        else
2101                return 1;
2102}
2103
2104DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
2105        int i;
2106        int flags = libtrace->reporter_flags; // Hint these aren't a changing
2107
2108        libtrace_vector_empty(results);
2109
2110        /* Here we assume queues are in order ascending order and they want
2111         * the smallest result first. If they are not in order the results
2112         * may not be in order.
2113         */
2114        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
2115                int live_count = 0;
2116                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
2117                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
2118                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
2119                int min_queue = -1;
2120
2121                /* Loop through check all are alive (have data) and find the smallest */
2122                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
2123                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
2124                        if (libtrace_deque_get_size(v) != 0) {
2125                                libtrace_result_t r;
2126                                libtrace_deque_peek_front(v, (void *) &r);
2127                                live_count++;
2128                                live[i] = 1;
2129                                key[i] = libtrace_result_get_key(&r);
2130                                if (i==0 || min_key > key[i]) {
2131                                        min_key = key[i];
2132                                        min_queue = i;
2133                                }
2134                        } else {
2135                                live[i] = 0;
2136                        }
2137                }
2138
2139                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
2140                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
2141                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
2142                                libtrace->state == STATE_JOINED))) {
2143                        /* Get the minimum queue and then do stuff */
2144                        libtrace_result_t r;
2145
2146                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
2147                        libtrace_vector_push_back(results, &r);
2148
2149                        // We expect the key we read +1 now
2150                        libtrace->expected_key = key[min_queue] + 1;
2151
2152                        // Now update the one we just removed
2153                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
2154                        {
2155                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
2156                                key[min_queue] = libtrace_result_get_key(&r);
2157                                if (key[min_queue] <= min_key) {
2158                                        // We are still the smallest, might be out of order though :(
2159                                        min_key = key[min_queue];
2160                                } else {
2161                                        min_key = key[min_queue]; // Update our minimum
2162                                        // Check all find the smallest again - all are alive
2163                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
2164                                                if (live[i] && min_key > key[i]) {
2165                                                        min_key = key[i];
2166                                                        min_queue = i;
2167                                                }
2168                                        }
2169                                }
2170                        } else {
2171                                live[min_queue] = 0;
2172                                live_count--;
2173                                min_key = UINT64_MAX; // Update our minimum
2174                                // Check all find the smallest again - all are alive
2175                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
2176                                        // Still not 100% TODO (what if order is wrong or not increasing)
2177                                        if (live[i] && min_key >= key[i]) {
2178                                                min_key = key[i];
2179                                                min_queue = i;
2180                                        }
2181                                }
2182                        }
2183                }
2184        } else { // Queues are not in order - return all results in the queue
2185                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2186                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
2187                }
2188                if (flags & REDUCE_SORT) {
2189                        qsort(results->elements, results->size, results->element_size, &compareres);
2190                }
2191        }
2192        return libtrace_vector_get_size(results);
2193}
2194
2195DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2196        return packet->order;
2197}
2198
2199DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2200        return packet->hash;
2201}
2202
2203DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2204        packet->order = order;
2205}
2206
2207DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2208        packet->hash = hash;
2209}
2210
2211DLLEXPORT int trace_finished(libtrace_t * libtrace) {
2212        // TODO I don't like using this so much, we could use state!!!
2213        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
2214}
2215
2216DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2217{
2218        UNUSED int ret = -1;
2219        switch (option) {
2220                case TRACE_OPTION_TICK_INTERVAL:
2221                        libtrace->config.tick_interval = *((int *) value);
2222                        return 1;
2223                case TRACE_OPTION_SET_HASHER:
2224                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2225                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2226                        libtrace->perpkt_thread_count = *((int *) value);
2227                        return 1;
2228                case TRACE_DROP_OUT_OF_ORDER:
2229                        if (*((int *) value))
2230                                libtrace->reporter_flags |= REDUCE_DROP_OOO;
2231                        else
2232                                libtrace->reporter_flags &= ~REDUCE_DROP_OOO;
2233                        return 1;
2234                case TRACE_OPTION_SEQUENTIAL:
2235                        if (*((int *) value))
2236                                libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
2237                        else
2238                                libtrace->reporter_flags &= ~REDUCE_SEQUENTIAL;
2239                        return 1;
2240                case TRACE_OPTION_ORDERED:
2241                        if (*((int *) value))
2242                                libtrace->reporter_flags |= REDUCE_ORDERED;
2243                        else
2244                                libtrace->reporter_flags &= ~REDUCE_ORDERED;
2245                        return 1;
2246                case TRACE_OPTION_TRACETIME:
2247                        if(*((int *) value))
2248                                libtrace->tracetime = 1;
2249                        else
2250                                libtrace->tracetime = 0;
2251                        return 0;
2252                case TRACE_OPTION_SET_CONFIG:
2253                        libtrace->config = *((struct user_configuration *) value);
2254                case TRACE_OPTION_GET_CONFIG:
2255                        *((struct user_configuration *) value) = libtrace->config;
2256        }
2257        return 0;
2258}
2259
2260static bool config_bool_parse(char *value, size_t nvalue) {
2261        if (strncmp(value, "true", nvalue) == 0)
2262                return true;
2263        else if (strncmp(value, "false", nvalue) == 0)
2264                return false;
2265        else
2266                return strtoll(value, NULL, 10) != 0;
2267}
2268
2269static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2270        assert(key);
2271        assert(value);
2272        assert(uc);
2273        if (strncmp(key, "packet_cache_size", nkey) == 0
2274                || strncmp(key, "pcs", nkey) == 0) {
2275                uc->packet_cache_size = strtoll(value, NULL, 10);
2276        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2277                           || strncmp(key, "ptcs", nkey) == 0) {
2278                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2279        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2280                  || strncmp(key, "fpc", nkey) == 0) {
2281                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2282        } else if (strncmp(key, "burst_size", nkey) == 0
2283                   || strncmp(key, "bs", nkey) == 0) {
2284                uc->burst_size = strtoll(value, NULL, 10);
2285        } else if (strncmp(key, "tick_interval", nkey) == 0
2286                   || strncmp(key, "ti", nkey) == 0) {
2287                uc->tick_interval = strtoll(value, NULL, 10);
2288        } else if (strncmp(key, "tick_count", nkey) == 0
2289                   || strncmp(key, "tc", nkey) == 0) {
2290                uc->tick_count = strtoll(value, NULL, 10);
2291        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2292                   || strncmp(key, "pt", nkey) == 0) {
2293                uc->perpkt_threads = strtoll(value, NULL, 10);
2294        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2295                   || strncmp(key, "hqs", nkey) == 0) {
2296                uc->hasher_queue_size = strtoll(value, NULL, 10);
2297        } else if (strncmp(key, "hasher_polling", nkey) == 0
2298                   || strncmp(key, "hp", nkey) == 0) {
2299                uc->hasher_polling = config_bool_parse(value, nvalue);
2300        } else if (strncmp(key, "reporter_polling", nkey) == 0
2301                   || strncmp(key, "rp", nkey) == 0) {
2302                uc->reporter_polling = config_bool_parse(value, nvalue);
2303        } else if (strncmp(key, "reporter_thold", nkey) == 0
2304                   || strncmp(key, "rt", nkey) == 0) {
2305                uc->reporter_thold = strtoll(value, NULL, 10);
2306        } else if (strncmp(key, "debug_state", nkey) == 0
2307                   || strncmp(key, "ds", nkey) == 0) {
2308                uc->debug_state = config_bool_parse(value, nvalue);
2309        } else {
2310                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2311        }
2312}
2313
2314DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2315        char *pch;
2316        char key[100];
2317        char value[100];
2318        assert(str);
2319        assert(uc);
2320        printf ("Splitting string \"%s\" into tokens:\n",str);
2321        pch = strtok (str," ,.-");
2322        while (pch != NULL)
2323        {
2324                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2325                        config_string(uc, key, sizeof(key), value, sizeof(value));
2326                } else {
2327                        fprintf(stderr, "Error parsing %s\n", pch);
2328                }
2329                pch = strtok (NULL," ,.-");
2330        }
2331}
2332
2333DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2334        char line[1024];
2335        while (fgets(line, sizeof(line), file) != NULL)
2336        {
2337                parse_user_config(uc, line);
2338        }
2339}
2340
2341DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
2342        libtrace_packet_t* result;
2343        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
2344        assert(result);
2345        swap_packets(result, packet); // Move the current packet into our copy
2346        return result;
2347}
2348
2349DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2350        // Try write back the packet
2351        assert(packet);
2352        // Always release any resources this might be holding such as a slot in a ringbuffer
2353        trace_fin_packet(packet);
2354        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2355}
2356
2357DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2358        if (libtrace->format)
2359                return &libtrace->format->info;
2360        else
2361                return NULL;
2362}
Note: See TracBrowser for help on using the repository browser.