source: lib/trace_parallel.c @ 60e8e86

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 60e8e86 was 60e8e86, checked in by Richard Sanger <rsangerarj@…>, 8 years ago

Add in a variable libtrace_parallel, first set when using the parallel functions. This ensures backwards compatibility with trace_destroy() calling fin_packet(), which will can segfault if the trace is already destroyed first like many existing programs do.
This means the tests pass again.

  • Property mode set to 100644
File size: 59.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100
101
102extern int libtrace_parallel;
103
104struct multithreading_stats {
105        uint64_t full_queue_hits;
106        uint64_t wait_for_fill_complete_hits;
107} contention_stats[1024];
108
109
110/**
111 * @return True if the format supports parallel threads.
112 */
113static inline bool trace_supports_parallel(libtrace_t *trace)
114{
115        assert(trace);
116        assert(trace->format);
117        if (trace->format->pstart_input)
118                return true;
119        else
120                return false;
121        //return trace->format->pstart_input;
122}
123
124DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
125        int i;
126        struct multithreading_stats totals = {0};
127        for (i = 0; i < libtrace->mapper_thread_count ; i++) {
128                printf("\nStats for mapper thread#%d\n", i);
129                printf("\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
130                totals.full_queue_hits += contention_stats[i].full_queue_hits;
131                printf("\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
132                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
133        }
134        printf("\nTotals for mapper threads\n");
135        printf("\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
136        printf("\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
137
138        return;
139}
140
141inline void libtrace_zero_thread(libtrace_thread_t * t) {
142        t->trace = NULL;
143        t->ret = NULL;
144        t->type = THREAD_EMPTY;
145        libtrace_zero_ringbuffer(&t->rbuffer);
146        libtrace_zero_vector(&t->vector);
147        libtrace_zero_deque(&t->deque);
148        t->recorded_first = false;
149        t->map_num = -1;
150}
151
152// Ints are aligned int is atomic so safe to read and write at same time
153// However write must be locked, read doesn't (We never try read before written to table)
154libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
155        int i = 0;
156        pthread_t tid = pthread_self();
157
158        for (;i<libtrace->mapper_thread_count ;++i) {
159                if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
160                        return &libtrace->mapper_threads[i];
161        }
162        return NULL;
163}
164
165int get_thread_table_num(libtrace_t *libtrace);
166DLLEXPORT int get_thread_table_num(libtrace_t *libtrace) {
167        int i = 0;
168        pthread_t tid = pthread_self();
169        for (;i<libtrace->mapper_thread_count; ++i) {
170                if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
171                        return i;
172        }
173        return -1;
174}
175
176static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
177        libtrace_thread_t *ret;
178        if (!(ret = get_thread_table(libtrace))) {
179                pthread_t tid = pthread_self();
180                // Check if we are reducer or something else
181                if (pthread_equal(tid, libtrace->reducer_thread.tid))
182                        ret = &libtrace->reducer_thread;
183                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
184                        ret = &libtrace->hasher_thread;
185                else
186                        ret = NULL;
187        }
188        return ret;
189}
190
191/**
192 * Holds threads in a paused state, until released by broadcasting
193 * the condition mutex.
194 */
195static void trace_thread_pause(libtrace_t *trace) {
196        printf("Pausing thread #%d\n", get_thread_table_num(trace));
197        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
198        trace->perpkt_pausing++;
199        pthread_cond_broadcast(&trace->perpkt_cond);
200        while (!trace->started) {
201                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
202        }
203        trace->perpkt_pausing--;
204        pthread_cond_broadcast(&trace->perpkt_cond);
205        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
206        printf("Releasing thread #%d\n", get_thread_table_num(trace));
207}
208
209void* mapper_start(void *data) {
210        libtrace_t *trace = (libtrace_t *)data;
211        libtrace_thread_t * t;
212        libtrace_message_t message;
213        libtrace_packet_t *packet = NULL;
214
215        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
216        t = get_thread_table(trace);
217        assert(t);
218        //printf("Yay Started Mapper thread #%d\n", (int) get_thread_table_num(trace));
219        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
220
221        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
222        // Send a message to say we've started
223
224        message.code = MESSAGE_STARTED;
225        message.sender = t;
226        message.additional = NULL;
227
228        // Let the per_packet function know we have started
229        (*trace->per_pkt)(trace, NULL, &message, t);
230
231
232        for (;;) {
233                int psize;
234
235                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
236                        switch (message.code) {
237                                case MESSAGE_PAUSE:
238                                        trace_thread_pause(trace);
239                                        break;
240                                case MESSAGE_STOP:
241                                        goto stop;
242                        }
243                        (*trace->per_pkt)(trace, NULL, &message, t);
244                        continue;
245                }
246
247                if (trace->mapper_thread_count == 1) {
248                        if (!packet) {
249                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
250                                        packet = trace_create_packet();
251                        }
252                        assert(packet);
253                        if ((psize = trace_read_packet(trace, packet)) <1) {
254                                break;
255                        }
256                } else {
257                        psize = trace_pread_packet(trace, &packet);
258                }
259
260                if (psize > 0) {
261                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
262                        continue;
263                }
264
265                if (psize == -2)
266                        continue; // We have a message
267
268                if (psize < 1) { // consider sending a message
269                        break;
270                }
271
272        }
273
274
275stop:
276        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
277        // Let the per_packet function know we have stopped
278        message.code = MESSAGE_STOPPED;
279        message.sender = message.additional = NULL;
280        (*trace->per_pkt)(trace, NULL, &message, t);
281
282        // And we're at the end free the memories
283        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
284        t->state = THREAD_FINISHED;
285        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
286
287        // Notify only after we've defiantly set the state to finished
288        message.code = MESSAGE_MAPPER_ENDED;
289        message.additional = NULL;
290        trace_send_message_to_reducer(trace, &message);
291
292        pthread_exit(NULL);
293};
294
295/** True if trace has dedicated hasher thread otherwise false */
296inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
297inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
298{
299        return libtrace->hasher_thread.type == THREAD_HASHER;
300}
301
302/**
303 * The start point for our single threaded hasher thread, this will read
304 * and hash a packet from a data source and queue it against the correct
305 * core to process it.
306 */
307static void* hasher_start(void *data) {
308        libtrace_t *trace = (libtrace_t *)data;
309        libtrace_thread_t * t;
310        int i;
311        libtrace_packet_t * packet;
312
313        assert(trace_has_dedicated_hasher(trace));
314        /* Wait until all threads are started and objects are initialised (ring buffers) */
315        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
316        t = &trace->hasher_thread;
317        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
318        printf("Hasher Thread started\n");
319        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
320        int pkt_skipped = 0;
321        /* Read all packets in then hash and queue against the correct thread */
322        while (1) {
323                int thread;
324                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
325                        packet = trace_create_packet();
326                assert(packet);
327
328                if (libtrace_halt) // Signal to die has been sent - TODO
329                        break;
330
331                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
332                        break; /* We are EOF or error'd either way we stop  */
333                }
334
335                /* We are guaranteed to have a hash function i.e. != NULL */
336                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
337                thread = trace_packet_get_hash(packet) % trace->mapper_thread_count;
338                /* Blocking write to the correct queue - I'm the only writer */
339                if (trace->mapper_threads[thread].state != THREAD_FINISHED) {
340                        libtrace_ringbuffer_write(&trace->mapper_threads[thread].rbuffer, packet);
341                        pkt_skipped = 0;
342                } else {
343                        pkt_skipped = 1; // Reuse that packet no one read it
344                }
345        }
346
347        /* Broadcast our last failed read to all threads */
348        for (i = 0; i < trace->mapper_thread_count; i++) {
349                libtrace_packet_t * bcast;
350                printf("Broadcasting error/EOF now the trace is over\n");
351                if (i == trace->mapper_thread_count - 1) {
352                        bcast = packet;
353                } else {
354                        bcast = trace_create_packet();
355                        bcast->error = packet->error;
356                }
357                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
358                if (trace->mapper_threads[i].state != THREAD_FINISHED) {
359                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
360                        // Unlock early otherwise we could deadlock
361                        libtrace_ringbuffer_write(&trace->mapper_threads[i].rbuffer, NULL);
362                } else {
363                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
364                }
365        }
366        // We dont need to free packet
367
368        // And we're at the end free the memories
369        t->state = THREAD_FINISHED;
370
371        // Notify only after we've defiantly set the state to finished
372        libtrace_message_t message;
373        message.code = MESSAGE_MAPPER_ENDED;
374        message.additional = NULL;
375        trace_send_message_to_reducer(trace, &message);
376
377        // TODO remove from TTABLE t sometime
378        pthread_exit(NULL);
379};
380
381/**
382 * Moves src into dest(Complete copy) and copies the memory buffer and
383 * its flags from dest into src ready for reuse without needing extra mallocs.
384 */
385static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
386        // Save the passed in buffer status
387        assert(dest->trace == NULL); // Must be a empty packet
388        void * temp_buf = dest->buffer;
389        buf_control_t temp_buf_control = dest->buf_control;
390        // Completely copy StoredPacket into packet
391        memcpy(dest, src, sizeof(libtrace_packet_t));
392        // Set the buffer settings on the returned packet
393        src->buffer = temp_buf;
394        src->buf_control = temp_buf_control;
395        src->trace = NULL;
396}
397
398/* Our simplest case when a thread becomes ready it can obtain a exclusive
399 * lock to read a packet from the underlying trace.
400 */
401inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
402{
403        // We need this to fill the 'first' packet table
404        libtrace_thread_t *t = get_thread_table(libtrace);
405        if (!*packet) {
406                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
407                        *packet = trace_create_packet();
408        }
409        assert(*packet);
410        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
411        /* Read a packet */
412        (*packet)->error = trace_read_packet(libtrace, *packet);
413        // Doing this inside the lock ensures the first packet is always
414        // recorded first
415        store_first_packet(libtrace, *packet, t);
416
417        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
418        return (*packet)->error;
419}
420
421/**
422 * For the case that we have a dedicated hasher thread
423 * 1. We read a packet from our buffer
424 * 2. Move that into the packet provided (packet)
425 */
426inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
427{
428        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
429        libtrace_thread_t* t = &libtrace->mapper_threads[this_thread];
430
431        if (*packet) // Recycle the old get the new
432                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
433                        trace_destroy_packet(*packet);
434        *packet = libtrace_ringbuffer_read(&t->rbuffer);
435
436        if (*packet) {
437                return 1;
438        } else {
439                printf("Got a NULL packet the trace is over\n");
440                return -1; // We are done for some reason
441        }
442}
443
444/**
445 * Tries to read from our queue and returns 1 if a packet was retrieved
446 */
447static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
448{
449        libtrace_packet_t* retrived_packet;
450
451        /* Lets see if we have one waiting */
452        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
453                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
454                assert(retrived_packet);
455
456                if (*packet) // Recycle the old get the new
457                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
458                                trace_destroy_packet(*packet);
459                *packet = retrived_packet;
460                *ret = (*packet)->error;
461                return 1;
462        }
463        return 0;
464}
465
466/**
467 * Allows us to ensure all threads are finished writing to our threads ring_buffer
468 * before returning EOF/error.
469 */
470inline static int trace_handle_finishing_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
471{
472        /* We are waiting for the condition that another thread ends to check
473         * our queue for new data, once all threads end we can go to finished */
474        bool complete = false;
475        int ret;
476
477        do {
478                // Wait for a thread to end
479                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
480
481                // Check before
482                if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
483                        complete = true;
484                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
485                        continue;
486                }
487
488                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
489
490                // Check after
491                if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
492                        complete = true;
493                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
494                        continue;
495                }
496
497                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
498
499                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
500                if(try_waiting_queue(libtrace, t, packet, &ret))
501                        return ret;
502        } while (!complete);
503
504        // We can only end up here once all threads complete
505        try_waiting_queue(libtrace, t, packet, &ret);
506
507        return ret;
508        // TODO rethink this logic fix bug here
509}
510
511/**
512 * Expects the libtrace_lock to not be held
513 */
514inline static int trace_finish_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
515{
516        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
517        t->state = THREAD_FINISHING;
518        libtrace->mappers_finishing++;
519        pthread_cond_broadcast(&libtrace->perpkt_cond);
520        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
521        return trace_handle_finishing_mapper(libtrace, packet, t);
522}
523
524/**
525 * This case is much like the dedicated hasher, except that we will become
526 * hasher if we don't have a a packet waiting.
527 *
528 * Note: This is only every used if we have are doing hashing.
529 *
530 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
531 * queue sizes in total are larger than the ring size.
532 *
533 * 1. We read a packet from our buffer
534 * 2. Move that into the packet provided (packet)
535 */
536inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
537{
538        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
539        libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
540        int thread, ret, psize;
541
542        while (1) {
543                if(try_waiting_queue(libtrace, t, packet, &ret))
544                        return ret;
545                // Can still block here if another thread is writing to a full queue
546                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
547
548                // Its impossible for our own queue to overfill, because no one can write
549                // when we are in the lock
550                if(try_waiting_queue(libtrace, t, packet, &ret)) {
551                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
552                        return ret;
553                }
554
555                // Another thread cannot write a packet because a queue has filled up. Is it ours?
556                if (libtrace->mapper_queue_full) {
557                        contention_stats[this_thread].wait_for_fill_complete_hits++;
558                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
559                        continue;
560                }
561
562                if (!*packet) {
563                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
564                                *packet = trace_create_packet();
565                }
566                assert(*packet);
567
568                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
569                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
570                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
571                        if (libtrace_halt)
572                                return 0;
573                        else
574                                return (*packet)->error;
575                }
576
577                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
578                thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
579                if (thread == this_thread) {
580                        // If it's this thread we must be in order because we checked the buffer once we got the lock
581                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
582                        return (*packet)->error;
583                }
584
585                if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
586                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
587                                libtrace->mapper_queue_full = true;
588                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
589                                contention_stats[this_thread].full_queue_hits++;
590                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
591                        }
592                        *packet = NULL;
593                        libtrace->mapper_queue_full = false;
594                } else {
595                        /* We can get here if the user closes the thread before natural completion/or error */
596                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
597                }
598                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
599        }
600}
601
602/**
603 * This case is much like the dedicated hasher, except that we will become
604 * hasher if we don't have a a packet waiting.
605 *
606 * TODO: You can loose the tail of a trace if the final thread
607 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
608 *
609 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
610 * queue sizes in total are larger than the ring size.
611 *
612 * 1. We read a packet from our buffer
613 * 2. Move that into the packet provided (packet)
614 */
615inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
616{
617        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
618        libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
619        int ret, i, thread, psize;
620
621        if (t->state == THREAD_FINISHING)
622                return trace_handle_finishing_mapper(libtrace, packet, t);
623
624        while (1) {
625                // Check if we have packets ready
626                if(try_waiting_queue(libtrace, t, packet, &ret))
627                        return ret;
628
629                // We limit the number of packets we get to the size of the sliding window
630                // such that it is impossible for any given thread to fail to store a packet
631                assert(sem_wait(&libtrace->sem) == 0);
632                /*~~~~Single threaded read of a packet~~~~*/
633                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
634
635                /* Re-check our queue things we might have data waiting */
636                if(try_waiting_queue(libtrace, t, packet, &ret)) {
637                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
638                        assert(sem_post(&libtrace->sem) == 0);
639                        return ret;
640                }
641
642                // TODO put on *proper* condition variable
643                if (libtrace->mapper_queue_full) {
644                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
645                        assert(sem_post(&libtrace->sem) == 0);
646                        contention_stats[this_thread].wait_for_fill_complete_hits++;
647                        continue;
648                }
649
650                if (!*packet) {
651                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
652                                *packet = trace_create_packet();
653                }
654                assert(*packet);
655
656                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
657                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
658                        assert(sem_post(&libtrace->sem) == 0);
659                        // Finish this thread ensuring that any data written later by another thread is retrieved also
660                        if (libtrace_halt)
661                                return 0;
662                        else
663                                return trace_finish_mapper(libtrace, packet, t);
664                }
665                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
666
667                /* ~~~~Multiple threads can run the hasher~~~~ */
668                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
669
670                /* Yes this is correct opposite read lock for a write operation */
671                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
672                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
673                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
674                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
675                *packet = NULL;
676
677                // Always try read any data from the sliding window
678                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
679                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
680                        if (libtrace->mapper_queue_full) {
681                                // I might be the holdup in which case if I can read my queue I should do that and return
682                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
683                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
684                                        return ret;
685                                }
686                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
687                                continue;
688                        }
689                        // Read greedily as many as we can
690                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
691                                thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
692                                if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
693                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
694                                                if (this_thread == thread)
695                                                {
696                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
697                                                        // before EOF/error we might not have emptied the sliding window
698                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
699                                                        // Its our queue we must have a packet to read out
700                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
701                                                                // We must be able to write this now 100% without fail
702                                                                libtrace_ringbuffer_write(&libtrace->mapper_threads[thread].rbuffer, *packet);
703                                                                assert(sem_post(&libtrace->sem) == 0);
704                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
705                                                                return ret;
706                                                        } else {
707                                                                assert(!"Our queue is full but I cannot read from it??");
708                                                        }
709                                                }
710                                                // Not us we have to give the other threads a chance to write there packets then
711                                                libtrace->mapper_queue_full = true;
712                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
713                                                for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
714                                                        assert(sem_post(&libtrace->sem) == 0);
715
716                                                contention_stats[this_thread].full_queue_hits++;
717                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
718                                                // Grab these back
719                                                for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
720                                                        assert(sem_wait(&libtrace->sem) == 0);
721                                                libtrace->mapper_queue_full = false;
722                                        }
723                                        assert(sem_post(&libtrace->sem) == 0);
724                                        *packet = NULL;
725                                } else {
726                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
727                                        // in the general case (unless the user ends early without proper clean up).
728                                        assert (!"unreachable code??");
729                                }
730                        }
731                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
732                }
733                // Now we go back to checking our queue anyways
734        }
735}
736
737
738/**
739 * For the first packet of each queue we keep a copy and note the system
740 * time it was received at.
741 *
742 * This is used for finding the first packet when playing back a trace
743 * in trace time. And can be used by real time applications to print
744 * results out every XXX seconds.
745 */
746inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
747{
748        if (!t->recorded_first) {
749                struct timeval tv;
750                libtrace_packet_t * dup;
751                // For what it's worth we can call these outside of the lock
752                gettimeofday(&tv, NULL);
753                dup = trace_copy_packet(packet);
754                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
755                libtrace->first_packets.packets[t->map_num].packet = dup;
756                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
757                memcpy(&libtrace->first_packets.packets[t->map_num].tv, &tv, sizeof(tv));
758                // Now update the first
759                libtrace->first_packets.count++;
760                if (libtrace->first_packets.count == 1) {
761                        // We the first entry hence also the first known packet
762                        libtrace->first_packets.first = t->map_num;
763                } else {
764                        // Check if we are newer than the previous 'first' packet
765                        size_t first = libtrace->first_packets.first;
766                        if (trace_get_seconds(dup) <
767                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
768                                libtrace->first_packets.first = t->map_num;
769                }
770                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
771                libtrace_message_t mesg;
772                mesg.code = MESSAGE_FIRST_PACKET;
773                mesg.additional = NULL;
774                trace_send_message_to_reducer(libtrace, &mesg);
775                t->recorded_first = true;
776        }
777}
778
779/**
780 * Returns 1 if its certain that the first packet is truly the first packet
781 * rather than a best guess based upon threads that have published so far.
782 * Otherwise 0 is returned.
783 * It's recommended that this result is stored rather than calling this
784 * function again.
785 */
786DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
787{
788        int ret = 0;
789        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
790        if (libtrace->first_packets.count) {
791                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
792                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
793                if (libtrace->first_packets.count == libtrace->mapper_thread_count) {
794                        ret = 1;
795                } else {
796                        struct timeval curr_tv;
797                        // If a second has passed since the first entry we will assume this is the very first packet
798                        gettimeofday(&curr_tv, NULL);
799                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
800                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
801                                        ret = 1;
802                                }
803                        }
804                }
805        } else {
806                *packet = NULL;
807                *tv = NULL;
808        }
809        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
810        return ret;
811}
812
813
814DLLEXPORT inline uint64_t tv_to_usec(struct timeval *tv)
815{
816        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
817}
818
819inline static struct timeval usec_to_tv(uint64_t usec)
820{
821        struct timeval tv;
822        tv.tv_sec = usec / 1000000;
823        tv.tv_usec = usec % 1000000;
824        return tv;
825}
826
827
828/**
829 * Delays a packets playback so the playback will be in trace time
830 */
831static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
832        struct timeval curr_tv, pkt_tv;
833        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
834        uint64_t curr_usec;
835        /* Tracetime we might delay releasing this packet */
836        if (!t->tracetime_offset_usec) {
837                libtrace_packet_t * first_pkt;
838                struct timeval *sys_tv;
839                int64_t initial_offset;
840                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
841                assert(first_pkt);
842                pkt_tv = trace_get_timeval(first_pkt);
843                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
844                if (stable)
845                        // 0->1 because 0 is used to mean unset
846                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
847                next_release = initial_offset;
848        }
849        /* next_release == offset */
850        pkt_tv = trace_get_timeval(packet);
851        next_release += tv_to_usec(&pkt_tv);
852        gettimeofday(&curr_tv, NULL);
853        curr_usec = tv_to_usec(&curr_tv);
854        if (next_release > curr_usec) {
855                // We need to wait
856                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
857                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
858                select(0, NULL, NULL, NULL, &delay_tv);
859        }
860}
861
862/* Read one packet from the trace into a buffer. Note that this function will
863 * block until a packet is read (or EOF is reached).
864 *
865 * @param libtrace      the libtrace opaque pointer
866 * @param packet        the packet opaque pointer
867 * @returns 0 on EOF, negative value on error
868 *
869 * Note this is identical to read_packet but calls pread_packet instead of
870 * read packet in the format.
871 *
872 */
873static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
874
875        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
876        if (trace_is_err(libtrace))
877                return -1;
878        if (!libtrace->started) {
879                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
880                return -1;
881        }
882        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
883                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
884                return -1;
885        }
886        assert(packet);
887
888        if (libtrace->format->read_packet) {
889                do {
890                        size_t ret;
891                        /* Finalise the packet, freeing any resources the format module
892                         * may have allocated it and zeroing all data associated with it.
893                         */
894                        trace_fin_packet(packet);
895                        /* Store the trace we are reading from into the packet opaque
896                         * structure */
897                        packet->trace = libtrace;
898                        ret=libtrace->format->pread_packet(libtrace,packet);
899                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
900                                return ret;
901                        }
902                        if (libtrace->filter) {
903                                /* If the filter doesn't match, read another
904                                 * packet
905                                 */
906                                if (!trace_apply_filter(libtrace->filter,packet)){
907                                        ++libtrace->filtered_packets;
908                                        continue;
909                                }
910                        }
911                        if (libtrace->snaplen>0) {
912                                /* Snap the packet */
913                                trace_set_capture_length(packet,
914                                                libtrace->snaplen);
915                        }
916                        trace_packet_set_order(packet, libtrace->accepted_packets);
917                        ++libtrace->accepted_packets;
918                        return ret;
919                } while(1);
920        }
921        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
922        return ~0U;
923}
924
925/**
926 * Read a packet from the parallel trace
927 */
928DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
929{
930        int ret;
931        libtrace_thread_t *t = get_thread_table(libtrace);
932
933        // Cleanup the packet passed back
934        if (*packet)
935                trace_fin_packet(*packet);
936
937        if (libtrace->format->pread_packet) {
938                if (!*packet)
939                        *packet = trace_create_packet();
940                ret = trace_pread_packet_wrapper(libtrace, *packet);
941        } else  if (!libtrace->hasher) {
942                /* We don't care about which core a packet goes to */
943                ret =  trace_pread_packet_first_in_first_served(libtrace, packet);
944        } else if (trace_has_dedicated_hasher(libtrace)) {
945                ret = trace_pread_packet_hasher_thread(libtrace, packet);
946        } else if (libtrace->reducer_flags & MAPPER_USE_SLIDING_WINDOW) {
947                ret = trace_pread_packet_sliding_window(libtrace, packet);
948        } else {
949                ret = trace_pread_packet_hash_locked(libtrace, packet);
950        }
951
952        // Formats can also optionally do this internally to ensure the first
953        // packet is always reported correctly
954        if (ret > 0) {
955                store_first_packet(libtrace, *packet, t);
956                if (libtrace->tracetime)
957                        delay_tracetime(libtrace, *packet, t);
958        }
959
960        return ret;
961}
962
963/* Starts perpkt threads
964 * @return threads_started
965 */
966static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
967        int i;
968
969        for (i = 0; i < libtrace->mapper_thread_count; i++) {
970                libtrace_thread_t *t = &libtrace->mapper_threads[i];
971                assert(pthread_create(&t->tid, NULL, mapper_start, (void *) libtrace) == 0);
972        }
973        return libtrace->mapper_thread_count;
974}
975
976/* Start an input trace in a parallel fashion.
977 *
978 * @param libtrace      the input trace to start
979 * @param global_blob some global data you can share with the new thread
980 * @returns 0 on success
981 */
982DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
983{
984        int i;
985        sigset_t sig_before, sig_block_all;
986
987        assert(libtrace);
988        if (trace_is_err(libtrace))
989                return -1;;
990        if (libtrace->perpkt_pausing != 0) {
991                printf("Restarting trace\n");
992                libtrace->format->pstart_input(libtrace);
993                // TODO empty any queues out here //
994                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
995                libtrace->started = true;
996                assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
997                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
998                return 0;
999        }
1000
1001        libtrace_parallel = 1;
1002
1003        // Store the user defined things against the trace
1004        libtrace->global_blob = global_blob;
1005        libtrace->per_pkt = per_pkt;
1006        libtrace->reducer = reducer;
1007        libtrace->mappers_finishing = 0;
1008        // libtrace->hasher = &rand_hash; /* Hasher now set via option */
1009
1010        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1011        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1012        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1013        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1014
1015        // Set default buffer sizes
1016        if (libtrace->mapper_buffer_size <= 0)
1017                libtrace->mapper_buffer_size = 1000;
1018
1019        if (libtrace->mapper_thread_count <= 0)
1020                libtrace->mapper_thread_count = 2; // XXX scale to system
1021
1022        if(libtrace->packet_freelist_size <= 0)
1023                libtrace->packet_freelist_size = (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count;
1024
1025        if(libtrace->packet_freelist_size <
1026                (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count)
1027                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1028
1029        libtrace->started=true; // Before we start the threads otherwise we could have issues
1030        /* Disable signals - Pthread signal handling */
1031
1032        sigemptyset(&sig_block_all);
1033
1034        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1035
1036        // If we are using a hasher start it
1037        if (libtrace->hasher && libtrace->hasher_thread.type == THREAD_HASHER) {
1038                libtrace_thread_t *t = &libtrace->hasher_thread;
1039                t->trace = libtrace;
1040                t->ret = NULL;
1041                t->type = THREAD_HASHER;
1042                t->state = THREAD_RUNNING;
1043                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1044        } else {
1045                libtrace->hasher_thread.type = THREAD_EMPTY;
1046        }
1047        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_POLLING);
1048        libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1049        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1050        // This will be applied to every new thread that starts, i.e. they will block all signals
1051        // Lets start a fixed number of reading threads
1052
1053        // For now we never have a dedicated thread for the reducer
1054        // i.e. This main thread is used as the reducer
1055        libtrace->reducer_thread.tid = pthread_self();
1056        libtrace->reducer_thread.type = THREAD_REDUCER;
1057        libtrace->reducer_thread.state = THREAD_RUNNING;
1058        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
1059
1060        /* Ready some storages */
1061        libtrace->first_packets.first = 0;
1062        libtrace->first_packets.count = 0;
1063        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1064        libtrace->first_packets.packets = calloc(libtrace->mapper_thread_count, sizeof(struct  __packet_storage_magic_type));
1065
1066
1067        /* Start all of our mapper threads */
1068        libtrace->mapper_threads = calloc(sizeof(libtrace_thread_t), libtrace->mapper_thread_count);
1069        for (i = 0; i < libtrace->mapper_thread_count; i++) {
1070                libtrace_thread_t *t = &libtrace->mapper_threads[i];
1071                t->trace = libtrace;
1072                t->ret = NULL;
1073                t->type = THREAD_MAPPER;
1074                t->state = THREAD_RUNNING;
1075                t->user_data = NULL;
1076                // t->tid DONE on create
1077                t->map_num = i;
1078                if (libtrace->hasher)
1079                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->mapper_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
1080                // Depending on the mode vector or deque might be chosen
1081                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1082                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1083                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1084                t->tmp_key = 0;
1085                t->tmp_data = NULL;
1086                t->recorded_first = false;
1087                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1088                t->tracetime_offset_usec = 0;;
1089        }
1090
1091        int threads_started = 0;
1092        /* Setup the trace and start our threads */
1093        if (libtrace->mapper_thread_count > 1 && libtrace->format->pstart_input) {
1094                printf("This format has direct support for p's\n");
1095                threads_started = libtrace->format->pstart_input(libtrace);
1096        } else {
1097                if (libtrace->format->start_input) {
1098                        threads_started=libtrace->format->start_input(libtrace);
1099                }
1100        }
1101        if (threads_started == 0)
1102                threads_started = trace_start_perpkt_threads(libtrace);
1103
1104
1105        // Revert back - Allow signals again
1106        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1107        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1108
1109        if (threads_started < 0)
1110                // Error
1111                return threads_started;
1112
1113        // TODO fix these leaks etc
1114        if (libtrace->mapper_thread_count != threads_started)
1115                printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->mapper_thread_count);
1116
1117
1118        return 0;
1119}
1120
1121/**
1122 * Pauses a trace, this should only be called by the main thread
1123 * 1. Set started = false
1124 * 2. All perpkt threads are paused waiting on a condition var
1125 * 3. Then call ppause on the underlying format if found
1126 * 4. Return with perpkt_pausing set to mapper_count (Used when restarting so we reuse the threads)
1127 *
1128 * Once done you should be a able to modify the trace setup and call pstart again
1129 * TODO handle changing thread numbers
1130 */
1131DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1132{
1133        libtrace_thread_t *t;
1134        int i;
1135        assert(libtrace);
1136        if (!libtrace->started) {
1137                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1138                return -1;
1139        }
1140
1141        t = get_thread_table(libtrace);
1142
1143        // Set paused
1144        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1145        libtrace->started = false;
1146        pthread_cond_broadcast(&libtrace->perpkt_cond);
1147        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1148
1149        printf("Sending messages \n");
1150        // Stop threads, skip this one if its a mapper
1151        for (i = 0; i < libtrace->mapper_thread_count; i++) {
1152                if (&libtrace->mapper_threads[i] != t) {
1153                        libtrace_message_t message;
1154                        message.code = MESSAGE_PAUSE;
1155                        message.additional = NULL;
1156                        trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
1157                }
1158        }
1159
1160        // Formats must support native message handling if a message is ready
1161        // Approach per Perry's suggestion is a non-blocking read
1162        // followed by a blocking read. XXX STRIP THIS OUT
1163
1164        if (t) {
1165                // A mapper is doing the pausing interesting fake a extra thread paused
1166                // We rely on the user to not return before starting the trace again
1167                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1168                libtrace->perpkt_pausing++;
1169                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1170        }
1171
1172        printf("Threads are pausing\n");
1173
1174        // Do a early pause to kick threads out - XXX testing for int
1175        if (libtrace->format->pause_input)
1176                        libtrace->format->pause_input(libtrace);
1177
1178        // Wait for all threads to pause
1179        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1180        while (libtrace->mapper_thread_count != libtrace->perpkt_pausing) {
1181                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1182        }
1183        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1184
1185        printf("Threads have paused\n");
1186
1187        if (trace_supports_parallel(libtrace)) {
1188                if (libtrace->format->ppause_input)
1189                        libtrace->format->ppause_input(libtrace);
1190                // TODO What happens if we don't have pause input??
1191        } else {
1192                printf("Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1193                // This doesn't really work because this could be called by any thread
1194                // Maybe we should grab the lock here??
1195                if (libtrace->format->pause_input)
1196                        libtrace->format->pause_input(libtrace);
1197                // TODO What happens if we don't have pause input??
1198        }
1199
1200        return 0;
1201}
1202
1203/**
1204 * Stop trace finish prematurely as though it meet an EOF
1205 * This should only be called by the main thread
1206 * 1. Calls ppause
1207 * 2. Sends a message asking for threads to finish
1208 *
1209 */
1210DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1211{
1212        int i;
1213        if (!libtrace->started) {
1214                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
1215                return -1;
1216        }
1217
1218        // Ensure all threads have paused and the underlying trace format has
1219        // been closed
1220        trace_ppause(libtrace);
1221
1222        // Now send a message asking the threads to stop
1223        // This will be retrieved before trying to read another packet
1224        for (i = 0; i < libtrace->mapper_thread_count; i++) {
1225                libtrace_message_t message;
1226                message.code = MESSAGE_STOP;
1227                message.additional = NULL;
1228                trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
1229        }
1230
1231        // Now release the threads and let them stop
1232        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1233        libtrace->started = true;
1234        assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1235        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1236        return 0;
1237}
1238
1239/**
1240 * Set the hasher type along with a selected function, if hardware supports
1241 * that generic type of hashing it will be used otherwise the supplied
1242 * hasher function will be used and passed data when called.
1243 *
1244 * @return 0 if successful otherwise -1 on error
1245 */
1246DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1247        int ret = -1;
1248        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1249                return -1;
1250        }
1251
1252        // Save the requirements
1253        trace->hasher_type = type;
1254        if (hasher) {
1255                trace->hasher = hasher;
1256                trace->hasher_data = hasher;
1257        } else {
1258                trace->hasher = NULL;
1259                // TODO consider how to handle freeing this
1260                trace->hasher_data = NULL;
1261        }
1262
1263        // Try push this to hardware - NOTE hardware could do custom if
1264        // there is a more efficient way to apply it, in this case
1265        // it will simply grab the function out of libtrace_t
1266        if (trace->format->pconfig_input)
1267                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1268
1269        if (ret == -1) {
1270                // We have to deal with this ourself
1271                // This most likely means single threaded reading of the trace
1272                if (!hasher) {
1273                        switch (type)
1274                        {
1275                                case HASHER_CUSTOM:
1276                                case HASHER_BALANCE:
1277                                        return 0;
1278                                case HASHER_BIDIRECTIONAL:
1279                                        trace->hasher = toeplitz_hash_packet;
1280                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1281                                        toeplitz_init_config(trace->hasher_data, 1);
1282                                        return 0;
1283                                case HASHER_UNIDIRECTIONAL:
1284                                        trace->hasher = toeplitz_hash_packet;
1285                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1286                                        toeplitz_init_config(trace->hasher_data, 1);
1287                                        return 0;
1288                                case HASHER_HARDWARE:
1289                                        return -1;
1290                        }
1291                        return -1;
1292                }
1293        } else {
1294                // The hardware is dealing with this yay
1295                trace->hasher_type = HASHER_HARDWARE;
1296        }
1297
1298        return 0;
1299}
1300
1301// Waits for all threads to finish
1302DLLEXPORT void trace_join(libtrace_t *libtrace) {
1303        int i;
1304
1305        /* Firstly wait for the mapper threads to finish, since these are
1306         * user controlled */
1307        for (i=0; i< libtrace->mapper_thread_count; i++) {
1308                //printf("Waiting to join with mapper #%d\n", i);
1309                assert(pthread_join(libtrace->mapper_threads[i].tid, NULL) == 0);
1310                //printf("Joined with mapper #%d\n", i);
1311                // So we must do our best effort to empty the queue - so
1312                // the producer (or any other threads) don't block.
1313                libtrace_packet_t * packet;
1314                // Mark that we are no longer accepting packets
1315                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1316                libtrace->mapper_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
1317                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1318                while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
1319                        if (packet) // This could be NULL iff the mapper finishes early
1320                                trace_destroy_packet(packet);
1321        }
1322
1323        /* Now the hasher */
1324        // XXX signal it to stop
1325        if (trace_has_dedicated_hasher(libtrace)) {
1326                printf("Waiting to join with the hasher\n");
1327                pthread_join(libtrace->hasher_thread.tid, NULL);
1328                printf("Joined with with the hasher\n");
1329                libtrace->hasher_thread.state = THREAD_FINISHED;
1330        }
1331
1332        // Now that everything is finished nothing can be touching our
1333        // buffers so clean them up
1334        for (i = 0; i < libtrace->mapper_thread_count; i++) {
1335                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
1336                // if they lost timeslice before-during a write
1337                libtrace_packet_t * packet;
1338                while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
1339                        trace_destroy_packet(packet);
1340                if (libtrace->hasher) {
1341                        assert(libtrace_ringbuffer_is_empty(&libtrace->mapper_threads[i].rbuffer));
1342                        libtrace_ringbuffer_destroy(&libtrace->mapper_threads[i].rbuffer);
1343                }
1344                // Cannot destroy vector yet, this happens with trace_destroy
1345        }
1346
1347        // Lets mark this as done for now
1348        libtrace->joined = true;
1349}
1350
1351// Don't use extra overhead = :( directly place in storage structure using
1352// post
1353DLLEXPORT libtrace_result_t *trace_create_result()
1354{
1355        libtrace_result_t *result = malloc(sizeof(libtrace_result_t));
1356        assert(result);
1357        result->key = 0;
1358        result->value = NULL;
1359        // TODO automatically back with a free list!!
1360        return result;
1361}
1362
1363DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1364{
1365        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1366        assert(t);
1367        return libtrace_message_queue_count(&t->messages);
1368}
1369
1370DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1371{
1372        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1373        assert(t);
1374        return libtrace_message_queue_get(&t->messages, message);
1375}
1376
1377DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1378{
1379        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1380        assert(t);
1381        return libtrace_message_queue_try_get(&t->messages, message);
1382}
1383
1384/**
1385 * Return backlog indicator
1386 */
1387DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
1388{
1389        libtrace_message_t message = {0};
1390        message.code = MESSAGE_POST_REDUCE;
1391        message.additional = NULL;
1392        message.sender = get_thread_descriptor(libtrace);
1393        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
1394}
1395
1396/**
1397 * Return backlog indicator
1398 */
1399DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
1400{
1401        //printf("Sending message code=%d to reducer\n", message->code);
1402        message->sender = get_thread_descriptor(libtrace);
1403        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
1404}
1405
1406/**
1407 *
1408 */
1409DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1410{
1411        //printf("Sending message code=%d to reducer\n", message->code);
1412        message->sender = get_thread_descriptor(libtrace);
1413        return libtrace_message_queue_put(&t->messages, message);
1414}
1415
1416DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1417        result->key = key;
1418}
1419DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1420        return result->key;
1421}
1422DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
1423        result->value = value;
1424}
1425DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
1426        return result->value;
1427}
1428DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
1429        result->key = key;
1430        result->value = value;
1431}
1432DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
1433        free(*result);
1434        result = NULL;
1435        // TODO automatically back with a free list!!
1436}
1437
1438DLLEXPORT void * trace_get_global(libtrace_t *trace)
1439{
1440        return trace->global_blob;
1441}
1442
1443DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
1444{
1445        if (trace->global_blob && trace->global_blob != data) {
1446                void * ret = trace->global_blob;
1447                trace->global_blob = data;
1448                return ret;
1449        } else {
1450                trace->global_blob = data;
1451                return NULL;
1452        }
1453}
1454
1455DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
1456{
1457        return t->user_data;
1458}
1459
1460DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
1461{
1462        if(t->user_data && t->user_data != data) {
1463                void *ret = t->user_data;
1464                t->user_data = data;
1465                return ret;
1466        } else {
1467                t->user_data = data;
1468                return NULL;
1469        }
1470}
1471
1472/**
1473 * Note: This function grabs a lock and expects trace_update_inprogress_result
1474 * to be called to release the lock.
1475 *
1476 * Expected to be used in trace-time situations to allow a result to be pending
1477 * a publish that can be taken by the reducer before publish if it wants to
1478 * publish a result. Such as publish a result every second but a queue hasn't
1479 * processed a packet (or is overloaded) and hasn't published yet.
1480 *
1481 * Currently this only supports a single temporary result,
1482 * as such if a key is different to the current temporary result the existing
1483 * one will be published and NULL returned.
1484 */
1485DLLEXPORT void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key)
1486{
1487        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1488        libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
1489
1490        assert (pthread_spin_lock(&t->tmp_spinlock) == 0);
1491        if (t->tmp_key != key) {
1492                if (t->tmp_data) {
1493                        //printf("publising data key=%"PRIu64"\n", t->tmp_key);
1494                        trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
1495                }
1496                t->tmp_data = NULL;
1497                t->tmp_key = key;
1498        }
1499        return t->tmp_data;
1500}
1501
1502/**
1503 * Updates a temporary result and releases the lock previously grabbed by trace_retrive_inprogress_result
1504 */
1505DLLEXPORT void trace_update_inprogress_result(libtrace_t *libtrace, uint64_t key, void * value)
1506{
1507        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1508        libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
1509        if (t->tmp_key != key) {
1510                if (t->tmp_data) {
1511                        printf("BAD publihsing data key=%"PRIu64"\n", t->tmp_key);
1512                        trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
1513                }
1514                t->tmp_key = key;
1515        }
1516        t->tmp_data = value;
1517        assert (pthread_spin_unlock(&t->tmp_spinlock) == 0);
1518}
1519
1520/**
1521 * Publish to the reduce queue, return
1522 */
1523DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
1524        libtrace_result_t res;
1525        // Who am I???
1526        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1527        libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
1528        // Now put it into my table
1529        static __thread int count = 0;
1530
1531
1532        libtrace_result_set_key_value(&res, key, value);
1533        /*
1534        if (count == 1)
1535                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1536        count = (count+1) %1000;
1537        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1538        */
1539        /*if (count == 1)
1540                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1541        count = (count+1)%1000;*/
1542        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1543                if (libtrace_deque_get_size(&t->deque) >= 800) {
1544                        trace_post_reduce(libtrace);
1545                }
1546                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1547                //      sched_yield();
1548                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1549        } else {
1550                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1551                //      sched_yield();
1552
1553                if (libtrace_deque_get_size(&t->deque) >= 800) {
1554                        trace_post_reduce(libtrace);
1555                }
1556                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1557        }
1558}
1559
1560
1561static int compareres(const void* p1, const void* p2)
1562{
1563        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
1564                return -1;
1565        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
1566                return 0;
1567        else
1568                return 1;
1569}
1570
1571/* Retrieves all results with the key requested from the temporary result
1572 * holding zone.
1573 */
1574DLLEXPORT int trace_get_results_check_temp(libtrace_t *libtrace, libtrace_vector_t *results, uint64_t key)
1575{
1576        int i;
1577
1578        libtrace_vector_empty(results);
1579        // Check all of the temp queues
1580        for (i = 0; i < libtrace->mapper_thread_count; ++i) {
1581                libtrace_result_t r = {0,0};
1582                assert (pthread_spin_lock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
1583                if (libtrace->mapper_threads[i].tmp_key == key) {
1584                        libtrace_result_set_key_value(&r, key, libtrace->mapper_threads[i].tmp_data);
1585                        libtrace->mapper_threads[i].tmp_data = NULL;
1586                        printf("Found in temp queue\n");
1587                }
1588                assert (pthread_spin_unlock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
1589                if (libtrace_result_get_value(&r)) {
1590                        // Got a result still in temporary
1591                        printf("Publishing from temp queue\n");
1592                        libtrace_vector_push_back(results, &r);
1593                } else {
1594                        // This might be waiting on the actual queue
1595                        libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
1596                        if (libtrace_deque_peek_front(v, (void *) &r) &&
1597                                        libtrace_result_get_value(&r)) {
1598                                assert (libtrace_deque_pop_front(&libtrace->mapper_threads[i].deque, (void *) &r) == 1);
1599                                printf("Found in real queue\n");
1600                                libtrace_vector_push_back(results, &r);
1601                        } // else no data (probably means no packets)
1602                        else {
1603                                printf("Result missing in real queue\n");
1604                        }
1605                }
1606        }
1607        //printf("Loop done yo, that means we've got #%d results to print fool!\n", libtrace_vector_get_size(results));
1608        return libtrace_vector_get_size(results);
1609}
1610
1611DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
1612        int i;
1613        int flags = libtrace->reducer_flags; // Hint these aren't a changing
1614
1615        libtrace_vector_empty(results);
1616
1617        /* Here we assume queues are in order ascending order and they want
1618         * the smallest result first. If they are not in order the results
1619         * may not be in order.
1620         */
1621        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1622                int live_count = 0;
1623                bool live[libtrace->mapper_thread_count]; // Set if a trace is alive
1624                uint64_t key[libtrace->mapper_thread_count]; // Cached keys
1625                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
1626                int min_queue = -1;
1627
1628                /* Loop through check all are alive (have data) and find the smallest */
1629                for (i = 0; i < libtrace->mapper_thread_count; ++i) {
1630                        libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
1631                        if (libtrace_deque_get_size(v) != 0) {
1632                                libtrace_result_t r;
1633                                libtrace_deque_peek_front(v, (void *) &r);
1634                                live_count++;
1635                                live[i] = 1;
1636                                key[i] = libtrace_result_get_key(&r);
1637                                if (i==0 || min_key > key[i]) {
1638                                        min_key = key[i];
1639                                        min_queue = i;
1640                                }
1641                        } else {
1642                                live[i] = 0;
1643                        }
1644                }
1645
1646                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
1647                while ((live_count == libtrace->mapper_thread_count) || (live_count &&
1648                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
1649                                libtrace->joined))) {
1650                        /* Get the minimum queue and then do stuff */
1651                        libtrace_result_t r;
1652
1653                        assert (libtrace_deque_pop_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r) == 1);
1654                        libtrace_vector_push_back(results, &r);
1655
1656                        // We expect the key we read +1 now
1657                        libtrace->expected_key = key[min_queue] + 1;
1658
1659                        // Now update the one we just removed
1660                        if (libtrace_deque_get_size(&libtrace->mapper_threads[min_queue].deque) )
1661                        {
1662                                libtrace_deque_peek_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r);
1663                                key[min_queue] = libtrace_result_get_key(&r);
1664                                if (key[min_queue] <= min_key) {
1665                                        // We are still the smallest, might be out of order though :(
1666                                        min_key = key[min_queue];
1667                                } else {
1668                                        min_key = key[min_queue]; // Update our minimum
1669                                        // Check all find the smallest again - all are alive
1670                                        for (i = 0; i < libtrace->mapper_thread_count; ++i) {
1671                                                if (live[i] && min_key > key[i]) {
1672                                                        min_key = key[i];
1673                                                        min_queue = i;
1674                                                }
1675                                        }
1676                                }
1677                        } else {
1678                                live[min_queue] = 0;
1679                                live_count--;
1680                                min_key = UINT64_MAX; // Update our minimum
1681                                // Check all find the smallest again - all are alive
1682                                for (i = 0; i < libtrace->mapper_thread_count; ++i) {
1683                                        // Still not 100% TODO (what if order is wrong or not increasing)
1684                                        if (live[i] && min_key >= key[i]) {
1685                                                min_key = key[i];
1686                                                min_queue = i;
1687                                        }
1688                                }
1689                        }
1690                }
1691        } else { // Queues are not in order - return all results in the queue
1692                for (i = 0; i < libtrace->mapper_thread_count; i++) {
1693                        libtrace_vector_append(results, &libtrace->mapper_threads[i].vector);
1694                }
1695                if (flags & REDUCE_SORT) {
1696                        qsort(results->elements, results->size, results->element_size, &compareres);
1697                }
1698        }
1699        return libtrace_vector_get_size(results);
1700}
1701
1702DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
1703        return packet->order;
1704}
1705
1706DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
1707        return packet->hash;
1708}
1709
1710DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
1711        packet->order = order;
1712}
1713
1714DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
1715        packet->hash = hash;
1716}
1717
1718DLLEXPORT int trace_finished(libtrace_t * libtrace) {
1719        int i;
1720        int b = 0;
1721        // TODO I don't like using this so much
1722        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1723        for (i = 0; i < libtrace->mapper_thread_count; i++) {
1724                if (libtrace->mapper_threads[i].state == THREAD_RUNNING)
1725                        b++;
1726        }
1727        //assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1728        return !b;
1729}
1730
1731DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
1732{
1733        int ret = -1;
1734        switch (option) {
1735                case TRACE_OPTION_SET_HASHER:
1736                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
1737                case TRACE_OPTION_SET_MAPPER_BUFFER_SIZE:
1738                        libtrace->mapper_buffer_size = *((int *) value);
1739                        return 1;
1740                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
1741                        libtrace->packet_freelist_size = *((int *) value);
1742                        return 1;
1743                case TRACE_OPTION_SET_MAPPER_THREAD_COUNT:
1744                        libtrace->mapper_thread_count = *((int *) value);
1745                        return 1;
1746                case TRACE_DROP_OUT_OF_ORDER:
1747                        if (*((int *) value))
1748                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
1749                        else
1750                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
1751                        return 1;
1752                case TRACE_OPTION_SEQUENTIAL:
1753                        if (*((int *) value))
1754                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
1755                        else
1756                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
1757                        return 1;
1758                case TRACE_OPTION_ORDERED:
1759                        if (*((int *) value))
1760                                libtrace->reducer_flags |= REDUCE_ORDERED;
1761                        else
1762                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
1763                        return 1;
1764                case TRACE_OPTION_USE_DEDICATED_HASHER:
1765                        if (*((int *) value))
1766                                libtrace->hasher_thread.type = THREAD_HASHER;
1767                        else
1768                                libtrace->hasher_thread.type = THREAD_EMPTY;
1769                        return 1;
1770                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
1771                        if (*((int *) value))
1772                                libtrace->reducer_flags |= MAPPER_USE_SLIDING_WINDOW;
1773                        else
1774                                libtrace->reducer_flags &= ~MAPPER_USE_SLIDING_WINDOW;
1775                        return 1;
1776                case TRACE_OPTION_TRACETIME:
1777                        if(*((int *) value))
1778                                libtrace->tracetime = 1;
1779                        else
1780                                libtrace->tracetime = 0;
1781                        return 0;
1782        }
1783        return 0;
1784}
1785
1786DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1787        libtrace_packet_t* result;
1788        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1789                result = trace_create_packet();
1790        assert(result);
1791        swap_packets(result, packet); // Move the current packet into our copy
1792        return result;
1793}
1794
1795DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1796        // Try write back the packet
1797        assert(packet);
1798        // Always release any resources this might be holding such as a slot in a ringbuffer
1799        trace_fin_packet(packet);
1800        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1801                /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
1802                //assert(1 == 90);
1803                trace_destroy_packet(packet);
1804        }
1805}
Note: See TracBrowser for help on using the repository browser.