source: lib/trace_parallel.c @ 85e87b5

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 85e87b5 was 85e87b5, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Automatically scale threads to the number of available CPU cores on Linux.

  • Property mode set to 100644
File size: 66.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101
102extern int libtrace_parallel;
103
104struct multithreading_stats {
105        uint64_t full_queue_hits;
106        uint64_t wait_for_fill_complete_hits;
107} contention_stats[1024];
108
109inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
110
111/**
112 * @return True if the format supports parallel threads.
113 */
114static inline bool trace_supports_parallel(libtrace_t *trace)
115{
116        assert(trace);
117        assert(trace->format);
118        if (trace->format->pstart_input)
119                return true;
120        else
121                return false;
122        //return trace->format->pstart_input;
123}
124
125DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
126        int i;
127        struct multithreading_stats totals = {0};
128        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
129                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
130                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
131                totals.full_queue_hits += contention_stats[i].full_queue_hits;
132                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
133                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
134        }
135        fprintf(stderr, "\nTotals for perpkt threads\n");
136        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
137        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
138
139        return;
140}
141
142inline void libtrace_zero_thread(libtrace_thread_t * t) {
143        t->trace = NULL;
144        t->ret = NULL;
145        t->type = THREAD_EMPTY;
146        libtrace_zero_ringbuffer(&t->rbuffer);
147        libtrace_zero_vector(&t->vector);
148        libtrace_zero_deque(&t->deque);
149        t->recorded_first = false;
150        t->perpkt_num = -1;
151}
152
153// Ints are aligned int is atomic so safe to read and write at same time
154// However write must be locked, read doesn't (We never try read before written to table)
155libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
156        int i = 0;
157        pthread_t tid = pthread_self();
158
159        for (;i<libtrace->perpkt_thread_count ;++i) {
160                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
161                        return &libtrace->perpkt_threads[i];
162        }
163        return NULL;
164}
165
166int get_thread_table_num(libtrace_t *libtrace) {
167        int i = 0;
168        pthread_t tid = pthread_self();
169        for (;i<libtrace->perpkt_thread_count; ++i) {
170                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
171                        return i;
172        }
173        return -1;
174}
175
176static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
177        libtrace_thread_t *ret;
178        if (!(ret = get_thread_table(libtrace))) {
179                pthread_t tid = pthread_self();
180                // Check if we are reducer or something else
181                if (pthread_equal(tid, libtrace->reducer_thread.tid))
182                        ret = &libtrace->reducer_thread;
183                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
184                        ret = &libtrace->hasher_thread;
185                else
186                        ret = NULL;
187        }
188        return ret;
189}
190
191/** Used below in trace_make_results_packets_safe*/
192static void do_copy_result_packet(void *data)
193{
194        libtrace_result_t *res = (libtrace_result_t *)data;
195        if (res->is_packet) {
196                // Duplicate the packet in standard malloc'd memory and free the
197                // original
198                libtrace_packet_t *oldpkt, *dup;
199                oldpkt = (libtrace_packet_t *) res->value;
200                dup = trace_copy_packet(oldpkt);
201                res->value = (void *)dup;
202                trace_destroy_packet(oldpkt);
203                fprintf(stderr, "Made a packet safe!!\n");
204        }
205}
206
207/**
208 * Make a safe replacement copy of any result packets that are owned
209 * by the format in the result queue. Used when pausing traces.
210 */ 
211static void trace_make_results_packets_safe(libtrace_t *trace) {
212        libtrace_thread_t *t = get_thread_descriptor(trace);
213        if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
214                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
215        else 
216                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
217}
218
219/**
220 * Holds threads in a paused state, until released by broadcasting
221 * the condition mutex.
222 */
223static void trace_thread_pause(libtrace_t *trace) {
224        printf("Pausing thread #%d\n", get_thread_table_num(trace));
225        trace_make_results_packets_safe(trace);
226        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
227        trace->perpkts_pausing++;
228        pthread_cond_broadcast(&trace->perpkt_cond);
229        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
230                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
231        }
232        trace->perpkts_pausing--;
233        pthread_cond_broadcast(&trace->perpkt_cond);
234        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
235        printf("Releasing thread #%d\n", get_thread_table_num(trace));
236}
237
238/**
239 * The is the entry point for our packet processing threads.
240 */
241static void* perpkt_threads_entry(void *data) {
242        libtrace_t *trace = (libtrace_t *)data;
243        libtrace_thread_t * t;
244        libtrace_message_t message = {0};
245        libtrace_packet_t *packet = NULL;
246
247        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
248        t = get_thread_table(trace);
249        assert(t);
250        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
251        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
252
253        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
254        // Send a message to say we've started
255
256        message.code = MESSAGE_STARTED;
257        message.sender = t;
258
259        // Let the per_packet function know we have started
260        (*trace->per_pkt)(trace, NULL, &message, t);
261
262
263        for (;;) {
264                int psize;
265
266                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
267                        switch (message.code) {
268                                case MESSAGE_DO_PAUSE: // This is internal
269                                        // Send message to say we are pausing, TODO consider sender
270                                        message.code = MESSAGE_PAUSING;
271                                        (*trace->per_pkt)(trace, NULL, &message, t);
272                                        // If a hasher thread is running empty input queues so we don't loose data
273                                        if (trace_has_dedicated_hasher(trace)) {
274                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
275                                                // The hasher has stopped by this point, so the queue shouldn't be filling
276                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
277                                                        psize = trace_pread_packet(trace, &packet);
278                                                        if (psize > 0) {
279                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
280                                                        } else {
281                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
282                                                        }
283                                                }
284                                        }
285                                        // Send a paused message as a final chance to memory copy any packets
286                                        message.code = MESSAGE_PAUSED;
287                                        (*trace->per_pkt)(trace, NULL, &message, t);
288                                        // Now we do the actual pause, this returns when we are done
289                                        trace_thread_pause(trace);
290                                        // Check for new messages as soon as we return
291                                        continue;
292                                case MESSAGE_DO_STOP: // This is internal
293                                        goto stop;
294                        }
295                        (*trace->per_pkt)(trace, NULL, &message, t);
296                        continue;
297                }
298
299                if (trace->perpkt_thread_count == 1) {
300                        if (!packet) {
301                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
302                                        packet = trace_create_packet();
303                        }
304                        assert(packet);
305                        if ((psize = trace_read_packet(trace, packet)) <1) {
306                                break;
307                        }
308                } else {
309                        psize = trace_pread_packet(trace, &packet);
310                }
311
312                if (psize > 0) {
313                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
314                        continue;
315                }
316
317                if (psize == -2)
318                        continue; // We have a message
319
320                if (psize < 1) { // consider sending a message
321                        break;
322                }
323
324        }
325
326
327stop:
328        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
329        // Let the per_packet function know we have stopped
330        message.code = MESSAGE_STOPPED;
331        message.sender = NULL;
332        message.additional.uint64 = 0;
333        (*trace->per_pkt)(trace, NULL, &message, t);
334
335        // And we're at the end free the memories
336        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
337        t->state = THREAD_FINISHED;
338        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
339
340        // Notify only after we've defiantly set the state to finished
341        message.code = MESSAGE_PERPKT_ENDED;
342        message.additional.uint64 = 0;
343        trace_send_message_to_reducer(trace, &message);
344
345        pthread_exit(NULL);
346};
347
348/** True if trace has dedicated hasher thread otherwise false */
349inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
350{
351        return libtrace->hasher_thread.type == THREAD_HASHER;
352}
353
354/**
355 * The start point for our single threaded hasher thread, this will read
356 * and hash a packet from a data source and queue it against the correct
357 * core to process it.
358 */
359static void* hasher_start(void *data) {
360        libtrace_t *trace = (libtrace_t *)data;
361        libtrace_thread_t * t;
362        int i;
363        libtrace_packet_t * packet;
364        libtrace_message_t message = {0};
365
366        assert(trace_has_dedicated_hasher(trace));
367        /* Wait until all threads are started and objects are initialised (ring buffers) */
368        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
369        t = &trace->hasher_thread;
370        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
371        printf("Hasher Thread started\n");
372        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
373        int pkt_skipped = 0;
374        /* Read all packets in then hash and queue against the correct thread */
375        while (1) {
376                int thread;
377                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
378                        packet = trace_create_packet();
379                assert(packet);
380
381                if (libtrace_halt) // Signal to die has been sent - TODO
382                        break;
383
384                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
385                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
386                        switch(message.code) {
387                                case MESSAGE_DO_PAUSE:
388                                        fprintf(stderr, "Pausing hasher thread\n");
389                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
390                                        trace->perpkts_pausing++;
391                                        pthread_cond_broadcast(&trace->perpkt_cond);
392                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
393                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
394                                        }
395                                        // trace->perpkts_pausing--; // Don't do this the pausing thread will do this for us
396                                        pthread_cond_broadcast(&trace->perpkt_cond);
397                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
398                                        fprintf(stdout, "Mapper resuming\n");
399                                        break;
400                                case MESSAGE_DO_STOP:
401                                        // Stop called after pause
402                                        assert(trace->started == false);
403                                        assert(trace->state == STATE_FINSHED);
404                                default:
405                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
406                        }
407                        pkt_skipped = 1;
408                        continue;
409                }
410
411                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
412                        break; /* We are EOF or error'd either way we stop  */
413                }
414
415                /* We are guaranteed to have a hash function i.e. != NULL */
416                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
417                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
418                /* Blocking write to the correct queue - I'm the only writer */
419                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
420                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
421                        pkt_skipped = 0;
422                } else {
423                        pkt_skipped = 1; // Reuse that packet no one read it
424                }
425        }
426
427        /* Broadcast our last failed read to all threads */
428        for (i = 0; i < trace->perpkt_thread_count; i++) {
429                libtrace_packet_t * bcast;
430                printf("Broadcasting error/EOF now the trace is over\n");
431                if (i == trace->perpkt_thread_count - 1) {
432                        bcast = packet;
433                } else {
434                        bcast = trace_create_packet();
435                        bcast->error = packet->error;
436                }
437                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
438                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
439                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
440                        // Unlock early otherwise we could deadlock
441                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
442                } else {
443                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
444                }
445        }
446        // We dont need to free packet
447
448        // And we're at the end free the memories
449        t->state = THREAD_FINISHED;
450
451        // Notify only after we've defiantly set the state to finished
452        message.code = MESSAGE_PERPKT_ENDED;
453        message.additional.uint64 = 0;
454        trace_send_message_to_reducer(trace, &message);
455
456        // TODO remove from TTABLE t sometime
457        pthread_exit(NULL);
458};
459
460/**
461 * Moves src into dest(Complete copy) and copies the memory buffer and
462 * its flags from dest into src ready for reuse without needing extra mallocs.
463 */
464static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
465        // Save the passed in buffer status
466        assert(dest->trace == NULL); // Must be a empty packet
467        void * temp_buf = dest->buffer;
468        buf_control_t temp_buf_control = dest->buf_control;
469        // Completely copy StoredPacket into packet
470        memcpy(dest, src, sizeof(libtrace_packet_t));
471        // Set the buffer settings on the returned packet
472        src->buffer = temp_buf;
473        src->buf_control = temp_buf_control;
474        src->trace = NULL;
475}
476
477/* Our simplest case when a thread becomes ready it can obtain a exclusive
478 * lock to read a packet from the underlying trace.
479 */
480inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
481{
482        // We need this to fill the 'first' packet table
483        libtrace_thread_t *t = get_thread_table(libtrace);
484        if (!*packet) {
485                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
486                        *packet = trace_create_packet();
487        }
488        assert(*packet);
489        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
490        /* Read a packet */
491        (*packet)->error = trace_read_packet(libtrace, *packet);
492        // Doing this inside the lock ensures the first packet is always
493        // recorded first
494        if ((*packet)->error > 0)
495                store_first_packet(libtrace, *packet, t);
496
497        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
498        return (*packet)->error;
499}
500
501/**
502 * For the case that we have a dedicated hasher thread
503 * 1. We read a packet from our buffer
504 * 2. Move that into the packet provided (packet)
505 */
506inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
507{
508        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
509        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
510
511        if (*packet) // Recycle the old get the new
512                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
513                        trace_destroy_packet(*packet);
514        *packet = libtrace_ringbuffer_read(&t->rbuffer);
515
516        if (*packet) {
517                return (*packet)->error;
518        } else {
519                // This is how we do a notify, we send a message before hand to note that the trace is over etc.
520                // And this will notify the perpkt thread to read that message, this is easiest
521                // since cases like pause can also be dealt with this way without actually
522                // having to be the end of the stream.
523                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
524                return -2;
525        }
526}
527
528/**
529 * Tries to read from our queue and returns 1 if a packet was retrieved
530 */
531static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
532{
533        libtrace_packet_t* retrived_packet;
534
535        /* Lets see if we have one waiting */
536        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
537                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
538                assert(retrived_packet);
539
540                if (*packet) // Recycle the old get the new
541                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
542                                trace_destroy_packet(*packet);
543                *packet = retrived_packet;
544                *ret = (*packet)->error;
545                return 1;
546        }
547        return 0;
548}
549
550/**
551 * Allows us to ensure all threads are finished writing to our threads ring_buffer
552 * before returning EOF/error.
553 */
554inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
555{
556        /* We are waiting for the condition that another thread ends to check
557         * our queue for new data, once all threads end we can go to finished */
558        bool complete = false;
559        int ret;
560
561        do {
562                // Wait for a thread to end
563                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
564
565                // Check before
566                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
567                        complete = true;
568                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
569                        continue;
570                }
571
572                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
573
574                // Check after
575                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
576                        complete = true;
577                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
578                        continue;
579                }
580
581                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
582
583                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
584                if(try_waiting_queue(libtrace, t, packet, &ret))
585                        return ret;
586        } while (!complete);
587
588        // We can only end up here once all threads complete
589        try_waiting_queue(libtrace, t, packet, &ret);
590
591        return ret;
592        // TODO rethink this logic fix bug here
593}
594
595/**
596 * Expects the libtrace_lock to not be held
597 */
598inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
599{
600        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
601        t->state = THREAD_FINISHING;
602        libtrace->perpkts_finishing++;
603        pthread_cond_broadcast(&libtrace->perpkt_cond);
604        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
605        return trace_handle_finishing_perpkt(libtrace, packet, t);
606}
607
608/**
609 * This case is much like the dedicated hasher, except that we will become
610 * hasher if we don't have a a packet waiting.
611 *
612 * Note: This is only every used if we have are doing hashing.
613 *
614 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
615 * queue sizes in total are larger than the ring size.
616 *
617 * 1. We read a packet from our buffer
618 * 2. Move that into the packet provided (packet)
619 */
620inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
621{
622        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
623        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
624        int thread, ret/*, psize*/;
625
626        while (1) {
627                if(try_waiting_queue(libtrace, t, packet, &ret))
628                        return ret;
629                // Can still block here if another thread is writing to a full queue
630                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
631
632                // Its impossible for our own queue to overfill, because no one can write
633                // when we are in the lock
634                if(try_waiting_queue(libtrace, t, packet, &ret)) {
635                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
636                        return ret;
637                }
638
639                // Another thread cannot write a packet because a queue has filled up. Is it ours?
640                if (libtrace->perpkt_queue_full) {
641                        contention_stats[this_thread].wait_for_fill_complete_hits++;
642                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
643                        continue;
644                }
645
646                if (!*packet) {
647                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
648                                *packet = trace_create_packet();
649                }
650                assert(*packet);
651
652                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
653                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
654                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
655                        if (libtrace_halt)
656                                return 0;
657                        else
658                                return (*packet)->error;
659                }
660
661                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
662                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
663                if (thread == this_thread) {
664                        // If it's this thread we must be in order because we checked the buffer once we got the lock
665                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
666                        return (*packet)->error;
667                }
668
669                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
670                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
671                                libtrace->perpkt_queue_full = true;
672                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
673                                contention_stats[this_thread].full_queue_hits++;
674                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
675                        }
676                        *packet = NULL;
677                        libtrace->perpkt_queue_full = false;
678                } else {
679                        /* We can get here if the user closes the thread before natural completion/or error */
680                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
681                }
682                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
683        }
684}
685
686/**
687 * This case is much like the dedicated hasher, except that we will become
688 * hasher if we don't have a a packet waiting.
689 *
690 * TODO: You can loose the tail of a trace if the final thread
691 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
692 *
693 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
694 * queue sizes in total are larger than the ring size.
695 *
696 * 1. We read a packet from our buffer
697 * 2. Move that into the packet provided (packet)
698 */
699inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
700{
701        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
702        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
703        int ret, i, thread/*, psize*/;
704
705        if (t->state == THREAD_FINISHING)
706                return trace_handle_finishing_perpkt(libtrace, packet, t);
707
708        while (1) {
709                // Check if we have packets ready
710                if(try_waiting_queue(libtrace, t, packet, &ret))
711                        return ret;
712
713                // We limit the number of packets we get to the size of the sliding window
714                // such that it is impossible for any given thread to fail to store a packet
715                assert(sem_wait(&libtrace->sem) == 0);
716                /*~~~~Single threaded read of a packet~~~~*/
717                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
718
719                /* Re-check our queue things we might have data waiting */
720                if(try_waiting_queue(libtrace, t, packet, &ret)) {
721                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
722                        assert(sem_post(&libtrace->sem) == 0);
723                        return ret;
724                }
725
726                // TODO put on *proper* condition variable
727                if (libtrace->perpkt_queue_full) {
728                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
729                        assert(sem_post(&libtrace->sem) == 0);
730                        contention_stats[this_thread].wait_for_fill_complete_hits++;
731                        continue;
732                }
733
734                if (!*packet) {
735                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
736                                *packet = trace_create_packet();
737                }
738                assert(*packet);
739
740                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
741                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
742                        assert(sem_post(&libtrace->sem) == 0);
743                        // Finish this thread ensuring that any data written later by another thread is retrieved also
744                        if (libtrace_halt)
745                                return 0;
746                        else
747                                return trace_finish_perpkt(libtrace, packet, t);
748                }
749                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
750
751                /* ~~~~Multiple threads can run the hasher~~~~ */
752                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
753
754                /* Yes this is correct opposite read lock for a write operation */
755                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
756                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
757                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
758                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
759                *packet = NULL;
760
761                // Always try read any data from the sliding window
762                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
763                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
764                        if (libtrace->perpkt_queue_full) {
765                                // I might be the holdup in which case if I can read my queue I should do that and return
766                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
767                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
768                                        return ret;
769                                }
770                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
771                                continue;
772                        }
773                        // Read greedily as many as we can
774                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
775                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
776                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
777                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
778                                                if (this_thread == thread)
779                                                {
780                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
781                                                        // before EOF/error we might not have emptied the sliding window
782                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
783                                                        // Its our queue we must have a packet to read out
784                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
785                                                                // We must be able to write this now 100% without fail
786                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
787                                                                assert(sem_post(&libtrace->sem) == 0);
788                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
789                                                                return ret;
790                                                        } else {
791                                                                assert(!"Our queue is full but I cannot read from it??");
792                                                        }
793                                                }
794                                                // Not us we have to give the other threads a chance to write there packets then
795                                                libtrace->perpkt_queue_full = true;
796                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
797                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
798                                                        assert(sem_post(&libtrace->sem) == 0);
799
800                                                contention_stats[this_thread].full_queue_hits++;
801                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
802                                                // Grab these back
803                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
804                                                        assert(sem_wait(&libtrace->sem) == 0);
805                                                libtrace->perpkt_queue_full = false;
806                                        }
807                                        assert(sem_post(&libtrace->sem) == 0);
808                                        *packet = NULL;
809                                } else {
810                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
811                                        // in the general case (unless the user ends early without proper clean up).
812                                        assert (!"unreachable code??");
813                                }
814                        }
815                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
816                }
817                // Now we go back to checking our queue anyways
818        }
819}
820
821
822/**
823 * For the first packet of each queue we keep a copy and note the system
824 * time it was received at.
825 *
826 * This is used for finding the first packet when playing back a trace
827 * in trace time. And can be used by real time applications to print
828 * results out every XXX seconds.
829 */
830inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
831{
832        if (!t->recorded_first) {
833                struct timeval tv;
834                libtrace_packet_t * dup;
835                // For what it's worth we can call these outside of the lock
836                gettimeofday(&tv, NULL);
837                dup = trace_copy_packet(packet);
838                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
839                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
840                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
841                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
842                // Now update the first
843                libtrace->first_packets.count++;
844                if (libtrace->first_packets.count == 1) {
845                        // We the first entry hence also the first known packet
846                        libtrace->first_packets.first = t->perpkt_num;
847                } else {
848                        // Check if we are newer than the previous 'first' packet
849                        size_t first = libtrace->first_packets.first;
850                        if (trace_get_seconds(dup) <
851                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
852                                libtrace->first_packets.first = t->perpkt_num;
853                }
854                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
855                libtrace_message_t mesg = {0};
856                mesg.code = MESSAGE_FIRST_PACKET;
857                trace_send_message_to_reducer(libtrace, &mesg);
858                t->recorded_first = true;
859        }
860}
861
862/**
863 * Returns 1 if it's certain that the first packet is truly the first packet
864 * rather than a best guess based upon threads that have published so far.
865 * Otherwise 0 is returned.
866 * It's recommended that this result is stored rather than calling this
867 * function again.
868 */
869DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
870{
871        int ret = 0;
872        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
873        if (libtrace->first_packets.count) {
874                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
875                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
876                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
877                        ret = 1;
878                } else {
879                        struct timeval curr_tv;
880                        // If a second has passed since the first entry we will assume this is the very first packet
881                        gettimeofday(&curr_tv, NULL);
882                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
883                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
884                                        ret = 1;
885                                }
886                        }
887                }
888        } else {
889                *packet = NULL;
890                *tv = NULL;
891        }
892        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
893        return ret;
894}
895
896
897DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
898{
899        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
900}
901
902inline static struct timeval usec_to_tv(uint64_t usec)
903{
904        struct timeval tv;
905        tv.tv_sec = usec / 1000000;
906        tv.tv_usec = usec % 1000000;
907        return tv;
908}
909
910/** Similar to delay_tracetime but send messages to all threads periodically */
911static void* keepalive_entry(void *data) {
912        struct timeval prev, next;
913        libtrace_message_t message = {0};
914        libtrace_t *trace = (libtrace_t *)data;
915        uint64_t next_release;
916        fprintf(stderr, "keepalive thread is starting\n");
917
918        gettimeofday(&prev, NULL);
919        message.code = MESSAGE_TICK;
920        while (trace->state != STATE_FINSHED) {
921                fd_set rfds;
922                next_release = tv_to_usec(&prev) + (trace->tick_interval * 1000);
923                gettimeofday(&next, NULL);
924                if (next_release > tv_to_usec(&next)) {
925                        next = usec_to_tv(next_release - tv_to_usec(&next));
926                        // Wait for timeout or a message
927                        FD_ZERO(&rfds);
928                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
929                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
930                                libtrace_message_t msg;
931                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
932                                assert(msg.code == MESSAGE_DO_STOP);
933                                goto done;
934                        }
935                }
936                prev = usec_to_tv(next_release);
937                if (trace->state == STATE_RUNNING) {
938                        message.additional.uint64 = tv_to_usec(&prev);
939                        trace_send_message_to_perpkts(trace, &message);
940                }
941        }
942done:
943        fprintf(stderr, "keepalive thread is finishing\n");
944        trace->keepalive_thread.state = THREAD_FINISHED;
945        return NULL;
946}
947
948/**
949 * Delays a packets playback so the playback will be in trace time
950 */
951static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
952        struct timeval curr_tv, pkt_tv;
953        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
954        uint64_t curr_usec;
955        /* Tracetime we might delay releasing this packet */
956        if (!t->tracetime_offset_usec) {
957                libtrace_packet_t * first_pkt;
958                struct timeval *sys_tv;
959                int64_t initial_offset;
960                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
961                assert(first_pkt);
962                pkt_tv = trace_get_timeval(first_pkt);
963                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
964                if (stable)
965                        // 0->1 because 0 is used to mean unset
966                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
967                next_release = initial_offset;
968        }
969        /* next_release == offset */
970        pkt_tv = trace_get_timeval(packet);
971        next_release += tv_to_usec(&pkt_tv);
972        gettimeofday(&curr_tv, NULL);
973        curr_usec = tv_to_usec(&curr_tv);
974        if (next_release > curr_usec) {
975                // We need to wait
976                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
977                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
978                select(0, NULL, NULL, NULL, &delay_tv);
979        }
980}
981
982/* Read one packet from the trace into a buffer. Note that this function will
983 * block until a packet is read (or EOF is reached).
984 *
985 * @param libtrace      the libtrace opaque pointer
986 * @param packet        the packet opaque pointer
987 * @returns 0 on EOF, negative value on error
988 *
989 * Note this is identical to read_packet but calls pread_packet instead of
990 * read packet in the format.
991 *
992 */
993static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
994
995        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
996        if (trace_is_err(libtrace))
997                return -1;
998        if (!libtrace->started) {
999                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1000                return -1;
1001        }
1002        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1003                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1004                return -1;
1005        }
1006        assert(packet);
1007
1008        if (libtrace->format->read_packet) {
1009                do {
1010                        size_t ret;
1011                        /* Finalise the packet, freeing any resources the format module
1012                         * may have allocated it and zeroing all data associated with it.
1013                         */
1014                        trace_fin_packet(packet);
1015                        /* Store the trace we are reading from into the packet opaque
1016                         * structure */
1017                        packet->trace = libtrace;
1018                        ret=libtrace->format->pread_packet(libtrace,packet);
1019                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
1020                                return ret;
1021                        }
1022                        if (libtrace->filter) {
1023                                /* If the filter doesn't match, read another
1024                                 * packet
1025                                 */
1026                                if (!trace_apply_filter(libtrace->filter,packet)){
1027                                        ++libtrace->filtered_packets;
1028                                        continue;
1029                                }
1030                        }
1031                        if (libtrace->snaplen>0) {
1032                                /* Snap the packet */
1033                                trace_set_capture_length(packet,
1034                                                libtrace->snaplen);
1035                        }
1036                        trace_packet_set_order(packet, libtrace->accepted_packets);
1037                        ++libtrace->accepted_packets;
1038                        return ret;
1039                } while(1);
1040        }
1041        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1042        return ~0U;
1043}
1044
1045/**
1046 * Read a packet from the parallel trace
1047 */
1048DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
1049{
1050        int ret;
1051        libtrace_thread_t *t = get_thread_table(libtrace);
1052
1053        // Cleanup the packet passed back
1054        if (*packet)
1055                trace_fin_packet(*packet);
1056
1057        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1058                if (!*packet)
1059                        *packet = trace_create_packet();
1060                ret = trace_pread_packet_wrapper(libtrace, *packet);
1061        } else if (trace_has_dedicated_hasher(libtrace)) {
1062                ret = trace_pread_packet_hasher_thread(libtrace, packet);
1063        } else if (!trace_has_dedicated_hasher(libtrace)) {
1064                /* We don't care about which core a packet goes to */
1065                ret = trace_pread_packet_first_in_first_served(libtrace, packet);
1066        } /* else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
1067                ret = trace_pread_packet_sliding_window(libtrace, packet);
1068        } else {
1069                ret = trace_pread_packet_hash_locked(libtrace, packet);
1070        }*/
1071
1072        // Formats can also optionally do this internally to ensure the first
1073        // packet is always reported correctly
1074        if (ret > 0) {
1075                store_first_packet(libtrace, *packet, t);
1076                if (libtrace->tracetime)
1077                        delay_tracetime(libtrace, *packet, t);
1078        }
1079
1080        return ret;
1081}
1082
1083/* Starts perpkt threads
1084 * @return threads_started
1085 */
1086static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1087        int i;
1088
1089        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1090                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1091                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
1092        }
1093        return libtrace->perpkt_thread_count;
1094}
1095
1096/* Start an input trace in a parallel fashion.
1097 *
1098 * @param libtrace the input trace to start
1099 * @param global_blob some global data you can share with the new perpkt threads
1100 * @returns 0 on success
1101 */
1102DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
1103{
1104        int i;
1105        sigset_t sig_before, sig_block_all;
1106
1107        assert(libtrace);
1108        if (trace_is_err(libtrace))
1109                return -1;
1110        if (libtrace->state == STATE_PAUSED) {
1111                assert (libtrace->perpkts_pausing != 0);
1112                fprintf(stderr, "Restarting trace\n");
1113                UNUSED int err;
1114                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1115                        printf("This format has direct support for p's\n");
1116                        err = libtrace->format->pstart_input(libtrace);
1117                } else {
1118                        if (libtrace->format->start_input) {
1119                                err = libtrace->format->start_input(libtrace);
1120                        }
1121                }
1122                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1123                libtrace->started = true;
1124                libtrace->state = STATE_RUNNING;
1125                assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1126                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1127                return 0;
1128        }
1129
1130        assert(libtrace->state == STATE_NEW);
1131        libtrace_parallel = 1;
1132
1133        // Store the user defined things against the trace
1134        libtrace->global_blob = global_blob;
1135        libtrace->per_pkt = per_pkt;
1136        libtrace->reducer = reducer;
1137        libtrace->perpkts_finishing = 0;
1138
1139        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1140        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1141        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1142        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1143
1144        // Set default buffer sizes
1145        if (libtrace->perpkt_buffer_size <= 0)
1146                libtrace->perpkt_buffer_size = 1000;
1147
1148        if (libtrace->perpkt_thread_count <= 0) {
1149                // TODO add BSD support
1150                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1151                if (libtrace->perpkt_thread_count <= 0)
1152                        // Lets just use one
1153                        libtrace->perpkt_thread_count = 1;
1154        }
1155
1156        if(libtrace->packet_freelist_size <= 0)
1157                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
1158
1159        if(libtrace->packet_freelist_size <
1160                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
1161                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1162
1163        libtrace->started=true; // Before we start the threads otherwise we could have issues
1164        libtrace->state = STATE_RUNNING;
1165        /* Disable signals - Pthread signal handling */
1166
1167        sigemptyset(&sig_block_all);
1168
1169        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1170
1171        // If we are using a hasher start it
1172        if (libtrace->hasher || libtrace->hasher_thread.type == THREAD_HASHER) {
1173                libtrace_thread_t *t = &libtrace->hasher_thread;
1174                t->trace = libtrace;
1175                t->ret = NULL;
1176                t->type = THREAD_HASHER;
1177                t->state = THREAD_RUNNING;
1178                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1179                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1180        } else {
1181                libtrace->hasher_thread.type = THREAD_EMPTY;
1182        }
1183        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
1184        libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1185        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1186        // This will be applied to every new thread that starts, i.e. they will block all signals
1187        // Lets start a fixed number of reading threads
1188
1189        // For now we never have a dedicated thread for the reducer
1190        // i.e. This main thread is used as the reducer
1191        libtrace->reducer_thread.tid = pthread_self();
1192        libtrace->reducer_thread.type = THREAD_REDUCER;
1193        libtrace->reducer_thread.state = THREAD_RUNNING;
1194        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
1195
1196        /* Ready some storages */
1197        libtrace->first_packets.first = 0;
1198        libtrace->first_packets.count = 0;
1199        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1200        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1201
1202
1203        /* Start all of our perpkt threads */
1204        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1205        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1206                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1207                t->trace = libtrace;
1208                t->ret = NULL;
1209                t->type = THREAD_PERPKT;
1210                t->state = THREAD_RUNNING;
1211                t->user_data = NULL;
1212                // t->tid DONE on create
1213                t->perpkt_num = i;
1214                if (libtrace->hasher)
1215                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
1216                // Depending on the mode vector or deque might be chosen
1217                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1218                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1219                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1220                t->tmp_key = 0;
1221                t->tmp_data = NULL;
1222                t->recorded_first = false;
1223                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1224                t->tracetime_offset_usec = 0;;
1225        }
1226
1227        int threads_started = 0;
1228        /* Setup the trace and start our threads */
1229        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1230                printf("This format has direct support for p's\n");
1231                threads_started = libtrace->format->pstart_input(libtrace);
1232        } else {
1233                if (libtrace->format->start_input) {
1234                        threads_started=libtrace->format->start_input(libtrace);
1235                }
1236        }
1237        if (threads_started == 0)
1238                threads_started = trace_start_perpkt_threads(libtrace);
1239
1240        if (libtrace->tick_interval > 0) {
1241                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1242                libtrace->keepalive_thread.state = THREAD_RUNNING;
1243                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1244                assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
1245        }
1246
1247        // Revert back - Allow signals again
1248        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1249        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1250
1251        if (threads_started < 0)
1252                // Error
1253                return threads_started;
1254
1255        // TODO fix these leaks etc
1256        if (libtrace->perpkt_thread_count != threads_started)
1257                printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1258
1259
1260        return 0;
1261}
1262
1263/**
1264 * Pauses a trace, this should only be called by the main thread
1265 * 1. Set started = false
1266 * 2. All perpkt threads are paused waiting on a condition var
1267 * 3. Then call ppause on the underlying format if found
1268 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
1269 *
1270 * Once done you should be able to modify the trace setup and call pstart again
1271 * TODO handle changing thread numbers
1272 */
1273DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1274{
1275        libtrace_thread_t *t;
1276        int i;
1277        assert(libtrace);
1278        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1279                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1280                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1281                return -1;
1282        }
1283
1284        t = get_thread_table(libtrace);
1285
1286        // Set pausing
1287        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1288        libtrace->state = STATE_PAUSING;
1289        pthread_cond_broadcast(&libtrace->perpkt_cond);
1290        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1291
1292        // Special case handle the hasher thread case
1293        if (trace_has_dedicated_hasher(libtrace)) {
1294                fprintf(stderr, "Hasher thread running we deal with this special!\n");
1295                libtrace_message_t message = {0};
1296                message.code = MESSAGE_DO_PAUSE;
1297                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1298                // Wait for it to pause
1299                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1300                while (1 != libtrace->perpkts_pausing) {
1301                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1302                }
1303                libtrace->perpkts_pausing--; // Do this on the hasher's behalf
1304                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1305        }
1306
1307        fprintf(stderr, "Sending messages \n");
1308        // Stop threads, skip this one if its a perpkt
1309        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1310                if (&libtrace->perpkt_threads[i] != t) {
1311                        libtrace_message_t message = {0};
1312                        message.code = MESSAGE_DO_PAUSE;
1313                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1314                        if(trace_has_dedicated_hasher(libtrace)) {
1315                                // The hasher has stopped and other threads have messages waiting therefore
1316                                // If the queues are empty the other threads would have no data
1317                                // So send some NULL packets to simply ask the threads to check there message queues
1318                                // We are the only writer since hasher has paused
1319                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
1320                        }
1321                } else {
1322                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1323                }
1324        }
1325
1326        // Formats must support native message handling if a message is ready
1327        // Approach per Perry's suggestion is a non-blocking read
1328        // followed by a blocking read. XXX STRIP THIS OUT
1329
1330        if (t) {
1331                // A perpkt is doing the pausing interesting fake a extra thread paused
1332                // We rely on the user to not return before starting the trace again
1333                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1334                libtrace->perpkts_pausing++;
1335                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1336        }
1337
1338        fprintf(stderr, "Asking threads to pause\n");
1339
1340        // Wait for all threads to pause
1341        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1342        while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
1343                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1344        }
1345        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1346
1347        fprintf(stderr, "Threads have paused\n");
1348
1349        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1350                libtrace->started = false;
1351                if (libtrace->format->ppause_input)
1352                        libtrace->format->ppause_input(libtrace);
1353                // TODO What happens if we don't have pause input??
1354        } else {
1355                int err;
1356                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1357                err = trace_pause(libtrace);
1358                // We should handle this a bit better
1359                if (err)
1360                        return err;
1361        }
1362
1363        // Only set as paused after the pause has been called on the trace
1364        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1365        libtrace->state = STATE_PAUSED;
1366        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1367        return 0;
1368}
1369
1370/**
1371 * Stop trace finish prematurely as though it meet an EOF
1372 * This should only be called by the main thread
1373 * 1. Calls ppause
1374 * 2. Sends a message asking for threads to finish
1375 *
1376 */
1377DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1378{
1379        int i;
1380        libtrace_message_t message = {0};
1381        assert(libtrace);
1382        if (!libtrace->started) {
1383                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
1384                return -1;
1385        }
1386
1387        // Ensure all threads have paused and the underlying trace format has
1388        // been closed and all packets associated are cleaned up
1389        trace_ppause(libtrace);
1390
1391        // Now send a message asking the threads to stop
1392        // This will be retrieved before trying to read another packet
1393       
1394        message.code = MESSAGE_DO_STOP;
1395        trace_send_message_to_perpkts(libtrace, &message);
1396        if (trace_has_dedicated_hasher(libtrace))
1397                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1398       
1399        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1400                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1401        }
1402
1403        // Now release the threads and let them stop
1404        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1405        libtrace->state = STATE_FINSHED;
1406        assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1407        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1408        return 0;
1409}
1410
1411/**
1412 * Set the hasher type along with a selected function, if hardware supports
1413 * that generic type of hashing it will be used otherwise the supplied
1414 * hasher function will be used and passed data when called.
1415 *
1416 * @return 0 if successful otherwise -1 on error
1417 */
1418DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1419        int ret = -1;
1420        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1421                return -1;
1422        }
1423
1424        // Save the requirements
1425        trace->hasher_type = type;
1426        if (hasher) {
1427                trace->hasher = hasher;
1428                trace->hasher_data = data;
1429        } else {
1430                trace->hasher = NULL;
1431                // TODO consider how to handle freeing this
1432                trace->hasher_data = NULL;
1433        }
1434
1435        // Try push this to hardware - NOTE hardware could do custom if
1436        // there is a more efficient way to apply it, in this case
1437        // it will simply grab the function out of libtrace_t
1438        if (trace->format->pconfig_input)
1439                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1440
1441        if (ret == -1) {
1442                // We have to deal with this ourself
1443                // This most likely means single threaded reading of the trace
1444                if (!hasher) {
1445                        switch (type)
1446                        {
1447                                case HASHER_CUSTOM:
1448                                case HASHER_BALANCE:
1449                                        return 0;
1450                                case HASHER_BIDIRECTIONAL:
1451                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1452                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1453                                        toeplitz_init_config(trace->hasher_data, 1);
1454                                        return 0;
1455                                case HASHER_UNIDIRECTIONAL:
1456                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1457                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1458                                        toeplitz_init_config(trace->hasher_data, 0);
1459                                        return 0;
1460                                case HASHER_HARDWARE:
1461                                        return -1;
1462                        }
1463                        return -1;
1464                }
1465        } else {
1466                // The hardware is dealing with this yay
1467                trace->hasher_type = HASHER_HARDWARE;
1468        }
1469
1470        return 0;
1471}
1472
1473// Waits for all threads to finish
1474DLLEXPORT void trace_join(libtrace_t *libtrace) {
1475        int i;
1476
1477        /* Firstly wait for the perpkt threads to finish, since these are
1478         * user controlled */
1479        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1480                //printf("Waiting to join with perpkt #%d\n", i);
1481                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
1482                //printf("Joined with perpkt #%d\n", i);
1483                // So we must do our best effort to empty the queue - so
1484                // the producer (or any other threads) don't block.
1485                libtrace_packet_t * packet;
1486                // Mark that we are no longer accepting packets
1487                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1488                libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
1489                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1490                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1491                        if (packet) // This could be NULL iff the perpkt finishes early
1492                                trace_destroy_packet(packet);
1493        }
1494
1495        /* Now the hasher */
1496        // XXX signal it to stop
1497        if (trace_has_dedicated_hasher(libtrace)) {
1498                fprintf(stderr, "Waiting to join with the hasher\n");
1499                pthread_join(libtrace->hasher_thread.tid, NULL);
1500                fprintf(stderr, "Joined with with the hasher\n");
1501                libtrace->hasher_thread.state = THREAD_FINISHED;
1502        }
1503
1504        // Now that everything is finished nothing can be touching our
1505        // buffers so clean them up
1506        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1507                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
1508                // if they lost timeslice before-during a write
1509                libtrace_packet_t * packet;
1510                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1511                        trace_destroy_packet(packet);
1512                if (libtrace->hasher) {
1513                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1514                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1515                }
1516                // Cannot destroy vector yet, this happens with trace_destroy
1517        }
1518        libtrace->state = STATE_FINSHED;
1519       
1520        // Wait for the tick (keepalive) thread if its been started
1521        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE && libtrace->keepalive_thread.state == THREAD_RUNNING) {
1522                libtrace_message_t msg = {0};
1523                msg.code = MESSAGE_DO_STOP;
1524                fprintf(stderr, "Waiting to join with the keepalive\n");
1525                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1526                pthread_join(libtrace->keepalive_thread.tid, NULL);
1527                fprintf(stderr, "Joined with with the keepalive\n");
1528        }
1529        // Lets mark this as done for now
1530        libtrace->state = STATE_JOINED;
1531}
1532
1533DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1534{
1535        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1536        assert(t);
1537        return libtrace_message_queue_count(&t->messages);
1538}
1539
1540DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1541{
1542        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1543        assert(t);
1544        return libtrace_message_queue_get(&t->messages, message);
1545}
1546
1547DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1548{
1549        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1550        assert(t);
1551        return libtrace_message_queue_try_get(&t->messages, message);
1552}
1553
1554/**
1555 * Return backlog indicator
1556 */
1557DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
1558{
1559        libtrace_message_t message = {0};
1560        message.code = MESSAGE_POST_REDUCE;
1561        message.sender = get_thread_descriptor(libtrace);
1562        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
1563}
1564
1565/**
1566 * Return backlog indicator
1567 */
1568DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
1569{
1570        //printf("Sending message code=%d to reducer\n", message->code);
1571        message->sender = get_thread_descriptor(libtrace);
1572        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
1573}
1574
1575/**
1576 *
1577 */
1578DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1579{
1580        //printf("Sending message code=%d to reducer\n", message->code);
1581        message->sender = get_thread_descriptor(libtrace);
1582        return libtrace_message_queue_put(&t->messages, message);
1583}
1584
1585DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1586{
1587        int i;
1588        message->sender = get_thread_descriptor(libtrace);
1589        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1590                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1591        }
1592        //printf("Sending message code=%d to reducer\n", message->code);
1593        return 0;
1594}
1595
1596DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1597        result->key = key;
1598}
1599DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1600        return result->key;
1601}
1602DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
1603        result->value = value;
1604}
1605DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
1606        return result->value;
1607}
1608DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
1609        result->key = key;
1610        result->value = value;
1611}
1612DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
1613        free(*result);
1614        result = NULL;
1615        // TODO automatically back with a free list!!
1616}
1617
1618DLLEXPORT void * trace_get_global(libtrace_t *trace)
1619{
1620        return trace->global_blob;
1621}
1622
1623DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
1624{
1625        if (trace->global_blob && trace->global_blob != data) {
1626                void * ret = trace->global_blob;
1627                trace->global_blob = data;
1628                return ret;
1629        } else {
1630                trace->global_blob = data;
1631                return NULL;
1632        }
1633}
1634
1635DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
1636{
1637        return t->user_data;
1638}
1639
1640DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
1641{
1642        if(t->user_data && t->user_data != data) {
1643                void *ret = t->user_data;
1644                t->user_data = data;
1645                return ret;
1646        } else {
1647                t->user_data = data;
1648                return NULL;
1649        }
1650}
1651
1652/**
1653 * Publish to the reduce queue, return
1654 */
1655DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
1656        libtrace_result_t res;
1657        res.is_packet = 0;
1658        // Who am I???
1659        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1660        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1661        // Now put it into my table
1662        UNUSED static __thread int count = 0;
1663
1664
1665        libtrace_result_set_key_value(&res, key, value);
1666        /*
1667        if (count == 1)
1668                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1669        count = (count+1) %1000;
1670        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1671        */
1672        /*if (count == 1)
1673                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1674        count = (count+1)%1000;*/
1675        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1676                if (libtrace_deque_get_size(&t->deque) >= 800) {
1677                        trace_post_reduce(libtrace);
1678                }
1679                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1680                //      sched_yield();
1681                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1682        } else {
1683                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1684                //      sched_yield();
1685
1686                if (libtrace_vector_get_size(&t->vector) >= 800) {
1687                        trace_post_reduce(libtrace);
1688                }
1689                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1690        }
1691}
1692
1693DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1694        libtrace_result_t res;
1695        // Who am I???
1696        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1697        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1698        // Now put it into my table
1699        UNUSED static __thread int count = 0;
1700
1701        res.is_packet = 1;
1702        libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
1703        /*
1704        if (count == 1)
1705                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1706        count = (count+1) %1000;
1707        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1708        */
1709        /*if (count == 1)
1710                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1711        count = (count+1)%1000;*/
1712        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1713                if (libtrace_deque_get_size(&t->deque) >= 800) {
1714                        trace_post_reduce(libtrace);
1715                }
1716                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1717                //      sched_yield();
1718                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1719        } else {
1720                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1721                //      sched_yield();
1722
1723                if (libtrace_vector_get_size(&t->vector) >= 800) {
1724                        trace_post_reduce(libtrace);
1725                }
1726                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1727        }
1728}
1729
1730
1731static int compareres(const void* p1, const void* p2)
1732{
1733        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
1734                return -1;
1735        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
1736                return 0;
1737        else
1738                return 1;
1739}
1740
1741DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
1742        int i;
1743        int flags = libtrace->reducer_flags; // Hint these aren't a changing
1744
1745        libtrace_vector_empty(results);
1746
1747        /* Here we assume queues are in order ascending order and they want
1748         * the smallest result first. If they are not in order the results
1749         * may not be in order.
1750         */
1751        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1752                int live_count = 0;
1753                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
1754                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
1755                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
1756                int min_queue = -1;
1757
1758                /* Loop through check all are alive (have data) and find the smallest */
1759                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1760                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
1761                        if (libtrace_deque_get_size(v) != 0) {
1762                                libtrace_result_t r;
1763                                libtrace_deque_peek_front(v, (void *) &r);
1764                                live_count++;
1765                                live[i] = 1;
1766                                key[i] = libtrace_result_get_key(&r);
1767                                if (i==0 || min_key > key[i]) {
1768                                        min_key = key[i];
1769                                        min_queue = i;
1770                                }
1771                        } else {
1772                                live[i] = 0;
1773                        }
1774                }
1775
1776                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
1777                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
1778                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
1779                                libtrace->state == STATE_JOINED))) {
1780                        /* Get the minimum queue and then do stuff */
1781                        libtrace_result_t r;
1782
1783                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
1784                        libtrace_vector_push_back(results, &r);
1785
1786                        // We expect the key we read +1 now
1787                        libtrace->expected_key = key[min_queue] + 1;
1788
1789                        // Now update the one we just removed
1790                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
1791                        {
1792                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
1793                                key[min_queue] = libtrace_result_get_key(&r);
1794                                if (key[min_queue] <= min_key) {
1795                                        // We are still the smallest, might be out of order though :(
1796                                        min_key = key[min_queue];
1797                                } else {
1798                                        min_key = key[min_queue]; // Update our minimum
1799                                        // Check all find the smallest again - all are alive
1800                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1801                                                if (live[i] && min_key > key[i]) {
1802                                                        min_key = key[i];
1803                                                        min_queue = i;
1804                                                }
1805                                        }
1806                                }
1807                        } else {
1808                                live[min_queue] = 0;
1809                                live_count--;
1810                                min_key = UINT64_MAX; // Update our minimum
1811                                // Check all find the smallest again - all are alive
1812                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1813                                        // Still not 100% TODO (what if order is wrong or not increasing)
1814                                        if (live[i] && min_key >= key[i]) {
1815                                                min_key = key[i];
1816                                                min_queue = i;
1817                                        }
1818                                }
1819                        }
1820                }
1821        } else { // Queues are not in order - return all results in the queue
1822                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1823                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
1824                }
1825                if (flags & REDUCE_SORT) {
1826                        qsort(results->elements, results->size, results->element_size, &compareres);
1827                }
1828        }
1829        return libtrace_vector_get_size(results);
1830}
1831
1832DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
1833        return packet->order;
1834}
1835
1836DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
1837        return packet->hash;
1838}
1839
1840DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
1841        packet->order = order;
1842}
1843
1844DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
1845        packet->hash = hash;
1846}
1847
1848DLLEXPORT int trace_finished(libtrace_t * libtrace) {
1849        int i;
1850        int b = 0;
1851        // TODO I don't like using this so much
1852        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1853        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1854                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
1855                        b++;
1856        }
1857        //assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1858        return !b;
1859}
1860
1861DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
1862{
1863        UNUSED int ret = -1;
1864        switch (option) {
1865                case TRACE_OPTION_TICK_INTERVAL:
1866                        libtrace->tick_interval = *((int *) value);
1867                        return 1;
1868                case TRACE_OPTION_SET_HASHER:
1869                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
1870                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
1871                        libtrace->perpkt_buffer_size = *((int *) value);
1872                        return 1;
1873                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
1874                        libtrace->packet_freelist_size = *((int *) value);
1875                        return 1;
1876                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1877                        libtrace->perpkt_thread_count = *((int *) value);
1878                        return 1;
1879                case TRACE_DROP_OUT_OF_ORDER:
1880                        if (*((int *) value))
1881                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
1882                        else
1883                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
1884                        return 1;
1885                case TRACE_OPTION_SEQUENTIAL:
1886                        if (*((int *) value))
1887                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
1888                        else
1889                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
1890                        return 1;
1891                case TRACE_OPTION_ORDERED:
1892                        if (*((int *) value))
1893                                libtrace->reducer_flags |= REDUCE_ORDERED;
1894                        else
1895                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
1896                        return 1;
1897                case TRACE_OPTION_USE_DEDICATED_HASHER:
1898                        if (*((int *) value))
1899                                libtrace->hasher_thread.type = THREAD_HASHER;
1900                        else
1901                                libtrace->hasher_thread.type = THREAD_EMPTY;
1902                        return 1;
1903                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
1904                        if (*((int *) value))
1905                                libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
1906                        else
1907                                libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
1908                        return 1;
1909                case TRACE_OPTION_TRACETIME:
1910                        if(*((int *) value))
1911                                libtrace->tracetime = 1;
1912                        else
1913                                libtrace->tracetime = 0;
1914                        return 0;
1915        }
1916        return 0;
1917}
1918
1919DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1920        libtrace_packet_t* result;
1921        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1922                result = trace_create_packet();
1923        assert(result);
1924        swap_packets(result, packet); // Move the current packet into our copy
1925        return result;
1926}
1927
1928DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1929        // Try write back the packet
1930        assert(packet);
1931        // Always release any resources this might be holding such as a slot in a ringbuffer
1932        trace_fin_packet(packet);
1933        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1934                /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
1935                //assert(1 == 90);
1936                trace_destroy_packet(packet);
1937        }
1938}
1939
1940DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
1941        if (libtrace->format)
1942                return &libtrace->format->info;
1943        else
1944                return NULL;
1945}
Note: See TracBrowser for help on using the repository browser.