source: lib/trace_parallel.c @ 17a3dff

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 17a3dff was 17a3dff, checked in by Richard Sanger <rsangerarj@…>, 8 years ago

Rename from google map/reduce framework names to something more meaningful.
Rename mapper to perpkt since this is what it actually is in libtrace.

  • Property mode set to 100644
File size: 59.5 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100
101
/* Declared only; the definition lives outside this file. */
extern int libtrace_parallel;

/**
 * Contention counters kept for one perpkt thread.
 *
 * full_queue_hits              bumped each time a write into another
 *                              thread's ring buffer fails because it is full.
 * wait_for_fill_complete_hits  bumped each time a thread finds
 *                              perpkt_queue_full set and has to retry.
 */
struct multithreading_stats {
        uint64_t full_queue_hits;
        uint64_t wait_for_fill_complete_hits;
};

/* One slot per perpkt thread, indexed by the thread's table position. */
struct multithreading_stats contention_stats[1024];
108
109
110/**
111 * @return True if the format supports parallel threads.
112 */
113static inline bool trace_supports_parallel(libtrace_t *trace)
114{
115        assert(trace);
116        assert(trace->format);
117        if (trace->format->pstart_input)
118                return true;
119        else
120                return false;
121        //return trace->format->pstart_input;
122}
123
124DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
125        int i;
126        struct multithreading_stats totals = {0};
127        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
128                printf("\nStats for perpkt thread#%d\n", i);
129                printf("\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
130                totals.full_queue_hits += contention_stats[i].full_queue_hits;
131                printf("\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
132                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
133        }
134        printf("\nTotals for perpkt threads\n");
135        printf("\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
136        printf("\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
137
138        return;
139}
140
141inline void libtrace_zero_thread(libtrace_thread_t * t) {
142        t->trace = NULL;
143        t->ret = NULL;
144        t->type = THREAD_EMPTY;
145        libtrace_zero_ringbuffer(&t->rbuffer);
146        libtrace_zero_vector(&t->vector);
147        libtrace_zero_deque(&t->deque);
148        t->recorded_first = false;
149        t->perpkt_num = -1;
150}
151
152// Ints are aligned int is atomic so safe to read and write at same time
153// However write must be locked, read doesn't (We never try read before written to table)
154libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
155        int i = 0;
156        pthread_t tid = pthread_self();
157
158        for (;i<libtrace->perpkt_thread_count ;++i) {
159                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
160                        return &libtrace->perpkt_threads[i];
161        }
162        return NULL;
163}
164
165int get_thread_table_num(libtrace_t *libtrace) {
166        int i = 0;
167        pthread_t tid = pthread_self();
168        for (;i<libtrace->perpkt_thread_count; ++i) {
169                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
170                        return i;
171        }
172        return -1;
173}
174
175static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
176        libtrace_thread_t *ret;
177        if (!(ret = get_thread_table(libtrace))) {
178                pthread_t tid = pthread_self();
179                // Check if we are reducer or something else
180                if (pthread_equal(tid, libtrace->reducer_thread.tid))
181                        ret = &libtrace->reducer_thread;
182                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
183                        ret = &libtrace->hasher_thread;
184                else
185                        ret = NULL;
186        }
187        return ret;
188}
189
190/**
191 * Holds threads in a paused state, until released by broadcasting
192 * the condition mutex.
193 */
194static void trace_thread_pause(libtrace_t *trace) {
195        printf("Pausing thread #%d\n", get_thread_table_num(trace));
196        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
197        trace->perpkts_pausing++;
198        pthread_cond_broadcast(&trace->perpkt_cond);
199        while (!trace->started) {
200                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
201        }
202        trace->perpkts_pausing--;
203        pthread_cond_broadcast(&trace->perpkt_cond);
204        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
205        printf("Releasing thread #%d\n", get_thread_table_num(trace));
206}
207
208/**
209 * The is the entry point for our packet processing threads.
210 */
211static void* perpkt_threads_entry(void *data) {
212        libtrace_t *trace = (libtrace_t *)data;
213        libtrace_thread_t * t;
214        libtrace_message_t message;
215        libtrace_packet_t *packet = NULL;
216
217        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
218        t = get_thread_table(trace);
219        assert(t);
220        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
221        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
222
223        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
224        // Send a message to say we've started
225
226        message.code = MESSAGE_STARTED;
227        message.sender = t;
228        message.additional = NULL;
229
230        // Let the per_packet function know we have started
231        (*trace->per_pkt)(trace, NULL, &message, t);
232
233
234        for (;;) {
235                int psize;
236
237                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
238                        switch (message.code) {
239                                case MESSAGE_PAUSE:
240                                        trace_thread_pause(trace);
241                                        break;
242                                case MESSAGE_STOP:
243                                        goto stop;
244                        }
245                        (*trace->per_pkt)(trace, NULL, &message, t);
246                        continue;
247                }
248
249                if (trace->perpkt_thread_count == 1) {
250                        if (!packet) {
251                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
252                                        packet = trace_create_packet();
253                        }
254                        assert(packet);
255                        if ((psize = trace_read_packet(trace, packet)) <1) {
256                                break;
257                        }
258                } else {
259                        psize = trace_pread_packet(trace, &packet);
260                }
261
262                if (psize > 0) {
263                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
264                        continue;
265                }
266
267                if (psize == -2)
268                        continue; // We have a message
269
270                if (psize < 1) { // consider sending a message
271                        break;
272                }
273
274        }
275
276
277stop:
278        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
279        // Let the per_packet function know we have stopped
280        message.code = MESSAGE_STOPPED;
281        message.sender = message.additional = NULL;
282        (*trace->per_pkt)(trace, NULL, &message, t);
283
284        // And we're at the end free the memories
285        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
286        t->state = THREAD_FINISHED;
287        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
288
289        // Notify only after we've defiantly set the state to finished
290        message.code = MESSAGE_PERPKT_ENDED;
291        message.additional = NULL;
292        trace_send_message_to_reducer(trace, &message);
293
294        pthread_exit(NULL);
295};
296
297/** True if trace has dedicated hasher thread otherwise false */
298inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
299inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
300{
301        return libtrace->hasher_thread.type == THREAD_HASHER;
302}
303
304/**
305 * The start point for our single threaded hasher thread, this will read
306 * and hash a packet from a data source and queue it against the correct
307 * core to process it.
308 */
309static void* hasher_start(void *data) {
310        libtrace_t *trace = (libtrace_t *)data;
311        libtrace_thread_t * t;
312        int i;
313        libtrace_packet_t * packet;
314
315        assert(trace_has_dedicated_hasher(trace));
316        /* Wait until all threads are started and objects are initialised (ring buffers) */
317        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
318        t = &trace->hasher_thread;
319        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
320        printf("Hasher Thread started\n");
321        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
322        int pkt_skipped = 0;
323        /* Read all packets in then hash and queue against the correct thread */
324        while (1) {
325                int thread;
326                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
327                        packet = trace_create_packet();
328                assert(packet);
329
330                if (libtrace_halt) // Signal to die has been sent - TODO
331                        break;
332
333                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
334                        break; /* We are EOF or error'd either way we stop  */
335                }
336
337                /* We are guaranteed to have a hash function i.e. != NULL */
338                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
339                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
340                /* Blocking write to the correct queue - I'm the only writer */
341                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
342                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
343                        pkt_skipped = 0;
344                } else {
345                        pkt_skipped = 1; // Reuse that packet no one read it
346                }
347        }
348
349        /* Broadcast our last failed read to all threads */
350        for (i = 0; i < trace->perpkt_thread_count; i++) {
351                libtrace_packet_t * bcast;
352                printf("Broadcasting error/EOF now the trace is over\n");
353                if (i == trace->perpkt_thread_count - 1) {
354                        bcast = packet;
355                } else {
356                        bcast = trace_create_packet();
357                        bcast->error = packet->error;
358                }
359                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
360                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
361                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
362                        // Unlock early otherwise we could deadlock
363                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, NULL);
364                } else {
365                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
366                }
367        }
368        // We dont need to free packet
369
370        // And we're at the end free the memories
371        t->state = THREAD_FINISHED;
372
373        // Notify only after we've defiantly set the state to finished
374        libtrace_message_t message;
375        message.code = MESSAGE_PERPKT_ENDED;
376        message.additional = NULL;
377        trace_send_message_to_reducer(trace, &message);
378
379        // TODO remove from TTABLE t sometime
380        pthread_exit(NULL);
381};
382
383/**
384 * Moves src into dest(Complete copy) and copies the memory buffer and
385 * its flags from dest into src ready for reuse without needing extra mallocs.
386 */
387static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
388        // Save the passed in buffer status
389        assert(dest->trace == NULL); // Must be a empty packet
390        void * temp_buf = dest->buffer;
391        buf_control_t temp_buf_control = dest->buf_control;
392        // Completely copy StoredPacket into packet
393        memcpy(dest, src, sizeof(libtrace_packet_t));
394        // Set the buffer settings on the returned packet
395        src->buffer = temp_buf;
396        src->buf_control = temp_buf_control;
397        src->trace = NULL;
398}
399
400/* Our simplest case when a thread becomes ready it can obtain a exclusive
401 * lock to read a packet from the underlying trace.
402 */
403inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
404{
405        // We need this to fill the 'first' packet table
406        libtrace_thread_t *t = get_thread_table(libtrace);
407        if (!*packet) {
408                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
409                        *packet = trace_create_packet();
410        }
411        assert(*packet);
412        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
413        /* Read a packet */
414        (*packet)->error = trace_read_packet(libtrace, *packet);
415        // Doing this inside the lock ensures the first packet is always
416        // recorded first
417        store_first_packet(libtrace, *packet, t);
418
419        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
420        return (*packet)->error;
421}
422
423/**
424 * For the case that we have a dedicated hasher thread
425 * 1. We read a packet from our buffer
426 * 2. Move that into the packet provided (packet)
427 */
428inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
429{
430        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
431        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
432
433        if (*packet) // Recycle the old get the new
434                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
435                        trace_destroy_packet(*packet);
436        *packet = libtrace_ringbuffer_read(&t->rbuffer);
437
438        if (*packet) {
439                return 1;
440        } else {
441                printf("Got a NULL packet the trace is over\n");
442                return -1; // We are done for some reason
443        }
444}
445
446/**
447 * Tries to read from our queue and returns 1 if a packet was retrieved
448 */
449static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
450{
451        libtrace_packet_t* retrived_packet;
452
453        /* Lets see if we have one waiting */
454        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
455                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
456                assert(retrived_packet);
457
458                if (*packet) // Recycle the old get the new
459                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
460                                trace_destroy_packet(*packet);
461                *packet = retrived_packet;
462                *ret = (*packet)->error;
463                return 1;
464        }
465        return 0;
466}
467
468/**
469 * Allows us to ensure all threads are finished writing to our threads ring_buffer
470 * before returning EOF/error.
471 */
472inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
473{
474        /* We are waiting for the condition that another thread ends to check
475         * our queue for new data, once all threads end we can go to finished */
476        bool complete = false;
477        int ret;
478
479        do {
480                // Wait for a thread to end
481                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
482
483                // Check before
484                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
485                        complete = true;
486                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
487                        continue;
488                }
489
490                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
491
492                // Check after
493                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
494                        complete = true;
495                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
496                        continue;
497                }
498
499                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
500
501                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
502                if(try_waiting_queue(libtrace, t, packet, &ret))
503                        return ret;
504        } while (!complete);
505
506        // We can only end up here once all threads complete
507        try_waiting_queue(libtrace, t, packet, &ret);
508
509        return ret;
510        // TODO rethink this logic fix bug here
511}
512
513/**
514 * Expects the libtrace_lock to not be held
515 */
516inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
517{
518        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
519        t->state = THREAD_FINISHING;
520        libtrace->perpkts_finishing++;
521        pthread_cond_broadcast(&libtrace->perpkt_cond);
522        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
523        return trace_handle_finishing_perpkt(libtrace, packet, t);
524}
525
526/**
527 * This case is much like the dedicated hasher, except that we will become
528 * hasher if we don't have a a packet waiting.
529 *
530 * Note: This is only every used if we have are doing hashing.
531 *
532 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
533 * queue sizes in total are larger than the ring size.
534 *
535 * 1. We read a packet from our buffer
536 * 2. Move that into the packet provided (packet)
537 */
538inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
539{
540        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
541        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
542        int thread, ret/*, psize*/;
543
544        while (1) {
545                if(try_waiting_queue(libtrace, t, packet, &ret))
546                        return ret;
547                // Can still block here if another thread is writing to a full queue
548                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
549
550                // Its impossible for our own queue to overfill, because no one can write
551                // when we are in the lock
552                if(try_waiting_queue(libtrace, t, packet, &ret)) {
553                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
554                        return ret;
555                }
556
557                // Another thread cannot write a packet because a queue has filled up. Is it ours?
558                if (libtrace->perpkt_queue_full) {
559                        contention_stats[this_thread].wait_for_fill_complete_hits++;
560                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
561                        continue;
562                }
563
564                if (!*packet) {
565                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
566                                *packet = trace_create_packet();
567                }
568                assert(*packet);
569
570                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
571                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
572                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
573                        if (libtrace_halt)
574                                return 0;
575                        else
576                                return (*packet)->error;
577                }
578
579                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
580                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
581                if (thread == this_thread) {
582                        // If it's this thread we must be in order because we checked the buffer once we got the lock
583                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
584                        return (*packet)->error;
585                }
586
587                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
588                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
589                                libtrace->perpkt_queue_full = true;
590                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
591                                contention_stats[this_thread].full_queue_hits++;
592                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
593                        }
594                        *packet = NULL;
595                        libtrace->perpkt_queue_full = false;
596                } else {
597                        /* We can get here if the user closes the thread before natural completion/or error */
598                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
599                }
600                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
601        }
602}
603
604/**
605 * This case is much like the dedicated hasher, except that we will become
606 * hasher if we don't have a a packet waiting.
607 *
608 * TODO: You can loose the tail of a trace if the final thread
609 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
610 *
611 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
612 * queue sizes in total are larger than the ring size.
613 *
614 * 1. We read a packet from our buffer
615 * 2. Move that into the packet provided (packet)
616 */
617inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
618{
619        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
620        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
621        int ret, i, thread/*, psize*/;
622
623        if (t->state == THREAD_FINISHING)
624                return trace_handle_finishing_perpkt(libtrace, packet, t);
625
626        while (1) {
627                // Check if we have packets ready
628                if(try_waiting_queue(libtrace, t, packet, &ret))
629                        return ret;
630
631                // We limit the number of packets we get to the size of the sliding window
632                // such that it is impossible for any given thread to fail to store a packet
633                assert(sem_wait(&libtrace->sem) == 0);
634                /*~~~~Single threaded read of a packet~~~~*/
635                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
636
637                /* Re-check our queue things we might have data waiting */
638                if(try_waiting_queue(libtrace, t, packet, &ret)) {
639                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
640                        assert(sem_post(&libtrace->sem) == 0);
641                        return ret;
642                }
643
644                // TODO put on *proper* condition variable
645                if (libtrace->perpkt_queue_full) {
646                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
647                        assert(sem_post(&libtrace->sem) == 0);
648                        contention_stats[this_thread].wait_for_fill_complete_hits++;
649                        continue;
650                }
651
652                if (!*packet) {
653                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
654                                *packet = trace_create_packet();
655                }
656                assert(*packet);
657
658                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
659                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
660                        assert(sem_post(&libtrace->sem) == 0);
661                        // Finish this thread ensuring that any data written later by another thread is retrieved also
662                        if (libtrace_halt)
663                                return 0;
664                        else
665                                return trace_finish_perpkt(libtrace, packet, t);
666                }
667                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
668
669                /* ~~~~Multiple threads can run the hasher~~~~ */
670                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
671
672                /* Yes this is correct opposite read lock for a write operation */
673                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
674                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
675                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
676                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
677                *packet = NULL;
678
679                // Always try read any data from the sliding window
680                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
681                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
682                        if (libtrace->perpkt_queue_full) {
683                                // I might be the holdup in which case if I can read my queue I should do that and return
684                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
685                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
686                                        return ret;
687                                }
688                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
689                                continue;
690                        }
691                        // Read greedily as many as we can
692                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
693                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
694                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
695                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
696                                                if (this_thread == thread)
697                                                {
698                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
699                                                        // before EOF/error we might not have emptied the sliding window
700                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
701                                                        // Its our queue we must have a packet to read out
702                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
703                                                                // We must be able to write this now 100% without fail
704                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
705                                                                assert(sem_post(&libtrace->sem) == 0);
706                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
707                                                                return ret;
708                                                        } else {
709                                                                assert(!"Our queue is full but I cannot read from it??");
710                                                        }
711                                                }
712                                                // Not us we have to give the other threads a chance to write there packets then
713                                                libtrace->perpkt_queue_full = true;
714                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
715                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
716                                                        assert(sem_post(&libtrace->sem) == 0);
717
718                                                contention_stats[this_thread].full_queue_hits++;
719                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
720                                                // Grab these back
721                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
722                                                        assert(sem_wait(&libtrace->sem) == 0);
723                                                libtrace->perpkt_queue_full = false;
724                                        }
725                                        assert(sem_post(&libtrace->sem) == 0);
726                                        *packet = NULL;
727                                } else {
728                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
729                                        // in the general case (unless the user ends early without proper clean up).
730                                        assert (!"unreachable code??");
731                                }
732                        }
733                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
734                }
735                // Now we go back to checking our queue anyways
736        }
737}
738
739
740/**
741 * For the first packet of each queue we keep a copy and note the system
742 * time it was received at.
743 *
744 * This is used for finding the first packet when playing back a trace
745 * in trace time. And can be used by real time applications to print
746 * results out every XXX seconds.
747 */
748inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
749{
750        if (!t->recorded_first) {
751                struct timeval tv;
752                libtrace_packet_t * dup;
753                // For what it's worth we can call these outside of the lock
754                gettimeofday(&tv, NULL);
755                dup = trace_copy_packet(packet);
756                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
757                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
758                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
759                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
760                // Now update the first
761                libtrace->first_packets.count++;
762                if (libtrace->first_packets.count == 1) {
763                        // We the first entry hence also the first known packet
764                        libtrace->first_packets.first = t->perpkt_num;
765                } else {
766                        // Check if we are newer than the previous 'first' packet
767                        size_t first = libtrace->first_packets.first;
768                        if (trace_get_seconds(dup) <
769                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
770                                libtrace->first_packets.first = t->perpkt_num;
771                }
772                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
773                libtrace_message_t mesg;
774                mesg.code = MESSAGE_FIRST_PACKET;
775                mesg.additional = NULL;
776                trace_send_message_to_reducer(libtrace, &mesg);
777                t->recorded_first = true;
778        }
779}
780
781/**
782 * Returns 1 if it's certain that the first packet is truly the first packet
783 * rather than a best guess based upon threads that have published so far.
784 * Otherwise 0 is returned.
785 * It's recommended that this result is stored rather than calling this
786 * function again.
787 */
788DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
789{
790        int ret = 0;
791        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
792        if (libtrace->first_packets.count) {
793                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
794                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
795                if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {
796                        ret = 1;
797                } else {
798                        struct timeval curr_tv;
799                        // If a second has passed since the first entry we will assume this is the very first packet
800                        gettimeofday(&curr_tv, NULL);
801                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
802                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
803                                        ret = 1;
804                                }
805                        }
806                }
807        } else {
808                *packet = NULL;
809                *tv = NULL;
810        }
811        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
812        return ret;
813}
814
815
/**
 * Converts a timeval into a single count of microseconds.
 *
 * Not declared 'inline': this function is exported, and under C99 and later
 * a plain 'inline' definition does not provide the external definition that
 * other translation units (and non-inlined calls in this one) link against.
 *
 * @param tv    the time to convert
 * @return tv_sec * 1000000 + tv_usec, computed as unsigned 64 bit values
 */
DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
{
        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
}
820
/* Splits a microsecond count into a timeval holding the whole seconds and
 * the remaining microseconds. */
inline static struct timeval usec_to_tv(uint64_t usec)
{
        const uint64_t usec_per_sec = 1000000;
        struct timeval result;

        result.tv_usec = usec % usec_per_sec;
        result.tv_sec = usec / usec_per_sec;
        return result;
}
828
829
830/**
831 * Delays a packets playback so the playback will be in trace time
832 */
833static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
834        struct timeval curr_tv, pkt_tv;
835        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
836        uint64_t curr_usec;
837        /* Tracetime we might delay releasing this packet */
838        if (!t->tracetime_offset_usec) {
839                libtrace_packet_t * first_pkt;
840                struct timeval *sys_tv;
841                int64_t initial_offset;
842                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
843                assert(first_pkt);
844                pkt_tv = trace_get_timeval(first_pkt);
845                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
846                if (stable)
847                        // 0->1 because 0 is used to mean unset
848                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
849                next_release = initial_offset;
850        }
851        /* next_release == offset */
852        pkt_tv = trace_get_timeval(packet);
853        next_release += tv_to_usec(&pkt_tv);
854        gettimeofday(&curr_tv, NULL);
855        curr_usec = tv_to_usec(&curr_tv);
856        if (next_release > curr_usec) {
857                // We need to wait
858                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
859                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
860                select(0, NULL, NULL, NULL, &delay_tv);
861        }
862}
863
864/* Read one packet from the trace into a buffer. Note that this function will
865 * block until a packet is read (or EOF is reached).
866 *
867 * @param libtrace      the libtrace opaque pointer
868 * @param packet        the packet opaque pointer
869 * @returns 0 on EOF, negative value on error
870 *
871 * Note this is identical to read_packet but calls pread_packet instead of
872 * read packet in the format.
873 *
874 */
875static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
876
877        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
878        if (trace_is_err(libtrace))
879                return -1;
880        if (!libtrace->started) {
881                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
882                return -1;
883        }
884        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
885                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
886                return -1;
887        }
888        assert(packet);
889
890        if (libtrace->format->read_packet) {
891                do {
892                        size_t ret;
893                        /* Finalise the packet, freeing any resources the format module
894                         * may have allocated it and zeroing all data associated with it.
895                         */
896                        trace_fin_packet(packet);
897                        /* Store the trace we are reading from into the packet opaque
898                         * structure */
899                        packet->trace = libtrace;
900                        ret=libtrace->format->pread_packet(libtrace,packet);
901                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
902                                return ret;
903                        }
904                        if (libtrace->filter) {
905                                /* If the filter doesn't match, read another
906                                 * packet
907                                 */
908                                if (!trace_apply_filter(libtrace->filter,packet)){
909                                        ++libtrace->filtered_packets;
910                                        continue;
911                                }
912                        }
913                        if (libtrace->snaplen>0) {
914                                /* Snap the packet */
915                                trace_set_capture_length(packet,
916                                                libtrace->snaplen);
917                        }
918                        trace_packet_set_order(packet, libtrace->accepted_packets);
919                        ++libtrace->accepted_packets;
920                        return ret;
921                } while(1);
922        }
923        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
924        return ~0U;
925}
926
927/**
928 * Read a packet from the parallel trace
929 */
930DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
931{
932        int ret;
933        libtrace_thread_t *t = get_thread_table(libtrace);
934
935        // Cleanup the packet passed back
936        if (*packet)
937                trace_fin_packet(*packet);
938
939        if (libtrace->format->pread_packet) {
940                if (!*packet)
941                        *packet = trace_create_packet();
942                ret = trace_pread_packet_wrapper(libtrace, *packet);
943        } else  if (!libtrace->hasher) {
944                /* We don't care about which core a packet goes to */
945                ret =  trace_pread_packet_first_in_first_served(libtrace, packet);
946        } else if (trace_has_dedicated_hasher(libtrace)) {
947                ret = trace_pread_packet_hasher_thread(libtrace, packet);
948        } else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
949                ret = trace_pread_packet_sliding_window(libtrace, packet);
950        } else {
951                ret = trace_pread_packet_hash_locked(libtrace, packet);
952        }
953
954        // Formats can also optionally do this internally to ensure the first
955        // packet is always reported correctly
956        if (ret > 0) {
957                store_first_packet(libtrace, *packet, t);
958                if (libtrace->tracetime)
959                        delay_tracetime(libtrace, *packet, t);
960        }
961
962        return ret;
963}
964
965/* Starts perpkt threads
966 * @return threads_started
967 */
968static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
969        int i;
970
971        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
972                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
973                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
974        }
975        return libtrace->perpkt_thread_count;
976}
977
978/* Start an input trace in a parallel fashion.
979 *
980 * @param libtrace      the input trace to start
981 * @param global_blob some global data you can share with the new thread
982 * @returns 0 on success
983 */
984DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
985{
986        int i;
987        sigset_t sig_before, sig_block_all;
988
989        assert(libtrace);
990        if (trace_is_err(libtrace))
991                return -1;;
992        if (libtrace->perpkts_pausing != 0) {
993                printf("Restarting trace\n");
994                libtrace->format->pstart_input(libtrace);
995                // TODO empty any queues out here //
996                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
997                libtrace->started = true;
998                assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
999                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1000                return 0;
1001        }
1002
1003        libtrace_parallel = 1;
1004
1005        // Store the user defined things against the trace
1006        libtrace->global_blob = global_blob;
1007        libtrace->per_pkt = per_pkt;
1008        libtrace->reducer = reducer;
1009        libtrace->perpkts_finishing = 0;
1010        // libtrace->hasher = &rand_hash; /* Hasher now set via option */
1011
1012        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1013        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1014        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1015        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1016
1017        // Set default buffer sizes
1018        if (libtrace->perpkt_buffer_size <= 0)
1019                libtrace->perpkt_buffer_size = 1000;
1020
1021        if (libtrace->perpkt_thread_count <= 0)
1022                libtrace->perpkt_thread_count = 2; // XXX scale to system
1023
1024        if(libtrace->packet_freelist_size <= 0)
1025                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
1026
1027        if(libtrace->packet_freelist_size <
1028                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
1029                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1030
1031        libtrace->started=true; // Before we start the threads otherwise we could have issues
1032        /* Disable signals - Pthread signal handling */
1033
1034        sigemptyset(&sig_block_all);
1035
1036        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1037
1038        // If we are using a hasher start it
1039        if (libtrace->hasher && libtrace->hasher_thread.type == THREAD_HASHER) {
1040                libtrace_thread_t *t = &libtrace->hasher_thread;
1041                t->trace = libtrace;
1042                t->ret = NULL;
1043                t->type = THREAD_HASHER;
1044                t->state = THREAD_RUNNING;
1045                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1046        } else {
1047                libtrace->hasher_thread.type = THREAD_EMPTY;
1048        }
1049        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_POLLING);
1050        libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1051        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1052        // This will be applied to every new thread that starts, i.e. they will block all signals
1053        // Lets start a fixed number of reading threads
1054
1055        // For now we never have a dedicated thread for the reducer
1056        // i.e. This main thread is used as the reducer
1057        libtrace->reducer_thread.tid = pthread_self();
1058        libtrace->reducer_thread.type = THREAD_REDUCER;
1059        libtrace->reducer_thread.state = THREAD_RUNNING;
1060        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
1061
1062        /* Ready some storages */
1063        libtrace->first_packets.first = 0;
1064        libtrace->first_packets.count = 0;
1065        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1066        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1067
1068
1069        /* Start all of our perpkt threads */
1070        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1071        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1072                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1073                t->trace = libtrace;
1074                t->ret = NULL;
1075                t->type = THREAD_PERPKT;
1076                t->state = THREAD_RUNNING;
1077                t->user_data = NULL;
1078                // t->tid DONE on create
1079                t->perpkt_num = i;
1080                if (libtrace->hasher)
1081                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
1082                // Depending on the mode vector or deque might be chosen
1083                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1084                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1085                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1086                t->tmp_key = 0;
1087                t->tmp_data = NULL;
1088                t->recorded_first = false;
1089                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1090                t->tracetime_offset_usec = 0;;
1091        }
1092
1093        int threads_started = 0;
1094        /* Setup the trace and start our threads */
1095        if (libtrace->perpkt_thread_count > 1 && libtrace->format->pstart_input) {
1096                printf("This format has direct support for p's\n");
1097                threads_started = libtrace->format->pstart_input(libtrace);
1098        } else {
1099                if (libtrace->format->start_input) {
1100                        threads_started=libtrace->format->start_input(libtrace);
1101                }
1102        }
1103        if (threads_started == 0)
1104                threads_started = trace_start_perpkt_threads(libtrace);
1105
1106
1107        // Revert back - Allow signals again
1108        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1109        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1110
1111        if (threads_started < 0)
1112                // Error
1113                return threads_started;
1114
1115        // TODO fix these leaks etc
1116        if (libtrace->perpkt_thread_count != threads_started)
1117                printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1118
1119
1120        return 0;
1121}
1122
1123/**
1124 * Pauses a trace, this should only be called by the main thread
1125 * 1. Set started = false
1126 * 2. All perpkt threads are paused waiting on a condition var
1127 * 3. Then call ppause on the underlying format if found
1128 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
1129 *
1130 * Once done you should be a able to modify the trace setup and call pstart again
1131 * TODO handle changing thread numbers
1132 */
1133DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1134{
1135        libtrace_thread_t *t;
1136        int i;
1137        assert(libtrace);
1138        if (!libtrace->started) {
1139                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1140                return -1;
1141        }
1142
1143        t = get_thread_table(libtrace);
1144
1145        // Set paused
1146        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1147        libtrace->started = false;
1148        pthread_cond_broadcast(&libtrace->perpkt_cond);
1149        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1150
1151        printf("Sending messages \n");
1152        // Stop threads, skip this one if its a perpkt
1153        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1154                if (&libtrace->perpkt_threads[i] != t) {
1155                        libtrace_message_t message;
1156                        message.code = MESSAGE_PAUSE;
1157                        message.additional = NULL;
1158                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1159                }
1160        }
1161
1162        // Formats must support native message handling if a message is ready
1163        // Approach per Perry's suggestion is a non-blocking read
1164        // followed by a blocking read. XXX STRIP THIS OUT
1165
1166        if (t) {
1167                // A perpkt is doing the pausing interesting fake a extra thread paused
1168                // We rely on the user to not return before starting the trace again
1169                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1170                libtrace->perpkts_pausing++;
1171                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1172        }
1173
1174        printf("Threads are pausing\n");
1175
1176        // Do a early pause to kick threads out - XXX testing for int
1177        if (libtrace->format->pause_input)
1178                        libtrace->format->pause_input(libtrace);
1179
1180        // Wait for all threads to pause
1181        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1182        while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
1183                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1184        }
1185        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1186
1187        printf("Threads have paused\n");
1188
1189        if (trace_supports_parallel(libtrace)) {
1190                if (libtrace->format->ppause_input)
1191                        libtrace->format->ppause_input(libtrace);
1192                // TODO What happens if we don't have pause input??
1193        } else {
1194                printf("Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1195                // This doesn't really work because this could be called by any thread
1196                // Maybe we should grab the lock here??
1197                if (libtrace->format->pause_input)
1198                        libtrace->format->pause_input(libtrace);
1199                // TODO What happens if we don't have pause input??
1200        }
1201
1202        return 0;
1203}
1204
1205/**
1206 * Stop trace finish prematurely as though it meet an EOF
1207 * This should only be called by the main thread
1208 * 1. Calls ppause
1209 * 2. Sends a message asking for threads to finish
1210 *
1211 */
1212DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1213{
1214        int i;
1215        if (!libtrace->started) {
1216                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
1217                return -1;
1218        }
1219
1220        // Ensure all threads have paused and the underlying trace format has
1221        // been closed
1222        trace_ppause(libtrace);
1223
1224        // Now send a message asking the threads to stop
1225        // This will be retrieved before trying to read another packet
1226        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1227                libtrace_message_t message;
1228                message.code = MESSAGE_STOP;
1229                message.additional = NULL;
1230                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1231        }
1232
1233        // Now release the threads and let them stop
1234        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1235        libtrace->started = true;
1236        assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1237        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1238        return 0;
1239}
1240
1241/**
1242 * Set the hasher type along with a selected function, if hardware supports
1243 * that generic type of hashing it will be used otherwise the supplied
1244 * hasher function will be used and passed data when called.
1245 *
1246 * @return 0 if successful otherwise -1 on error
1247 */
1248DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1249        int ret = -1;
1250        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1251                return -1;
1252        }
1253
1254        // Save the requirements
1255        trace->hasher_type = type;
1256        if (hasher) {
1257                trace->hasher = hasher;
1258                trace->hasher_data = hasher;
1259        } else {
1260                trace->hasher = NULL;
1261                // TODO consider how to handle freeing this
1262                trace->hasher_data = NULL;
1263        }
1264
1265        // Try push this to hardware - NOTE hardware could do custom if
1266        // there is a more efficient way to apply it, in this case
1267        // it will simply grab the function out of libtrace_t
1268        if (trace->format->pconfig_input)
1269                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1270
1271        if (ret == -1) {
1272                // We have to deal with this ourself
1273                // This most likely means single threaded reading of the trace
1274                if (!hasher) {
1275                        switch (type)
1276                        {
1277                                case HASHER_CUSTOM:
1278                                case HASHER_BALANCE:
1279                                        return 0;
1280                                case HASHER_BIDIRECTIONAL:
1281                                        trace->hasher = toeplitz_hash_packet;
1282                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1283                                        toeplitz_init_config(trace->hasher_data, 1);
1284                                        return 0;
1285                                case HASHER_UNIDIRECTIONAL:
1286                                        trace->hasher = toeplitz_hash_packet;
1287                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1288                                        toeplitz_init_config(trace->hasher_data, 1);
1289                                        return 0;
1290                                case HASHER_HARDWARE:
1291                                        return -1;
1292                        }
1293                        return -1;
1294                }
1295        } else {
1296                // The hardware is dealing with this yay
1297                trace->hasher_type = HASHER_HARDWARE;
1298        }
1299
1300        return 0;
1301}
1302
1303// Waits for all threads to finish
1304DLLEXPORT void trace_join(libtrace_t *libtrace) {
1305        int i;
1306
1307        /* Firstly wait for the perpkt threads to finish, since these are
1308         * user controlled */
1309        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1310                //printf("Waiting to join with perpkt #%d\n", i);
1311                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
1312                //printf("Joined with perpkt #%d\n", i);
1313                // So we must do our best effort to empty the queue - so
1314                // the producer (or any other threads) don't block.
1315                libtrace_packet_t * packet;
1316                // Mark that we are no longer accepting packets
1317                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1318                libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
1319                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1320                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1321                        if (packet) // This could be NULL iff the perpkt finishes early
1322                                trace_destroy_packet(packet);
1323        }
1324
1325        /* Now the hasher */
1326        // XXX signal it to stop
1327        if (trace_has_dedicated_hasher(libtrace)) {
1328                printf("Waiting to join with the hasher\n");
1329                pthread_join(libtrace->hasher_thread.tid, NULL);
1330                printf("Joined with with the hasher\n");
1331                libtrace->hasher_thread.state = THREAD_FINISHED;
1332        }
1333
1334        // Now that everything is finished nothing can be touching our
1335        // buffers so clean them up
1336        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1337                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
1338                // if they lost timeslice before-during a write
1339                libtrace_packet_t * packet;
1340                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1341                        trace_destroy_packet(packet);
1342                if (libtrace->hasher) {
1343                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1344                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1345                }
1346                // Cannot destroy vector yet, this happens with trace_destroy
1347        }
1348
1349        // Lets mark this as done for now
1350        libtrace->joined = true;
1351}
1352
1353// Don't use extra overhead = :( directly place in storage structure using
1354// post
1355DLLEXPORT libtrace_result_t *trace_create_result()
1356{
1357        libtrace_result_t *result = malloc(sizeof(libtrace_result_t));
1358        assert(result);
1359        result->key = 0;
1360        result->value = NULL;
1361        // TODO automatically back with a free list!!
1362        return result;
1363}
1364
1365DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1366{
1367        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1368        assert(t);
1369        return libtrace_message_queue_count(&t->messages);
1370}
1371
1372DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1373{
1374        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1375        assert(t);
1376        return libtrace_message_queue_get(&t->messages, message);
1377}
1378
1379DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1380{
1381        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1382        assert(t);
1383        return libtrace_message_queue_try_get(&t->messages, message);
1384}
1385
1386/**
1387 * Return backlog indicator
1388 */
1389DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
1390{
1391        libtrace_message_t message = {0};
1392        message.code = MESSAGE_POST_REDUCE;
1393        message.additional = NULL;
1394        message.sender = get_thread_descriptor(libtrace);
1395        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
1396}
1397
1398/**
1399 * Return backlog indicator
1400 */
1401DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
1402{
1403        //printf("Sending message code=%d to reducer\n", message->code);
1404        message->sender = get_thread_descriptor(libtrace);
1405        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
1406}
1407
1408/**
1409 *
1410 */
1411DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1412{
1413        //printf("Sending message code=%d to reducer\n", message->code);
1414        message->sender = get_thread_descriptor(libtrace);
1415        return libtrace_message_queue_put(&t->messages, message);
1416}
1417
1418DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1419        result->key = key;
1420}
1421DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1422        return result->key;
1423}
1424DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
1425        result->value = value;
1426}
1427DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
1428        return result->value;
1429}
1430DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
1431        result->key = key;
1432        result->value = value;
1433}
1434DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
1435        free(*result);
1436        result = NULL;
1437        // TODO automatically back with a free list!!
1438}
1439
1440DLLEXPORT void * trace_get_global(libtrace_t *trace)
1441{
1442        return trace->global_blob;
1443}
1444
1445DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
1446{
1447        if (trace->global_blob && trace->global_blob != data) {
1448                void * ret = trace->global_blob;
1449                trace->global_blob = data;
1450                return ret;
1451        } else {
1452                trace->global_blob = data;
1453                return NULL;
1454        }
1455}
1456
1457DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
1458{
1459        return t->user_data;
1460}
1461
1462DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
1463{
1464        if(t->user_data && t->user_data != data) {
1465                void *ret = t->user_data;
1466                t->user_data = data;
1467                return ret;
1468        } else {
1469                t->user_data = data;
1470                return NULL;
1471        }
1472}
1473
1474/**
1475 * Note: This function grabs a lock and expects trace_update_inprogress_result
1476 * to be called to release the lock.
1477 *
1478 * Expected to be used in trace-time situations to allow a result to be pending
1479 * a publish that can be taken by the reducer before publish if it wants to
1480 * publish a result. Such as publish a result every second but a queue hasn't
1481 * processed a packet (or is overloaded) and hasn't published yet.
1482 *
1483 * Currently this only supports a single temporary result,
1484 * as such if a key is different to the current temporary result the existing
1485 * one will be published and NULL returned.
1486 */
1487DLLEXPORT void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key)
1488{
1489        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1490        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1491
1492        assert (pthread_spin_lock(&t->tmp_spinlock) == 0);
1493        if (t->tmp_key != key) {
1494                if (t->tmp_data) {
1495                        //printf("publising data key=%"PRIu64"\n", t->tmp_key);
1496                        trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
1497                }
1498                t->tmp_data = NULL;
1499                t->tmp_key = key;
1500        }
1501        return t->tmp_data;
1502}
1503
1504/**
1505 * Updates a temporary result and releases the lock previously grabbed by trace_retrive_inprogress_result
1506 */
1507DLLEXPORT void trace_update_inprogress_result(libtrace_t *libtrace, uint64_t key, void * value)
1508{
1509        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1510        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1511        if (t->tmp_key != key) {
1512                if (t->tmp_data) {
1513                        printf("BAD publihsing data key=%"PRIu64"\n", t->tmp_key);
1514                        trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
1515                }
1516                t->tmp_key = key;
1517        }
1518        t->tmp_data = value;
1519        assert (pthread_spin_unlock(&t->tmp_spinlock) == 0);
1520}
1521
1522/**
1523 * Publish to the reduce queue, return
1524 */
1525DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
1526        libtrace_result_t res;
1527        // Who am I???
1528        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1529        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1530        // Now put it into my table
1531        static __thread int count = 0;
1532
1533
1534        libtrace_result_set_key_value(&res, key, value);
1535        /*
1536        if (count == 1)
1537                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1538        count = (count+1) %1000;
1539        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1540        */
1541        /*if (count == 1)
1542                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1543        count = (count+1)%1000;*/
1544        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1545                if (libtrace_deque_get_size(&t->deque) >= 800) {
1546                        trace_post_reduce(libtrace);
1547                }
1548                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1549                //      sched_yield();
1550                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1551        } else {
1552                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1553                //      sched_yield();
1554
1555                if (libtrace_deque_get_size(&t->deque) >= 800) {
1556                        trace_post_reduce(libtrace);
1557                }
1558                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1559        }
1560}
1561
1562
1563static int compareres(const void* p1, const void* p2)
1564{
1565        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
1566                return -1;
1567        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
1568                return 0;
1569        else
1570                return 1;
1571}
1572
1573/* Retrieves all results with the key requested from the temporary result
1574 * holding zone.
1575 */
1576DLLEXPORT int trace_get_results_check_temp(libtrace_t *libtrace, libtrace_vector_t *results, uint64_t key)
1577{
1578        int i;
1579
1580        libtrace_vector_empty(results);
1581        // Check all of the temp queues
1582        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1583                libtrace_result_t r = {0,0};
1584                assert (pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
1585                if (libtrace->perpkt_threads[i].tmp_key == key) {
1586                        libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);
1587                        libtrace->perpkt_threads[i].tmp_data = NULL;
1588                        printf("Found in temp queue\n");
1589                }
1590                assert (pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
1591                if (libtrace_result_get_value(&r)) {
1592                        // Got a result still in temporary
1593                        printf("Publishing from temp queue\n");
1594                        libtrace_vector_push_back(results, &r);
1595                } else {
1596                        // This might be waiting on the actual queue
1597                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
1598                        if (libtrace_deque_peek_front(v, (void *) &r) &&
1599                                        libtrace_result_get_value(&r)) {
1600                                assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);
1601                                printf("Found in real queue\n");
1602                                libtrace_vector_push_back(results, &r);
1603                        } // else no data (probably means no packets)
1604                        else {
1605                                printf("Result missing in real queue\n");
1606                        }
1607                }
1608        }
1609        //printf("Loop done yo, that means we've got #%d results to print fool!\n", libtrace_vector_get_size(results));
1610        return libtrace_vector_get_size(results);
1611}
1612
1613DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
1614        int i;
1615        int flags = libtrace->reducer_flags; // Hint these aren't a changing
1616
1617        libtrace_vector_empty(results);
1618
1619        /* Here we assume queues are in order ascending order and they want
1620         * the smallest result first. If they are not in order the results
1621         * may not be in order.
1622         */
1623        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1624                int live_count = 0;
1625                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
1626                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
1627                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
1628                int min_queue = -1;
1629
1630                /* Loop through check all are alive (have data) and find the smallest */
1631                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1632                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
1633                        if (libtrace_deque_get_size(v) != 0) {
1634                                libtrace_result_t r;
1635                                libtrace_deque_peek_front(v, (void *) &r);
1636                                live_count++;
1637                                live[i] = 1;
1638                                key[i] = libtrace_result_get_key(&r);
1639                                if (i==0 || min_key > key[i]) {
1640                                        min_key = key[i];
1641                                        min_queue = i;
1642                                }
1643                        } else {
1644                                live[i] = 0;
1645                        }
1646                }
1647
1648                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
1649                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
1650                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
1651                                libtrace->joined))) {
1652                        /* Get the minimum queue and then do stuff */
1653                        libtrace_result_t r;
1654
1655                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
1656                        libtrace_vector_push_back(results, &r);
1657
1658                        // We expect the key we read +1 now
1659                        libtrace->expected_key = key[min_queue] + 1;
1660
1661                        // Now update the one we just removed
1662                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
1663                        {
1664                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
1665                                key[min_queue] = libtrace_result_get_key(&r);
1666                                if (key[min_queue] <= min_key) {
1667                                        // We are still the smallest, might be out of order though :(
1668                                        min_key = key[min_queue];
1669                                } else {
1670                                        min_key = key[min_queue]; // Update our minimum
1671                                        // Check all find the smallest again - all are alive
1672                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1673                                                if (live[i] && min_key > key[i]) {
1674                                                        min_key = key[i];
1675                                                        min_queue = i;
1676                                                }
1677                                        }
1678                                }
1679                        } else {
1680                                live[min_queue] = 0;
1681                                live_count--;
1682                                min_key = UINT64_MAX; // Update our minimum
1683                                // Check all find the smallest again - all are alive
1684                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1685                                        // Still not 100% TODO (what if order is wrong or not increasing)
1686                                        if (live[i] && min_key >= key[i]) {
1687                                                min_key = key[i];
1688                                                min_queue = i;
1689                                        }
1690                                }
1691                        }
1692                }
1693        } else { // Queues are not in order - return all results in the queue
1694                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1695                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
1696                }
1697                if (flags & REDUCE_SORT) {
1698                        qsort(results->elements, results->size, results->element_size, &compareres);
1699                }
1700        }
1701        return libtrace_vector_get_size(results);
1702}
1703
1704DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
1705        return packet->order;
1706}
1707
1708DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
1709        return packet->hash;
1710}
1711
1712DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
1713        packet->order = order;
1714}
1715
1716DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
1717        packet->hash = hash;
1718}
1719
1720DLLEXPORT int trace_finished(libtrace_t * libtrace) {
1721        int i;
1722        int b = 0;
1723        // TODO I don't like using this so much
1724        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1725        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1726                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
1727                        b++;
1728        }
1729        //assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1730        return !b;
1731}
1732
1733DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
1734{
1735        int ret = -1;
1736        switch (option) {
1737                case TRACE_OPTION_SET_HASHER:
1738                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
1739                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
1740                        libtrace->perpkt_buffer_size = *((int *) value);
1741                        return 1;
1742                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
1743                        libtrace->packet_freelist_size = *((int *) value);
1744                        return 1;
1745                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1746                        libtrace->perpkt_thread_count = *((int *) value);
1747                        return 1;
1748                case TRACE_DROP_OUT_OF_ORDER:
1749                        if (*((int *) value))
1750                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
1751                        else
1752                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
1753                        return 1;
1754                case TRACE_OPTION_SEQUENTIAL:
1755                        if (*((int *) value))
1756                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
1757                        else
1758                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
1759                        return 1;
1760                case TRACE_OPTION_ORDERED:
1761                        if (*((int *) value))
1762                                libtrace->reducer_flags |= REDUCE_ORDERED;
1763                        else
1764                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
1765                        return 1;
1766                case TRACE_OPTION_USE_DEDICATED_HASHER:
1767                        if (*((int *) value))
1768                                libtrace->hasher_thread.type = THREAD_HASHER;
1769                        else
1770                                libtrace->hasher_thread.type = THREAD_EMPTY;
1771                        return 1;
1772                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
1773                        if (*((int *) value))
1774                                libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
1775                        else
1776                                libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
1777                        return 1;
1778                case TRACE_OPTION_TRACETIME:
1779                        if(*((int *) value))
1780                                libtrace->tracetime = 1;
1781                        else
1782                                libtrace->tracetime = 0;
1783                        return 0;
1784        }
1785        return 0;
1786}
1787
1788DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1789        libtrace_packet_t* result;
1790        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1791                result = trace_create_packet();
1792        assert(result);
1793        swap_packets(result, packet); // Move the current packet into our copy
1794        return result;
1795}
1796
1797DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1798        // Try write back the packet
1799        assert(packet);
1800        // Always release any resources this might be holding such as a slot in a ringbuffer
1801        trace_fin_packet(packet);
1802        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1803                /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
1804                //assert(1 == 90);
1805                trace_destroy_packet(packet);
1806        }
1807}
Note: See TracBrowser for help on using the repository browser.