source: lib/trace_parallel.c @ f0e8bd6

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since f0e8bd6 was f0e8bd6, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Fixes reporter function bug.
Links configuration option for threads via the configuration struct.

  • Property mode set to 100644
File size: 80.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101
102
103static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
104
105extern int libtrace_parallel;
106
107struct multithreading_stats {
108        uint64_t full_queue_hits;
109        uint64_t wait_for_fill_complete_hits;
110} contention_stats[1024];
111
112struct mem_stats {
113        struct memfail {
114           uint64_t cache_hit;
115           uint64_t ring_hit;
116           uint64_t miss;
117           uint64_t recycled;
118        } readbulk, read, write, writebulk;
119};
120
121// Grrr gcc wants this spelt out
122__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
123
124static void print_memory_stats() {
125        char t_name[50];
126        uint64_t total;
127        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
128
129        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
130
131        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
132        if (total) {
133                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
134                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
135                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
136                                total, (double) mem_hits.read.miss / (double) total * 100.0);
137        }
138
139        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
140        if (total) {
141                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
142                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
143
144
145                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
146                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
147        }
148
149        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
150        if (total) {
151                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
152                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
153
154                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
155                                total, (double) mem_hits.write.miss / (double) total * 100.0);
156        }
157
158        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
159        if (total) {
160                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
161                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
162
163                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
164                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
165        }
166
167}
168
169/**
170 * True if the trace has dedicated hasher thread otherwise false,
171 * to be used after the trace is running
172 */
173static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
174{
175        assert(libtrace->state != STATE_NEW);
176        return libtrace->hasher_thread.type == THREAD_HASHER;
177}
178
179/**
180 * True if the trace has dedicated hasher thread otherwise false,
181 * to be used after the trace is running
182 */
183static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
184{
185        assert(libtrace->state != STATE_NEW);
186        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
187}
188
189/**
190 * Changes a thread's state and broadcasts the condition variable. This
191 * should always be done when the lock is held.
192 *
193 * Additionally for perpkt threads the state counts are updated.
194 *
195 * @param trace A pointer to the trace
196 * @param t A pointer to the thread to modify
197 * @param new_state The new state of the thread
198 * @param need_lock Set to true if libtrace_lock is not held, otherwise
199 *        false in the case the lock is currently held by this thread.
200 */
201static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
202        const enum thread_states new_state, const bool need_lock)
203{
204        enum thread_states prev_state;
205        if (need_lock)
206                pthread_mutex_lock(&trace->libtrace_lock);
207        prev_state = t->state;
208        t->state = new_state;
209        if (t->type == THREAD_PERPKT) {
210                --trace->perpkt_thread_states[prev_state];
211                ++trace->perpkt_thread_states[new_state];
212        }
213
214        if (trace->config.debug_state)
215                fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
216                        prev_state, t->state);
217
218        if (need_lock)
219                pthread_mutex_unlock(&trace->libtrace_lock);
220        pthread_cond_broadcast(&trace->perpkt_cond);
221}
222
223/**
224 * Changes the overall traces state and signals the condition.
225 *
226 * @param trace A pointer to the trace
227 * @param new_state The new state of the trace
228 * @param need_lock Set to true if libtrace_lock is not held, otherwise
229 *        false in the case the lock is currently held by this thread.
230 */
231static inline void libtrace_change_state(libtrace_t *trace, 
232        const enum trace_state new_state, const bool need_lock)
233{
234        UNUSED enum trace_state prev_state;
235        if (need_lock)
236                pthread_mutex_lock(&trace->libtrace_lock);
237        prev_state = trace->state;
238        trace->state = new_state;
239
240        if (trace->config.debug_state)
241                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
242                        trace->uridata, get_trace_state_name(prev_state),
243                        get_trace_state_name(trace->state));
244
245        if (need_lock)
246                pthread_mutex_unlock(&trace->libtrace_lock);
247        pthread_cond_broadcast(&trace->perpkt_cond);
248}
249
250/**
251 * @return True if the format supports parallel threads.
252 */
253static inline bool trace_supports_parallel(libtrace_t *trace)
254{
255        assert(trace);
256        assert(trace->format);
257        if (trace->format->pstart_input)
258                return true;
259        else
260                return false;
261}
262
263DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
264        int i;
265        struct multithreading_stats totals = {0};
266        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
267                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
268                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
269                totals.full_queue_hits += contention_stats[i].full_queue_hits;
270                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
271                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
272        }
273        fprintf(stderr, "\nTotals for perpkt threads\n");
274        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
275        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
276
277        return;
278}
279
280void libtrace_zero_thread(libtrace_thread_t * t) {
281        t->trace = NULL;
282        t->ret = NULL;
283        t->type = THREAD_EMPTY;
284        libtrace_zero_ringbuffer(&t->rbuffer);
285        libtrace_zero_vector(&t->vector);
286        libtrace_zero_deque(&t->deque);
287        t->recorded_first = false;
288        t->perpkt_num = -1;
289        t->accepted_packets = 0;
290}
291
292// Ints are aligned int is atomic so safe to read and write at same time
293// However write must be locked, read doesn't (We never try read before written to table)
294libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
295        int i = 0;
296        pthread_t tid = pthread_self();
297
298        for (;i<libtrace->perpkt_thread_count ;++i) {
299                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
300                        return &libtrace->perpkt_threads[i];
301        }
302        return NULL;
303}
304
305int get_thread_table_num(libtrace_t *libtrace) {
306        int i = 0;
307        pthread_t tid = pthread_self();
308        for (;i<libtrace->perpkt_thread_count; ++i) {
309                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
310                        return i;
311        }
312        return -1;
313}
314
315static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
316        libtrace_thread_t *ret;
317        if (!(ret = get_thread_table(libtrace))) {
318                pthread_t tid = pthread_self();
319                // Check if we are reporter or something else
320                if (pthread_equal(tid, libtrace->reporter_thread.tid))
321                        ret = &libtrace->reporter_thread;
322                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
323                        ret = &libtrace->hasher_thread;
324                else
325                        ret = NULL;
326        }
327        return ret;
328}
329
330/** Used below in trace_make_results_packets_safe*/
331static void do_copy_result_packet(void *data)
332{
333        libtrace_result_t *res = (libtrace_result_t *)data;
334        if (res->type == RESULT_PACKET) {
335                // Duplicate the packet in standard malloc'd memory and free the
336                // original, This is a 1:1 exchange so is ocache count remains unchanged.
337                libtrace_packet_t *oldpkt, *dup;
338                oldpkt = (libtrace_packet_t *) res->value;
339                dup = trace_copy_packet(oldpkt);
340                res->value = (void *)dup;
341                trace_destroy_packet(oldpkt);
342        }
343}
344
345/**
346 * Make a safe replacement copy of any result packets that are owned
347 * by the format in the result queue. Used when pausing traces.
348 */ 
349static void trace_make_results_packets_safe(libtrace_t *trace) {
350        libtrace_thread_t *t = get_thread_descriptor(trace);
351        if (trace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
352                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
353        else 
354                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
355}
356
357/**
358 * Holds threads in a paused state, until released by broadcasting
359 * the condition mutex.
360 */
361static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
362        if (t->type == THREAD_PERPKT)
363                trace_make_results_packets_safe(trace);
364        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
365        thread_change_state(trace, t, THREAD_PAUSED, false);
366        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
367                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
368        }
369        thread_change_state(trace, t, THREAD_RUNNING, false);
370        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
371}
372
373
374
375/**
376 * Dispatches packets to their correct place and applies any translations
377 * as needed
378 * @param trace
379 * @param t
380 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
381 * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal
382 */
383static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
384                                   size_t nb_packets) {
385        libtrace_message_t message;
386        size_t i;
387        for (i = 0; i < nb_packets; ++i) {
388                if (packets[i]->error > 0) {
389                        packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
390                } else if (packets[i]->error == READ_TICK) {
391                        message.code = MESSAGE_TICK;
392                        message.additional.uint64 = trace_packet_get_order(packets[i]);
393                        message.sender = t;
394                        (*trace->per_pkt)(trace, NULL, &message, t);
395                } else if (packets[i]->error != READ_MESSAGE) {
396                        // An error this should be the last packet we read
397                        size_t z;
398                        // We could have an eof or error and a message such as pause
399                        for (z = i ; z < nb_packets; ++z) {
400                                fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
401                                assert (packets[z]->error <= 0);
402                        }
403                        return -1;
404                }
405                // -2 is a message its not worth checking now just finish this lot and we'll check
406                // when we loop next
407        }
408        return 0;
409}
410
411/**
412 * The is the entry point for our packet processing threads.
413 */
414static void* perpkt_threads_entry(void *data) {
415        libtrace_t *trace = (libtrace_t *)data;
416        libtrace_thread_t * t;
417        libtrace_message_t message = {0};
418        libtrace_packet_t *packets[trace->config.burst_size];
419        size_t nb_packets;
420        size_t i;
421
422        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
423        // Force this thread to wait until trace_pstart has been completed
424        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
425        t = get_thread_table(trace);
426        assert(t);
427        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
428        if (trace->format->pregister_thread) {
429                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
430        }
431        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
432
433        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
434        // Send a message to say we've started
435
436        // Let the per_packet function know we have started
437        message.code = MESSAGE_STARTING;
438        message.sender = t;
439        (*trace->per_pkt)(trace, NULL, &message, t);
440        message.code = MESSAGE_RESUMING;
441        (*trace->per_pkt)(trace, NULL, &message, t);
442
443
444        for (;;) {
445
446                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
447                        switch (message.code) {
448                                case MESSAGE_DO_PAUSE: // This is internal
449                                        // Send message to say we are pausing, TODO consider sender
450                                        message.code = MESSAGE_PAUSING;
451                                        message.sender = t;
452                                        (*trace->per_pkt)(trace, NULL, &message, t);
453                                        // If a hasher thread is running empty input queues so we don't lose data
454                                        if (trace_has_dedicated_hasher(trace)) {
455                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
456                                                // The hasher has stopped by this point, so the queue shouldn't be filling
457                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
458                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
459                                                        if (dispatch_packets(trace, t, packets, 1) == -1) {
460                                                                // EOF or error, either way we'll stop
461                                                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
462                                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
463                                                                        // No packets after this should have any data in them
464                                                                        assert(packets[0]->error <= 0);
465                                                                }
466                                                                goto stop;
467                                                        }
468                                                }
469                                        }
470                                        // Now we do the actual pause, this returns when we are done
471                                        trace_thread_pause(trace, t);
472                                        message.code = MESSAGE_RESUMING;
473                                        (*trace->per_pkt)(trace, NULL, &message, t);
474                                        // Check for new messages as soon as we return
475                                        continue;
476                                case MESSAGE_DO_STOP: // This is internal
477                                        goto stop;
478                        }
479                        (*trace->per_pkt)(trace, NULL, &message, t);
480                        continue;
481                }
482
483                if (trace->perpkt_thread_count == 1) {
484                        if (!packets[0]) {
485                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
486                        }
487                        assert(packets[0]);
488                        packets[0]->error = trace_read_packet(trace, packets[0]);
489                        nb_packets = 1;
490                } else {
491                        nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size);
492                }
493                // Loop through the packets we just read
494                if (dispatch_packets(trace, t, packets, nb_packets) == -1)
495                        break;
496        }
497
498
499stop:
500        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
501
502        // Let the per_packet function know we have stopped
503        message.code = MESSAGE_PAUSING;
504        message.sender = t;
505        (*trace->per_pkt)(trace, NULL, &message, t);
506        message.code = MESSAGE_STOPPING;
507        message.additional.uint64 = 0;
508        (*trace->per_pkt)(trace, NULL, &message, t);
509
510        // Free any remaining packets
511        for (i = 0; i < trace->config.burst_size; i++) {
512                if (packets[i]) {
513                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
514                        packets[i] = NULL;
515                }
516        }
517
518       
519        thread_change_state(trace, t, THREAD_FINISHED, true);
520
521        // Notify only after we've defiantly set the state to finished
522        message.code = MESSAGE_PERPKT_ENDED;
523        message.additional.uint64 = 0;
524        trace_send_message_to_reporter(trace, &message);
525
526        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
527        if (trace->format->punregister_thread) {
528                trace->format->punregister_thread(trace, t);
529        }
530        print_memory_stats();
531
532        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
533
534        pthread_exit(NULL);
535};
536
537/**
538 * The start point for our single threaded hasher thread, this will read
539 * and hash a packet from a data source and queue it against the correct
540 * core to process it.
541 */
542static void* hasher_entry(void *data) {
543        libtrace_t *trace = (libtrace_t *)data;
544        libtrace_thread_t * t;
545        int i;
546        libtrace_packet_t * packet;
547        libtrace_message_t message = {0};
548
549        assert(trace_has_dedicated_hasher(trace));
550        /* Wait until all threads are started and objects are initialised (ring buffers) */
551        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
552        t = &trace->hasher_thread;
553        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
554        printf("Hasher Thread started\n");
555        if (trace->format->pregister_thread) {
556                trace->format->pregister_thread(trace, t, true);
557        }
558        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
559        int pkt_skipped = 0;
560        /* Read all packets in then hash and queue against the correct thread */
561        while (1) {
562                int thread;
563                if (!pkt_skipped)
564                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
565                assert(packet);
566
567                if (libtrace_halt) // Signal to die has been sent - TODO
568                        break;
569
570                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
571                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
572                        switch(message.code) {
573                                case MESSAGE_DO_PAUSE:
574                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
575                                        thread_change_state(trace, t, THREAD_PAUSED, false);
576                                        pthread_cond_broadcast(&trace->perpkt_cond);
577                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
578                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
579                                        }
580                                        thread_change_state(trace, t, THREAD_RUNNING, false);
581                                        pthread_cond_broadcast(&trace->perpkt_cond);
582                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
583                                        break;
584                                case MESSAGE_DO_STOP:
585                                        // Stop called after pause
586                                        assert(trace->started == false);
587                                        assert(trace->state == STATE_FINSHED);
588                                        break;
589                                default:
590                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
591                        }
592                        pkt_skipped = 1;
593                        continue;
594                }
595
596                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
597                        break; /* We are EOF or error'd either way we stop  */
598                }
599
600                /* We are guaranteed to have a hash function i.e. != NULL */
601                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
602                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
603                /* Blocking write to the correct queue - I'm the only writer */
604                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
605                        uint64_t order = trace_packet_get_order(packet);
606                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
607                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
608                                // Write ticks to everyone else
609                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
610                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
611                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
612                                for (i = 0; i < trace->perpkt_thread_count; i++) {
613                                        pkts[i]->error = READ_TICK;
614                                        trace_packet_set_order(pkts[i], order);
615                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
616                                }
617                        }
618                        pkt_skipped = 0;
619                } else {
620                        assert(!"Dropping a packet!!");
621                        pkt_skipped = 1; // Reuse that packet no one read it
622                }
623        }
624
625        /* Broadcast our last failed read to all threads */
626        for (i = 0; i < trace->perpkt_thread_count; i++) {
627                libtrace_packet_t * bcast;
628                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
629                if (i == trace->perpkt_thread_count - 1) {
630                        bcast = packet;
631                } else {
632                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
633                        bcast->error = packet->error;
634                }
635                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
636                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
637                        // Unlock early otherwise we could deadlock
638                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
639                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
640                } else {
641                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
642                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
643                }
644        }
645
646        // We don't need to free the packet
647        thread_change_state(trace, t, THREAD_FINISHED, true);
648
649        // Notify only after we've defiantly set the state to finished
650        message.code = MESSAGE_PERPKT_ENDED;
651        message.additional.uint64 = 0;
652        trace_send_message_to_reporter(trace, &message);
653        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
654        if (trace->format->punregister_thread) {
655                trace->format->punregister_thread(trace, t);
656        }
657        print_memory_stats();
658        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
659
660        // TODO remove from TTABLE t sometime
661        pthread_exit(NULL);
662};
663
664/**
665 * Moves src into dest(Complete copy) and copies the memory buffer and
666 * its flags from dest into src ready for reuse without needing extra mallocs.
667 */
668static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
669        // Save the passed in buffer status
670        assert(dest->trace == NULL); // Must be a empty packet
671        void * temp_buf = dest->buffer;
672        buf_control_t temp_buf_control = dest->buf_control;
673        // Completely copy StoredPacket into packet
674        memcpy(dest, src, sizeof(libtrace_packet_t));
675        // Set the buffer settings on the returned packet
676        src->buffer = temp_buf;
677        src->buf_control = temp_buf_control;
678        src->trace = NULL;
679}
680
681/**
682 * @brief Move NULLs to the end of an array.
683 * @param values
684 * @param len
685 * @return The location the first NULL, aka the number of non NULL elements
686 */
687static inline size_t move_nulls_back(void *arr[], size_t len) {
688        size_t fr=0, en = len-1;
689        // Shift all non NULL elements to the front of the array, and NULLs to the
690        // end, traverses every element at most once
691        for (;fr < en; ++fr) {
692                if (arr[fr] == NULL) {
693                        for (;en > fr; --en) {
694                                if(arr[en]) {
695                                        arr[fr] = arr[en];
696                                        arr[en] = NULL;
697                                        break;
698                                }
699                        }
700                }
701        }
702        // This is the index of the first NULL
703        en = MIN(fr, en);
704        // Or the end of the array if this special case
705        if (arr[en])
706                en++;
707        return en;
708}
709
710/** returns the number of packets successfully allocated in the final array
711 these will all be at the front of the array */
712inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
713        size_t nb;
714        nb = move_nulls_back((void **) packets, nb_packets);
715        mem_hits.read.recycled += nb;
716        nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);
717        assert(nb_packets == nb);
718        return nb;
719}
720
721
722inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
723        size_t nb;
724        nb = move_nulls_back((void **) packets, nb_packets);
725        mem_hits.write.recycled += nb_packets - nb;
726        nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);
727        memset(packets, 0, nb); // XXX make better, maybe do this in ocache??
728        return nb;
729}
730
731/* Our simplest case when a thread becomes ready it can obtain an exclusive
732 * lock to read packets from the underlying trace.
733 */
734inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
735{
736        size_t i = 0;
737        bool tick_hit = false;
738
739        nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
740
741        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
742        /* Read nb_packets */
743        for (i = 0; i < nb_packets; ++i) {
744                packets[i]->error = trace_read_packet(libtrace, packets[i]);
745                if (packets[i]->error <= 0) {
746                        ++i;
747                        break;
748                }
749                /*
750                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
751                        tick_hit = true;
752                }*/
753        }
754        // Doing this inside the lock ensures the first packet is always
755        // recorded first
756        if (packets[0]->error > 0) {
757                store_first_packet(libtrace, packets[0], t);
758        }
759        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
760        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
761        if (tick_hit) {
762                libtrace_message_t tick;
763                tick.additional.uint64 = trace_packet_get_order(packets[i]);
764                tick.code = MESSAGE_TICK;
765                trace_send_message_to_perpkts(libtrace, &tick);
766        } */
767        return i;
768}
769
770/**
771 * For the case that we have a dedicated hasher thread
772 * 1. We read a packet from our buffer
773 * 2. Move that into the packet provided (packet)
774 */
775inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets)
776{
777        size_t i;
778
779        // Always grab at least one
780        if (packets[0]) // Recycle the old get the new
781                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
782        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
783
784        if (packets[0]->error < 0)
785                return 1;
786
787        for (i = 1; i < nb_packets; i++) {
788                if (packets[i]) // Recycle the old get the new
789                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
790                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
791                        packets[i] = NULL;
792                        break;
793                }
794                // These are typically urgent
795                if (packets[i]->error < 0)
796                        break;
797        }
798       
799        return i;
800}
801
802/**
803 * Tries to read from our queue and returns 1 if a packet was retrieved
804 */
805static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
806{
807        libtrace_packet_t* retrived_packet;
808
809        /* Lets see if we have one waiting */
810        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
811                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
812                assert(retrived_packet);
813
814                if (*packet) // Recycle the old get the new
815                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
816                *packet = retrived_packet;
817                *ret = (*packet)->error;
818                return 1;
819        }
820        return 0;
821}
822
823/**
824 * Allows us to ensure all threads are finished writing to our threads ring_buffer
825 * before returning EOF/error.
826 */
827inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
828{
829        /* We are waiting for the condition that another thread ends to check
830         * our queue for new data, once all threads end we can go to finished */
831        bool complete = false;
832        int ret;
833
834        do {
835                // Wait for a thread to end
836                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
837
838                // Check before
839                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
840                        complete = true;
841                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
842                        continue;
843                }
844
845                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
846
847                // Check after
848                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
849                        complete = true;
850                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
851                        continue;
852                }
853
854                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
855
856                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
857                if(try_waiting_queue(libtrace, t, packet, &ret))
858                        return ret;
859        } while (!complete);
860
861        // We can only end up here once all threads complete
862        try_waiting_queue(libtrace, t, packet, &ret);
863
864        return ret;
865        // TODO rethink this logic fix bug here
866}
867
868/**
869 * Expects the libtrace_lock to not be held
870 */
871inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
872{
873        thread_change_state(libtrace, t, THREAD_FINISHING, true);
874        return trace_handle_finishing_perpkt(libtrace, packet, t);
875}
876
877/**
878 * This case is much like the dedicated hasher, except that we will become
879 * hasher if we don't have a a packet waiting.
880 *
881 * Note: This is only every used if we have are doing hashing.
882 *
883 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
884 * queue sizes in total are larger than the ring size.
885 *
886 * 1. We read a packet from our buffer
887 * 2. Move that into the packet provided (packet)
888 */
889inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
890{
891        int thread, ret/*, psize*/;
892
893        while (1) {
894                if(try_waiting_queue(libtrace, t, packet, &ret))
895                        return ret;
896                // Can still block here if another thread is writing to a full queue
897                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
898
899                // Its impossible for our own queue to overfill, because no one can write
900                // when we are in the lock
901                if(try_waiting_queue(libtrace, t, packet, &ret)) {
902                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
903                        return ret;
904                }
905
906                // Another thread cannot write a packet because a queue has filled up. Is it ours?
907                if (libtrace->perpkt_queue_full) {
908                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
909                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
910                        continue;
911                }
912
913                if (!*packet)
914                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
915                assert(*packet);
916
917                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
918                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
919                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
920                        if (libtrace_halt)
921                                return 0;
922                        else
923                                return (*packet)->error;
924                }
925
926                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
927                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
928                if (thread == t->perpkt_num) {
929                        // If it's this thread we must be in order because we checked the buffer once we got the lock
930                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
931                        return (*packet)->error;
932                }
933
934                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
935                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
936                                libtrace->perpkt_queue_full = true;
937                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
938                                contention_stats[t->perpkt_num].full_queue_hits++;
939                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
940                        }
941                        *packet = NULL;
942                        libtrace->perpkt_queue_full = false;
943                } else {
944                        /* We can get here if the user closes the thread before natural completion/or error */
945                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
946                }
947                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
948        }
949}
950
951/**
952 * This case is much like the dedicated hasher, except that we will become
953 * hasher if we don't have a packet waiting.
954 *
955 * TODO: You can lose the tail of a trace if the final thread
956 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
957 *
958 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
959 * queue sizes in total are larger than the ring size.
960 *
961 * 1. We read a packet from our buffer
962 * 2. Move that into the packet provided (packet)
963 */
964inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
965{
966        int ret, i, thread/*, psize*/;
967
968        if (t->state == THREAD_FINISHING)
969                return trace_handle_finishing_perpkt(libtrace, packet, t);
970
971        while (1) {
972                // Check if we have packets ready
973                if(try_waiting_queue(libtrace, t, packet, &ret))
974                        return ret;
975
976                // We limit the number of packets we get to the size of the sliding window
977                // such that it is impossible for any given thread to fail to store a packet
978                ASSERT_RET(sem_wait(&libtrace->sem), == 0);
979                /*~~~~Single threaded read of a packet~~~~*/
980                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
981
982                /* Re-check our queue things we might have data waiting */
983                if(try_waiting_queue(libtrace, t, packet, &ret)) {
984                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
985                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
986                        return ret;
987                }
988
989                // TODO put on *proper* condition variable
990                if (libtrace->perpkt_queue_full) {
991                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
992                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
993                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
994                        continue;
995                }
996
997                if (!*packet)
998                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
999                assert(*packet);
1000
1001                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
1002                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1003                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1004                        // Finish this thread ensuring that any data written later by another thread is retrieved also
1005                        if (libtrace_halt)
1006                                return 0;
1007                        else
1008                                return trace_finish_perpkt(libtrace, packet, t);
1009                }
1010                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1011
1012                /* ~~~~Multiple threads can run the hasher~~~~ */
1013                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
1014
1015                /* Yes this is correct opposite read lock for a write operation */
1016                ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0);
1017                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
1018                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
1019                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1020                *packet = NULL;
1021
1022                // Always try read any data from the sliding window
1023                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
1024                        ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
1025                        if (libtrace->perpkt_queue_full) {
1026                                // I might be the holdup in which case if I can read my queue I should do that and return
1027                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
1028                                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1029                                        return ret;
1030                                }
1031                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1032                                continue;
1033                        }
1034                        // Read greedily as many as we can
1035                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
1036                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
1037                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
1038                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
1039                                                if (t->perpkt_num == thread)
1040                                                {
1041                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
1042                                                        // before EOF/error we might not have emptied the sliding window
1043                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
1044                                                        // Its our queue we must have a packet to read out
1045                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
1046                                                                // We must be able to write this now 100% without fail
1047                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
1048                                                                ASSERT_RET(sem_post(&libtrace->sem), == 0);
1049                                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1050                                                                return ret;
1051                                                        } else {
1052                                                                assert(!"Our queue is full but I cannot read from it??");
1053                                                        }
1054                                                }
1055                                                // Not us we have to give the other threads a chance to write there packets then
1056                                                libtrace->perpkt_queue_full = true;
1057                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1058                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
1059                                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1060
1061                                                contention_stats[t->perpkt_num].full_queue_hits++;
1062                                                ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
1063                                                // Grab these back
1064                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
1065                                                        ASSERT_RET(sem_wait(&libtrace->sem), == 0);
1066                                                libtrace->perpkt_queue_full = false;
1067                                        }
1068                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
1069                                        *packet = NULL;
1070                                } else {
1071                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
1072                                        // in the general case (unless the user ends early without proper clean up).
1073                                        assert (!"unreachable code??");
1074                                }
1075                        }
1076                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
1077                }
1078                // Now we go back to checking our queue anyways
1079        }
1080}
1081
1082
1083/**
1084 * For the first packet of each queue we keep a copy and note the system
1085 * time it was received at.
1086 *
1087 * This is used for finding the first packet when playing back a trace
1088 * in trace time. And can be used by real time applications to print
1089 * results out every XXX seconds.
1090 */
1091void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1092{
1093        if (!t->recorded_first) {
1094                struct timeval tv;
1095                libtrace_packet_t * dup;
1096                // For what it's worth we can call these outside of the lock
1097                gettimeofday(&tv, NULL);
1098                dup = trace_copy_packet(packet);
1099                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1100                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1101                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
1102                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1103                // Now update the first
1104                libtrace->first_packets.count++;
1105                if (libtrace->first_packets.count == 1) {
1106                        // We the first entry hence also the first known packet
1107                        libtrace->first_packets.first = t->perpkt_num;
1108                } else {
1109                        // Check if we are newer than the previous 'first' packet
1110                        size_t first = libtrace->first_packets.first;
1111                        if (trace_get_seconds(dup) <
1112                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1113                                libtrace->first_packets.first = t->perpkt_num;
1114                }
1115                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1116                libtrace_message_t mesg = {0};
1117                mesg.code = MESSAGE_FIRST_PACKET;
1118                trace_send_message_to_reporter(libtrace, &mesg);
1119                t->recorded_first = true;
1120        }
1121}
1122
1123/**
1124 * Returns 1 if it's certain that the first packet is truly the first packet
1125 * rather than a best guess based upon threads that have published so far.
1126 * Otherwise 0 is returned.
1127 * It's recommended that this result is stored rather than calling this
1128 * function again.
1129 */
1130DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
1131{
1132        int ret = 0;
1133        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1134        if (libtrace->first_packets.count) {
1135                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1136                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1137                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1138                        ret = 1;
1139                } else {
1140                        struct timeval curr_tv;
1141                        // If a second has passed since the first entry we will assume this is the very first packet
1142                        gettimeofday(&curr_tv, NULL);
1143                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1144                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1145                                        ret = 1;
1146                                }
1147                        }
1148                }
1149        } else {
1150                *packet = NULL;
1151                *tv = NULL;
1152        }
1153        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1154        return ret;
1155}
1156
1157
1158DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
1159{
1160        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1161}
1162
1163inline static struct timeval usec_to_tv(uint64_t usec)
1164{
1165        struct timeval tv;
1166        tv.tv_sec = usec / 1000000;
1167        tv.tv_usec = usec % 1000000;
1168        return tv;
1169}
1170
1171/** Similar to delay_tracetime but send messages to all threads periodically */
1172static void* reporter_entry(void *data) {
1173        libtrace_message_t message = {0};
1174        libtrace_t *trace = (libtrace_t *)data;
1175        libtrace_thread_t *t = &trace->reporter_thread;
1176        size_t res_size;
1177        libtrace_vector_t results;
1178        libtrace_vector_init(&results, sizeof(libtrace_result_t));
1179        fprintf(stderr, "Reporter thread starting\n");
1180        libtrace_result_t result;
1181        size_t i;
1182
1183        message.code = MESSAGE_STARTING;
1184        message.sender = t;
1185        (*trace->reporter)(trace, NULL, &message);
1186        message.code = MESSAGE_RESUMING;
1187        (*trace->reporter)(trace, NULL, &message);
1188
1189        while (!trace_finished(trace)) {
1190                if (trace->config.reporter_polling) {
1191                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1192                                message.code = MESSAGE_POST_REPORTER;
1193                } else {
1194                        libtrace_message_queue_get(&t->messages, &message);
1195                }
1196                switch (message.code) {
1197                        // Check for results
1198                        case MESSAGE_POST_REPORTER:
1199                                res_size = trace_get_results(trace, &results);
1200                                for (i = 0; i < res_size; i++) {
1201                                        ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
1202                                        (*trace->reporter)(trace, &result, NULL);
1203                                }
1204                                break;
1205                        case MESSAGE_DO_PAUSE:
1206                                message.code = MESSAGE_PAUSING;
1207                                message.sender = t;
1208                                (*trace->reporter)(trace, NULL, &message);
1209                                trace_thread_pause(trace, t);
1210                                message.code = MESSAGE_RESUMING;
1211                                (*trace->reporter)(trace, NULL, &message);
1212                                break;
1213                        default:
1214                                (*trace->reporter)(trace, NULL, &message);
1215                }
1216        }
1217
1218        // Flush out whats left now all our threads have finished
1219        res_size = trace_get_results(trace, &results);
1220        for (i = 0; i < res_size; i++) {
1221                ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
1222                (*trace->reporter)(trace, &result, NULL);
1223        }
1224        libtrace_vector_destroy(&results);
1225
1226        // GOODBYE
1227        message.code = MESSAGE_PAUSING;
1228        message.sender = t;
1229        (*trace->reporter)(trace, NULL, &message);
1230        message.code = MESSAGE_STOPPING;
1231        (*trace->reporter)(trace, NULL, &message);
1232
1233        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1234        print_memory_stats();
1235        return NULL;
1236}
1237
1238/** Similar to delay_tracetime but send messages to all threads periodically */
1239static void* keepalive_entry(void *data) {
1240        struct timeval prev, next;
1241        libtrace_message_t message = {0};
1242        libtrace_t *trace = (libtrace_t *)data;
1243        uint64_t next_release;
1244        fprintf(stderr, "keepalive thread is starting\n");
1245
1246        gettimeofday(&prev, NULL);
1247        message.code = MESSAGE_TICK;
1248        while (trace->state != STATE_FINSHED) {
1249                fd_set rfds;
1250                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1251                gettimeofday(&next, NULL);
1252                if (next_release > tv_to_usec(&next)) {
1253                        next = usec_to_tv(next_release - tv_to_usec(&next));
1254                        // Wait for timeout or a message
1255                        FD_ZERO(&rfds);
1256                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
1257                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
1258                                libtrace_message_t msg;
1259                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
1260                                assert(msg.code == MESSAGE_DO_STOP);
1261                                goto done;
1262                        }
1263                }
1264                prev = usec_to_tv(next_release);
1265                if (trace->state == STATE_RUNNING) {
1266                        message.additional.uint64 = tv_to_usec(&prev);
1267                        trace_send_message_to_perpkts(trace, &message);
1268                }
1269        }
1270done:
1271
1272        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1273        return NULL;
1274}
1275
1276/**
1277 * Delays a packets playback so the playback will be in trace time
1278 */
1279static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1280        struct timeval curr_tv, pkt_tv;
1281        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
1282        uint64_t curr_usec;
1283        /* Tracetime we might delay releasing this packet */
1284        if (!t->tracetime_offset_usec) {
1285                libtrace_packet_t * first_pkt;
1286                struct timeval *sys_tv;
1287                int64_t initial_offset;
1288                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1289                assert(first_pkt);
1290                pkt_tv = trace_get_timeval(first_pkt);
1291                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1292                if (stable)
1293                        // 0->1 because 0 is used to mean unset
1294                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1295                next_release = initial_offset;
1296        }
1297        /* next_release == offset */
1298        pkt_tv = trace_get_timeval(packet);
1299        next_release += tv_to_usec(&pkt_tv);
1300        gettimeofday(&curr_tv, NULL);
1301        curr_usec = tv_to_usec(&curr_tv);
1302        if (next_release > curr_usec) {
1303                // We need to wait
1304                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1305                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1306                select(0, NULL, NULL, NULL, &delay_tv);
1307        }
1308}
1309
1310/* Read one packet from the trace into a buffer. Note that this function will
1311 * block until a packet is read (or EOF is reached).
1312 *
1313 * @param libtrace      the libtrace opaque pointer
1314 * @param packet        the packet opaque pointer
1315 * @returns 0 on EOF, negative value on error
1316 *
1317 * Note this is identical to read_packet but calls pread_packet instead of
1318 * read packet in the format.
1319 *
1320 */
1321static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
1322
1323        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
1324        if (trace_is_err(libtrace))
1325                return -1;
1326        if (!libtrace->started) {
1327                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1328                return -1;
1329        }
1330        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1331                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1332                return -1;
1333        }
1334        assert(packet);
1335
1336        if (libtrace->format->read_packet) {
1337                do {
1338                        size_t ret;
1339                        /* Finalise the packet, freeing any resources the format module
1340                         * may have allocated it and zeroing all data associated with it.
1341                         */
1342                        trace_fin_packet(packet);
1343                        /* Store the trace we are reading from into the packet opaque
1344                         * structure */
1345                        packet->trace = libtrace;
1346                        ret=libtrace->format->pread_packet(libtrace, t, packet);
1347                        if (ret <= 0) {
1348                                return ret;
1349                        }
1350                        if (libtrace->filter) {
1351                                /* If the filter doesn't match, read another
1352                                 * packet
1353                                 */
1354                                if (!trace_apply_filter(libtrace->filter,packet)){
1355                                        ++libtrace->filtered_packets;
1356                                        continue;
1357                                }
1358                        }
1359                        if (libtrace->snaplen>0) {
1360                                /* Snap the packet */
1361                                trace_set_capture_length(packet,
1362                                                libtrace->snaplen);
1363                        }
1364                       
1365                        ++t->accepted_packets;
1366                        // TODO look into this better
1367                        trace_packet_set_order(packet, trace_get_erf_timestamp(packet));
1368                        //trace_packet_set_order(packet, libtrace->accepted_packets);
1369                        //++libtrace->accepted_packets;
1370                        return ret;
1371                } while(1);
1372        }
1373        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1374        return ~0U;
1375}
1376
1377/**
1378 * Read packets from the parallel trace
1379 * @return the number of packets read, null packets indicate messages. Check packet->error before
1380 * assuming a packet is valid.
1381 */
1382static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
1383{
1384        size_t ret;
1385        size_t i;
1386        assert(nb_packets);
1387
1388        for (i = 0; i < nb_packets; i++) {
1389                // Cleanup the packet passed back
1390                if (packets[i])
1391                        trace_fin_packet(packets[i]);
1392        }
1393
1394        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1395                if (!packets[0])
1396                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **)packets, 1, 1);
1397                packets[0]->error = trace_pread_packet_wrapper(libtrace, t, *packets);
1398                ret = 1;
1399        } else if (trace_has_dedicated_hasher(libtrace)) {
1400                ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
1401        } else if (!trace_has_dedicated_hasher(libtrace)) {
1402                /* We don't care about which core a packet goes to */
1403                ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);
1404        } /* else {
1405                ret = trace_pread_packet_hash_locked(libtrace, packet);
1406        }*/
1407
1408        // Formats can also optionally do this internally to ensure the first
1409        // packet is always reported correctly
1410        assert(ret);
1411        assert(ret <= nb_packets);
1412        if (packets[0]->error > 0) {
1413                store_first_packet(libtrace, packets[0], t);
1414                if (libtrace->tracetime)
1415                        delay_tracetime(libtrace, packets[0], t);
1416        }
1417
1418        return ret;
1419}
1420
1421/* Starts perpkt threads
1422 * @return threads_started
1423 */
1424static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1425        int i;
1426        char name[16];
1427        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1428                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1429                ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0);
1430                snprintf(name, 16, "perpkt-%d", i);
1431                pthread_setname_np(t->tid, name);
1432        }
1433        return libtrace->perpkt_thread_count;
1434}
1435
1436/* Start an input trace in a parallel fashion, or restart a paused trace.
1437 *
1438 * NOTE: libtrace lock is held for the majority of this function
1439 *
1440 * @param libtrace the input trace to start
1441 * @param global_blob some global data you can share with the new perpkt threads
1442 * @returns 0 on success
1443 */
1444DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter)
1445{
1446        int i;
1447        char name[16];
1448        sigset_t sig_before, sig_block_all;
1449        assert(libtrace);
1450        if (trace_is_err(libtrace)) {
1451                return -1;
1452        }
1453       
1454        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
1455        if (libtrace->state != STATE_NEW) {
1456                int err = 0;
1457                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1458                if (libtrace->state != STATE_PAUSED) {
1459                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1460                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
1461                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1462                        return -1;
1463                }
1464               
1465                // Update the per_pkt function, or reuse the old one
1466                if (per_pkt)
1467                        libtrace->per_pkt = per_pkt;
1468
1469                if (reporter)
1470                        libtrace->reporter = reporter;
1471
1472                assert(libtrace_parallel);
1473                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1474                assert(libtrace->per_pkt);
1475               
1476                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1477                        fprintf(stderr, "Restarting trace pstart_input()\n");
1478                        err = libtrace->format->pstart_input(libtrace);
1479                } else {
1480                        if (libtrace->format->start_input) {
1481                                fprintf(stderr, "Restarting trace start_input()\n");
1482                                err = libtrace->format->start_input(libtrace);
1483                        }
1484                }
1485               
1486                if (err == 0) {
1487                        libtrace->started = true;
1488                        libtrace_change_state(libtrace, STATE_RUNNING, false);
1489                }
1490                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1491                return err;
1492        }
1493
1494        assert(libtrace->state == STATE_NEW);
1495        libtrace_parallel = 1;
1496
1497        // Store the user defined things against the trace
1498        libtrace->global_blob = global_blob;
1499        libtrace->per_pkt = per_pkt;
1500        libtrace->reporter = reporter;
1501
1502        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
1503        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
1504        ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0);
1505        // Grab the lock
1506        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1507
1508        // Set default buffer sizes
1509        if (libtrace->config.hasher_queue_size <= 0)
1510                libtrace->config.hasher_queue_size = 1000;
1511
1512        if (libtrace->config.perpkt_threads <= 0) {
1513                // TODO add BSD support
1514                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1515                if (libtrace->perpkt_thread_count <= 0)
1516                        // Lets just use one
1517                        libtrace->perpkt_thread_count = 1;
1518        } else {
1519                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1520        }
1521
1522        if (libtrace->config.reporter_thold <= 0)
1523                libtrace->config.reporter_thold = 100;
1524        if (libtrace->config.burst_size <= 0)
1525                libtrace->config.burst_size = 10;
1526        if (libtrace->config.packet_thread_cache_size <= 0)
1527                libtrace->config.packet_thread_cache_size = 20;
1528        if (libtrace->config.packet_cache_size <= 0)
1529                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1530
1531        if (libtrace->config.packet_cache_size <
1532                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1533                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1534
1535        libtrace->started = true; // Before we start the threads otherwise we could have issues
1536        libtrace_change_state(libtrace, STATE_RUNNING, false);
1537        /* Disable signals - Pthread signal handling */
1538
1539        sigemptyset(&sig_block_all);
1540
1541        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1542
1543        // If we are using a hasher start it
1544        // If single threaded we don't need a hasher
1545        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1546                libtrace_thread_t *t = &libtrace->hasher_thread;
1547                t->trace = libtrace;
1548                t->ret = NULL;
1549                t->type = THREAD_HASHER;
1550                t->state = THREAD_RUNNING;
1551                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1552                ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0);
1553                snprintf(name, sizeof(name), "hasher-thread");
1554                pthread_setname_np(t->tid, name);
1555        } else {
1556                libtrace->hasher_thread.type = THREAD_EMPTY;
1557        }
1558
1559        libtrace_ocache_init(&libtrace->packet_freelist,
1560                                                 (void* (*)()) trace_create_packet,
1561                                                 (void (*)(void *))trace_destroy_packet,
1562                                                 libtrace->config.packet_thread_cache_size,
1563                                                 libtrace->config.packet_cache_size * 4,
1564                                                 libtrace->config.fixed_packet_count);
1565        // Unused slidingwindow code
1566        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1567        //ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
1568
1569        // This will be applied to every new thread that starts, i.e. they will block all signals
1570        // Lets start a fixed number of reading threads
1571
1572        /* Ready some storages */
1573        libtrace->first_packets.first = 0;
1574        libtrace->first_packets.count = 0;
1575        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1576        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1577
1578
1579        /* Ready all of our perpkt threads - they are started later */
1580        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1581        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1582                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1583                t->trace = libtrace;
1584                t->ret = NULL;
1585                t->type = THREAD_PERPKT;
1586                t->state = THREAD_RUNNING;
1587                t->user_data = NULL;
1588                // t->tid DONE on create
1589                t->perpkt_num = i;
1590                if (libtrace->hasher)
1591                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
1592                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
1593                // Depending on the mode vector or deque might be chosen
1594                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1595                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1596                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1597                t->recorded_first = false;
1598                t->tracetime_offset_usec = 0;;
1599        }
1600
1601        int threads_started = 0;
1602        /* Setup the trace and start our threads */
1603        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1604                printf("This format has direct support for p's\n");
1605                threads_started = libtrace->format->pstart_input(libtrace);
1606        } else {
1607                if (libtrace->format->start_input) {
1608                        threads_started=libtrace->format->start_input(libtrace);
1609                }
1610        }
1611        if (threads_started == 0)
1612                threads_started = trace_start_perpkt_threads(libtrace);
1613
1614        libtrace->reporter_thread.type = THREAD_REPORTER;
1615        libtrace->reporter_thread.state = THREAD_RUNNING;
1616        libtrace_message_queue_init(&libtrace->reporter_thread.messages, sizeof(libtrace_message_t));
1617        if (reporter) {
1618                // Got a real reporter
1619                ASSERT_RET(pthread_create(&libtrace->reporter_thread.tid, NULL, reporter_entry, (void *) libtrace), == 0);
1620        } else {
1621                // Main thread is reporter
1622                libtrace->reporter_thread.tid = pthread_self();
1623        }
1624
1625        if (libtrace->config.tick_interval > 0) {
1626                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1627                libtrace->keepalive_thread.state = THREAD_RUNNING;
1628                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1629                ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
1630        }
1631
1632        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1633                libtrace->perpkt_thread_states[i] = 0;
1634        }
1635        libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started;
1636
1637        // Revert back - Allow signals again
1638        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1639        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1640
1641        if (threads_started < 0)
1642                // Error
1643                return threads_started;
1644
1645        // TODO fix these leaks etc
1646        if (libtrace->perpkt_thread_count != threads_started)
1647                fprintf(stderr, "Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1648
1649
1650        return 0;
1651}
1652
1653/**
1654 * Pauses a trace, this should only be called by the main thread
1655 * 1. Set started = false
1656 * 2. All perpkt threads are paused waiting on a condition var
1657 * 3. Then call ppause on the underlying format if found
1658 * 4. The traces state is paused
1659 *
1660 * Once done you should be able to modify the trace setup and call pstart again
1661 * TODO handle changing thread numbers
1662 */
1663DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1664{
1665        libtrace_thread_t *t;
1666        int i;
1667        assert(libtrace);
1668       
1669        t = get_thread_table(libtrace);
1670        // Check state from within the lock if we are going to change it
1671        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1672        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1673                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1674                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1675                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1676                return -1;
1677        }
1678
1679        libtrace_change_state(libtrace, STATE_PAUSING, false);
1680        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1681
1682        // Special case handle the hasher thread case
1683        if (trace_has_dedicated_hasher(libtrace)) {
1684                if (libtrace->config.debug_state)
1685                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1686                libtrace_message_t message = {0};
1687                message.code = MESSAGE_DO_PAUSE;
1688                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1689                // Wait for it to pause
1690                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1691                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1692                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1693                }
1694                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1695                if (libtrace->config.debug_state)
1696                        fprintf(stderr, " DONE\n");
1697        }
1698
1699        if (libtrace->config.debug_state)
1700                fprintf(stderr, "Asking perpkt threads to pause ...");
1701        // Stop threads, skip this one if it's a perpkt
1702        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1703                if (&libtrace->perpkt_threads[i] != t) {
1704                        libtrace_message_t message = {0};
1705                        message.code = MESSAGE_DO_PAUSE;
1706                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1707                        if(trace_has_dedicated_hasher(libtrace)) {
1708                                // The hasher has stopped and other threads have messages waiting therefore
1709                                // If the queues are empty the other threads would have no data
1710                                // So send some message packets to simply ask the threads to check
1711                                // We are the only writer since hasher has paused
1712                                libtrace_packet_t *pkt;
1713                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1714                                pkt->error = READ_MESSAGE;
1715                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1716                        }
1717                } else {
1718                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1719                }
1720        }
1721
1722        if (t) {
1723                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1724                // We rely on the user to *not* return before starting the trace again
1725                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1726        }
1727
1728        // Wait for all threads to pause
1729        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1730        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1731                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1732        }
1733        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1734
1735        if (libtrace->config.debug_state)
1736                fprintf(stderr, " DONE\n");
1737
1738        // Deal with the reporter
1739        if (trace_has_dedicated_reporter(libtrace)) {
1740                if (libtrace->config.debug_state)
1741                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1742                libtrace_message_t message = {0};
1743                message.code = MESSAGE_DO_PAUSE;
1744                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
1745                // Wait for it to pause
1746                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1747                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1748                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1749                }
1750                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1751                if (libtrace->config.debug_state)
1752                        fprintf(stderr, " DONE\n");
1753        }
1754
1755        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1756                uint64_t tmp_stats;
1757                libtrace->dropped_packets = trace_get_dropped_packets(libtrace);
1758                libtrace->received_packets = trace_get_received_packets(libtrace);
1759                if (libtrace->format->get_filtered_packets) {
1760                        if ((tmp_stats = libtrace->format->get_filtered_packets(libtrace)) != UINT64_MAX) {
1761                                libtrace->filtered_packets += tmp_stats;
1762                        }
1763                }
1764                libtrace->started = false;
1765                if (libtrace->format->ppause_input)
1766                        libtrace->format->ppause_input(libtrace);
1767                // TODO What happens if we don't have pause input??
1768        } else {
1769                int err;
1770                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1771                err = trace_pause(libtrace);
1772                // We should handle this a bit better
1773                if (err)
1774                        return err;
1775        }
1776
1777        // Only set as paused after the pause has been called on the trace
1778        libtrace_change_state(libtrace, STATE_PAUSED, true);
1779        return 0;
1780}
1781
1782/**
1783 * Stop trace finish prematurely as though it meet an EOF
1784 * This should only be called by the main thread
1785 * 1. Calls ppause
1786 * 2. Sends a message asking for threads to finish
1787 * 3. Releases threads which will pause
1788 */
1789DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1790{
1791        int i, err;
1792        libtrace_message_t message = {0};
1793        assert(libtrace);
1794
1795        // Ensure all threads have paused and the underlying trace format has
1796        // been closed and all packets associated are cleaned up
1797        // Pause will do any state checks for us
1798        err = trace_ppause(libtrace);
1799        if (err)
1800                return err;
1801
1802        // Now send a message asking the threads to stop
1803        // This will be retrieved before trying to read another packet
1804       
1805        message.code = MESSAGE_DO_STOP;
1806        trace_send_message_to_perpkts(libtrace, &message);
1807        if (trace_has_dedicated_hasher(libtrace))
1808                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1809       
1810        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1811                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1812        }
1813
1814        // Now release the threads and let them stop
1815        libtrace_change_state(libtrace, STATE_FINSHED, true);
1816        return 0;
1817}
1818
1819/**
1820 * Set the hasher type along with a selected function, if hardware supports
1821 * that generic type of hashing it will be used otherwise the supplied
1822 * hasher function will be used and passed data when called.
1823 *
1824 * @return 0 if successful otherwise -1 on error
1825 */
1826DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1827        int ret = -1;
1828        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1829                return -1;
1830        }
1831
1832        // Save the requirements
1833        trace->hasher_type = type;
1834        if (hasher) {
1835                trace->hasher = hasher;
1836                trace->hasher_data = data;
1837        } else {
1838                trace->hasher = NULL;
1839                // TODO consider how to handle freeing this
1840                trace->hasher_data = NULL;
1841        }
1842
1843        // Try push this to hardware - NOTE hardware could do custom if
1844        // there is a more efficient way to apply it, in this case
1845        // it will simply grab the function out of libtrace_t
1846        if (trace->format->pconfig_input)
1847                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1848
1849        if (ret == -1) {
1850                // We have to deal with this ourself
1851                // This most likely means single threaded reading of the trace
1852                if (!hasher) {
1853                        switch (type)
1854                        {
1855                                case HASHER_CUSTOM:
1856                                case HASHER_BALANCE:
1857                                        return 0;
1858                                case HASHER_BIDIRECTIONAL:
1859                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1860                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1861                                        toeplitz_init_config(trace->hasher_data, 1);
1862                                        return 0;
1863                                case HASHER_UNIDIRECTIONAL:
1864                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1865                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1866                                        toeplitz_init_config(trace->hasher_data, 0);
1867                                        return 0;
1868                                case HASHER_HARDWARE:
1869                                        return -1;
1870                        }
1871                        return -1;
1872                }
1873        } else {
1874                // The hardware is dealing with this yay
1875                trace->hasher_type = HASHER_HARDWARE;
1876        }
1877
1878        return 0;
1879}
1880
1881// Waits for all threads to finish
1882DLLEXPORT void trace_join(libtrace_t *libtrace) {
1883        int i;
1884
1885        /* Firstly wait for the perpkt threads to finish, since these are
1886         * user controlled */
1887        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1888                //printf("Waiting to join with perpkt #%d\n", i);
1889                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
1890                //printf("Joined with perpkt #%d\n", i);
1891                // So we must do our best effort to empty the queue - so
1892                // the producer (or any other threads) don't block.
1893                libtrace_packet_t * packet;
1894                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
1895                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1896                        if (packet) // This could be NULL iff the perpkt finishes early
1897                                trace_destroy_packet(packet);
1898        }
1899
1900        /* Now the hasher */
1901        if (trace_has_dedicated_hasher(libtrace)) {
1902                pthread_join(libtrace->hasher_thread.tid, NULL);
1903                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
1904        }
1905
1906        // Now that everything is finished nothing can be touching our
1907        // buffers so clean them up
1908        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1909                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
1910                // if they lost timeslice before-during a write
1911                libtrace_packet_t * packet;
1912                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1913                        trace_destroy_packet(packet);
1914                if (libtrace->hasher) {
1915                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1916                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1917                }
1918                // Cannot destroy vector yet, this happens with trace_destroy
1919        }
1920        // TODO consider perpkt threads marking trace as finished before join is called
1921        libtrace_change_state(libtrace, STATE_FINSHED, true);
1922
1923        if (trace_has_dedicated_reporter(libtrace)) {
1924                pthread_join(libtrace->reporter_thread.tid, NULL);
1925                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
1926        }
1927       
1928        // Wait for the tick (keepalive) thread if it has been started
1929        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1930                libtrace_message_t msg = {0};
1931                msg.code = MESSAGE_DO_STOP;
1932                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1933                pthread_join(libtrace->keepalive_thread.tid, NULL);
1934        }
1935       
1936        libtrace_change_state(libtrace, STATE_JOINED, true);
1937        print_memory_stats();
1938}
1939
1940DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1941{
1942        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1943        assert(t);
1944        return libtrace_message_queue_count(&t->messages);
1945}
1946
1947DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1948{
1949        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1950        assert(t);
1951        return libtrace_message_queue_get(&t->messages, message);
1952}
1953
1954DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1955{
1956        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1957        assert(t);
1958        return libtrace_message_queue_try_get(&t->messages, message);
1959}
1960
1961/**
1962 * Return backlog indicator
1963 */
1964DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
1965{
1966        libtrace_message_t message = {0};
1967        message.code = MESSAGE_POST_REPORTER;
1968        message.sender = get_thread_descriptor(libtrace);
1969        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
1970}
1971
1972/**
1973 * Return backlog indicator
1974 */
1975DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
1976{
1977        //printf("Sending message code=%d to reporter\n", message->code);
1978        message->sender = get_thread_descriptor(libtrace);
1979        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
1980}
1981
1982/**
1983 *
1984 */
1985DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1986{
1987        //printf("Sending message code=%d to reporter\n", message->code);
1988        message->sender = get_thread_descriptor(libtrace);
1989        return libtrace_message_queue_put(&t->messages, message);
1990}
1991
1992DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1993{
1994        int i;
1995        message->sender = get_thread_descriptor(libtrace);
1996        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1997                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1998        }
1999        //printf("Sending message code=%d to reporter\n", message->code);
2000        return 0;
2001}
2002
2003DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
2004        result->key = key;
2005}
2006DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
2007        return result->key;
2008}
2009DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
2010        result->value = value;
2011}
2012DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
2013        return result->value;
2014}
2015DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
2016        result->key = key;
2017        result->value = value;
2018}
2019DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
2020        free(*result);
2021        result = NULL;
2022        // TODO automatically back with a free list!!
2023}
2024
2025DLLEXPORT void * trace_get_global(libtrace_t *trace)
2026{
2027        return trace->global_blob;
2028}
2029
2030DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
2031{
2032        if (trace->global_blob && trace->global_blob != data) {
2033                void * ret = trace->global_blob;
2034                trace->global_blob = data;
2035                return ret;
2036        } else {
2037                trace->global_blob = data;
2038                return NULL;
2039        }
2040}
2041
2042DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2043{
2044        return t->user_data;
2045}
2046
2047DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2048{
2049        if(t->user_data && t->user_data != data) {
2050                void *ret = t->user_data;
2051                t->user_data = data;
2052                return ret;
2053        } else {
2054                t->user_data = data;
2055                return NULL;
2056        }
2057}
2058
2059/**
2060 * Publish to the reduce queue, return
2061 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2062 */
2063DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
2064        libtrace_result_t res;
2065        UNUSED static __thread int count = 0;
2066        res.type = type;
2067
2068        libtrace_result_set_key_value(&res, key, value);
2069
2070        /*
2071        if (count == 1)
2072                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
2073        count = (count+1) %1000;
2074        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
2075        */
2076        /*if (count == 1)
2077                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
2078        count = (count+1)%1000;*/
2079
2080        if (libtrace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
2081                if (libtrace_deque_get_size(&t->deque) >= libtrace->config.reporter_thold) {
2082                        trace_post_reporter(libtrace);
2083                }
2084                //while (libtrace_deque_get_size(&t->deque) >= 1000)
2085                //      sched_yield();
2086                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
2087        } else {
2088                //while (libtrace_vector_get_size(&t->vector) >= 1000)
2089                //      sched_yield();
2090
2091                if (libtrace_vector_get_size(&t->vector) >= libtrace->config.reporter_thold) {
2092                        trace_post_reporter(libtrace);
2093                }
2094                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
2095        }
2096}
2097
2098static int compareres(const void* p1, const void* p2)
2099{
2100        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
2101                return -1;
2102        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
2103                return 0;
2104        else
2105                return 1;
2106}
2107
2108DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
2109        int i;
2110        int flags = libtrace->reporter_flags; // Hint these aren't a changing
2111
2112        libtrace_vector_empty(results);
2113
2114        /* Here we assume queues are in order ascending order and they want
2115         * the smallest result first. If they are not in order the results
2116         * may not be in order.
2117         */
2118        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
2119                int live_count = 0;
2120                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
2121                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
2122                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
2123                int min_queue = -1;
2124
2125                /* Loop through check all are alive (have data) and find the smallest */
2126                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
2127                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
2128                        if (libtrace_deque_get_size(v) != 0) {
2129                                libtrace_result_t r;
2130                                libtrace_deque_peek_front(v, (void *) &r);
2131                                live_count++;
2132                                live[i] = 1;
2133                                key[i] = libtrace_result_get_key(&r);
2134                                if (i==0 || min_key > key[i]) {
2135                                        min_key = key[i];
2136                                        min_queue = i;
2137                                }
2138                        } else {
2139                                live[i] = 0;
2140                        }
2141                }
2142
2143                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
2144                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
2145                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
2146                                trace_finished(libtrace)))) {
2147                        /* Get the minimum queue and then do stuff */
2148                        libtrace_result_t r;
2149
2150                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
2151                        libtrace_vector_push_back(results, &r);
2152
2153                        // We expect the key we read +1 now
2154                        libtrace->expected_key = key[min_queue] + 1;
2155
2156                        // Now update the one we just removed
2157                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
2158                        {
2159                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
2160                                key[min_queue] = libtrace_result_get_key(&r);
2161                                if (key[min_queue] <= min_key) {
2162                                        // We are still the smallest, might be out of order though :(
2163                                        min_key = key[min_queue];
2164                                } else {
2165                                        min_key = key[min_queue]; // Update our minimum
2166                                        // Check all find the smallest again - all are alive
2167                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
2168                                                if (live[i] && min_key > key[i]) {
2169                                                        min_key = key[i];
2170                                                        min_queue = i;
2171                                                }
2172                                        }
2173                                }
2174                        } else {
2175                                live[min_queue] = 0;
2176                                live_count--;
2177                                min_key = UINT64_MAX; // Update our minimum
2178                                // Check all find the smallest again - all are alive
2179                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
2180                                        // Still not 100% TODO (what if order is wrong or not increasing)
2181                                        if (live[i] && min_key >= key[i]) {
2182                                                min_key = key[i];
2183                                                min_queue = i;
2184                                        }
2185                                }
2186                        }
2187                }
2188        } else { // Queues are not in order - return all results in the queue
2189                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2190                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
2191                }
2192                if (flags & REDUCE_SORT) {
2193                        qsort(results->elements, results->size, results->element_size, &compareres);
2194                }
2195        }
2196        return libtrace_vector_get_size(results);
2197}
2198
2199DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2200        return packet->order;
2201}
2202
2203DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2204        return packet->hash;
2205}
2206
2207DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2208        packet->order = order;
2209}
2210
2211DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2212        packet->hash = hash;
2213}
2214
2215DLLEXPORT int trace_finished(libtrace_t * libtrace) {
2216        // TODO I don't like using this so much, we could use state!!!
2217        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
2218}
2219
2220DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2221{
2222        UNUSED int ret = -1;
2223        switch (option) {
2224                case TRACE_OPTION_TICK_INTERVAL:
2225                        libtrace->config.tick_interval = *((int *) value);
2226                        return 1;
2227                case TRACE_OPTION_SET_HASHER:
2228                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2229                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2230                        libtrace->config.perpkt_threads = *((int *) value);
2231                        return 1;
2232                case TRACE_DROP_OUT_OF_ORDER:
2233                        if (*((int *) value))
2234                                libtrace->reporter_flags |= REDUCE_DROP_OOO;
2235                        else
2236                                libtrace->reporter_flags &= ~REDUCE_DROP_OOO;
2237                        return 1;
2238                case TRACE_OPTION_SEQUENTIAL:
2239                        if (*((int *) value))
2240                                libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
2241                        else
2242                                libtrace->reporter_flags &= ~REDUCE_SEQUENTIAL;
2243                        return 1;
2244                case TRACE_OPTION_ORDERED:
2245                        if (*((int *) value))
2246                                libtrace->reporter_flags |= REDUCE_ORDERED;
2247                        else
2248                                libtrace->reporter_flags &= ~REDUCE_ORDERED;
2249                        return 1;
2250                case TRACE_OPTION_TRACETIME:
2251                        if(*((int *) value))
2252                                libtrace->tracetime = 1;
2253                        else
2254                                libtrace->tracetime = 0;
2255                        return 0;
2256                case TRACE_OPTION_SET_CONFIG:
2257                        libtrace->config = *((struct user_configuration *) value);
2258                case TRACE_OPTION_GET_CONFIG:
2259                        *((struct user_configuration *) value) = libtrace->config;
2260        }
2261        return 0;
2262}
2263
2264static bool config_bool_parse(char *value, size_t nvalue) {
2265        if (strncmp(value, "true", nvalue) == 0)
2266                return true;
2267        else if (strncmp(value, "false", nvalue) == 0)
2268                return false;
2269        else
2270                return strtoll(value, NULL, 10) != 0;
2271}
2272
2273static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2274        assert(key);
2275        assert(value);
2276        assert(uc);
2277        if (strncmp(key, "packet_cache_size", nkey) == 0
2278                || strncmp(key, "pcs", nkey) == 0) {
2279                uc->packet_cache_size = strtoll(value, NULL, 10);
2280        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2281                           || strncmp(key, "ptcs", nkey) == 0) {
2282                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2283        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2284                  || strncmp(key, "fpc", nkey) == 0) {
2285                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2286        } else if (strncmp(key, "burst_size", nkey) == 0
2287                   || strncmp(key, "bs", nkey) == 0) {
2288                uc->burst_size = strtoll(value, NULL, 10);
2289        } else if (strncmp(key, "tick_interval", nkey) == 0
2290                   || strncmp(key, "ti", nkey) == 0) {
2291                uc->tick_interval = strtoll(value, NULL, 10);
2292        } else if (strncmp(key, "tick_count", nkey) == 0
2293                   || strncmp(key, "tc", nkey) == 0) {
2294                uc->tick_count = strtoll(value, NULL, 10);
2295        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2296                   || strncmp(key, "pt", nkey) == 0) {
2297                uc->perpkt_threads = strtoll(value, NULL, 10);
2298        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2299                   || strncmp(key, "hqs", nkey) == 0) {
2300                uc->hasher_queue_size = strtoll(value, NULL, 10);
2301        } else if (strncmp(key, "hasher_polling", nkey) == 0
2302                   || strncmp(key, "hp", nkey) == 0) {
2303                uc->hasher_polling = config_bool_parse(value, nvalue);
2304        } else if (strncmp(key, "reporter_polling", nkey) == 0
2305                   || strncmp(key, "rp", nkey) == 0) {
2306                uc->reporter_polling = config_bool_parse(value, nvalue);
2307        } else if (strncmp(key, "reporter_thold", nkey) == 0
2308                   || strncmp(key, "rt", nkey) == 0) {
2309                uc->reporter_thold = strtoll(value, NULL, 10);
2310        } else if (strncmp(key, "debug_state", nkey) == 0
2311                   || strncmp(key, "ds", nkey) == 0) {
2312                uc->debug_state = config_bool_parse(value, nvalue);
2313        } else {
2314                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2315        }
2316}
2317
2318DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2319        char *pch;
2320        char key[100];
2321        char value[100];
2322        assert(str);
2323        assert(uc);
2324        printf ("Splitting string \"%s\" into tokens:\n",str);
2325        pch = strtok (str," ,.-");
2326        while (pch != NULL)
2327        {
2328                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2329                        config_string(uc, key, sizeof(key), value, sizeof(value));
2330                } else {
2331                        fprintf(stderr, "Error parsing %s\n", pch);
2332                }
2333                pch = strtok (NULL," ,.-");
2334        }
2335}
2336
2337DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2338        char line[1024];
2339        while (fgets(line, sizeof(line), file) != NULL)
2340        {
2341                parse_user_config(uc, line);
2342        }
2343}
2344
2345DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
2346        libtrace_packet_t* result;
2347        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
2348        assert(result);
2349        swap_packets(result, packet); // Move the current packet into our copy
2350        return result;
2351}
2352
2353DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2354        // Try write back the packet
2355        assert(packet);
2356        // Always release any resources this might be holding such as a slot in a ringbuffer
2357        trace_fin_packet(packet);
2358        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2359}
2360
2361DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2362        if (libtrace->format)
2363                return &libtrace->format->info;
2364        else
2365                return NULL;
2366}
Note: See TracBrowser for help on using the repository browser.