source: lib/trace_parallel.c @ 049a700

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 049a700 was 049a700, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Fix bug checking for registerthread rather than unregisterthread.

  • Property mode set to 100644
File size: 67.5 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101
102
103#define VERBOSE_DEBBUGING 0
104
105extern int libtrace_parallel;
106
107struct multithreading_stats {
108        uint64_t full_queue_hits;
109        uint64_t wait_for_fill_complete_hits;
110} contention_stats[1024];
111
112/**
113 * True if the trace has dedicated hasher thread otherwise false,
114 * to be used after the trace is running
115 */
116static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
117{
118        assert(libtrace->state != STATE_NEW);
119        return libtrace->hasher_thread.type == THREAD_HASHER;
120}
121
122/**
123 * Changes a thread's state and broadcasts the condition variable. This
124 * should always be done when the lock is held.
125 *
126 * Additionally for perpkt threads the state counts are updated.
127 *
128 * @param trace A pointer to the trace
129 * @param t A pointer to the thread to modify
130 * @param new_state The new state of the thread
131 * @param need_lock Set to true if libtrace_lock is not held, otherwise
132 *        false in the case the lock is currently held by this thread.
133 */
134static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
135        const enum thread_states new_state, const bool need_lock)
136{
137        enum thread_states prev_state;
138        if (need_lock)
139                pthread_mutex_lock(&trace->libtrace_lock);
140        prev_state = t->state;
141        t->state = new_state;
142        if (t->type == THREAD_PERPKT) {
143                --trace->perpkt_thread_states[prev_state];
144                ++trace->perpkt_thread_states[new_state];
145        }
146
147#if VERBOSE_DEBBUGING
148        fprintf(stderr, "Thread %d State changed from %d to %d\n", t->tid,
149                t->state, prev_state);
150#endif
151        if (need_lock)
152                pthread_mutex_unlock(&trace->libtrace_lock);
153        pthread_cond_broadcast(&trace->perpkt_cond);
154}
155
156/**
157 * Changes the overall traces state and signals the condition.
158 *
159 * @param trace A pointer to the trace
160 * @param new_state The new state of the trace
161 * @param need_lock Set to true if libtrace_lock is not held, otherwise
162 *        false in the case the lock is currently held by this thread.
163 */
164static inline void libtrace_change_state(libtrace_t *trace, 
165        const enum trace_state new_state, const bool need_lock)
166{
167        enum trace_state prev_state;
168        if (need_lock)
169                pthread_mutex_lock(&trace->libtrace_lock);
170        prev_state = trace->state;
171        trace->state = new_state;
172#if VERBOSE_DEBBUGING
173        fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
174                trace->uridata, get_trace_state_name(trace->state),
175                get_trace_state_name(prev_state));
176#endif
177        if (need_lock)
178                pthread_mutex_unlock(&trace->libtrace_lock);
179        pthread_cond_broadcast(&trace->perpkt_cond);
180}
181
182/**
183 * @return True if the format supports parallel threads.
184 */
185static inline bool trace_supports_parallel(libtrace_t *trace)
186{
187        assert(trace);
188        assert(trace->format);
189        if (trace->format->pstart_input)
190                return true;
191        else
192                return false;
193}
194
195DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
196        int i;
197        struct multithreading_stats totals = {0};
198        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
199                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
200                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
201                totals.full_queue_hits += contention_stats[i].full_queue_hits;
202                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
203                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
204        }
205        fprintf(stderr, "\nTotals for perpkt threads\n");
206        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
207        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
208
209        return;
210}
211
212void libtrace_zero_thread(libtrace_thread_t * t) {
213        t->trace = NULL;
214        t->ret = NULL;
215        t->type = THREAD_EMPTY;
216        libtrace_zero_ringbuffer(&t->rbuffer);
217        libtrace_zero_vector(&t->vector);
218        libtrace_zero_deque(&t->deque);
219        t->recorded_first = false;
220        t->perpkt_num = -1;
221}
222
223// Ints are aligned int is atomic so safe to read and write at same time
224// However write must be locked, read doesn't (We never try read before written to table)
225libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
226        int i = 0;
227        pthread_t tid = pthread_self();
228
229        for (;i<libtrace->perpkt_thread_count ;++i) {
230                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
231                        return &libtrace->perpkt_threads[i];
232        }
233        return NULL;
234}
235
236int get_thread_table_num(libtrace_t *libtrace) {
237        int i = 0;
238        pthread_t tid = pthread_self();
239        for (;i<libtrace->perpkt_thread_count; ++i) {
240                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
241                        return i;
242        }
243        return -1;
244}
245
246static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
247        libtrace_thread_t *ret;
248        if (!(ret = get_thread_table(libtrace))) {
249                pthread_t tid = pthread_self();
250                // Check if we are reducer or something else
251                if (pthread_equal(tid, libtrace->reducer_thread.tid))
252                        ret = &libtrace->reducer_thread;
253                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
254                        ret = &libtrace->hasher_thread;
255                else
256                        ret = NULL;
257        }
258        return ret;
259}
260
261/** Used below in trace_make_results_packets_safe*/
262static void do_copy_result_packet(void *data)
263{
264        libtrace_result_t *res = (libtrace_result_t *)data;
265        if (res->is_packet) {
266                // Duplicate the packet in standard malloc'd memory and free the
267                // original
268                libtrace_packet_t *oldpkt, *dup;
269                oldpkt = (libtrace_packet_t *) res->value;
270                dup = trace_copy_packet(oldpkt);
271                res->value = (void *)dup;
272                trace_destroy_packet(oldpkt);
273                fprintf(stderr, "Made a packet safe!!\n");
274        }
275}
276
277/**
278 * Make a safe replacement copy of any result packets that are owned
279 * by the format in the result queue. Used when pausing traces.
280 */ 
281static void trace_make_results_packets_safe(libtrace_t *trace) {
282        libtrace_thread_t *t = get_thread_descriptor(trace);
283        if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
284                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
285        else 
286                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
287}
288
289/**
290 * Holds threads in a paused state, until released by broadcasting
291 * the condition mutex.
292 */
293static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
294        trace_make_results_packets_safe(trace);
295        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
296        thread_change_state(trace, t, THREAD_PAUSED, false);
297        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
298                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
299        }
300        thread_change_state(trace, t, THREAD_RUNNING, false);
301        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
302}
303
304/**
305 * The is the entry point for our packet processing threads.
306 */
307static void* perpkt_threads_entry(void *data) {
308        libtrace_t *trace = (libtrace_t *)data;
309        libtrace_thread_t * t;
310        libtrace_message_t message = {0};
311        libtrace_packet_t *packet = NULL;
312
313
314        // Force this thread to wait until trace_pstart has been completed
315        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
316        t = get_thread_table(trace);
317        assert(t);
318        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
319        if (trace->format->pregister_thread) {
320                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
321        }
322        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
323
324        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
325        // Send a message to say we've started
326
327        message.code = MESSAGE_STARTED;
328        message.sender = t;
329
330        // Let the per_packet function know we have started
331        (*trace->per_pkt)(trace, NULL, &message, t);
332
333
334        for (;;) {
335                int psize;
336
337                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
338                        switch (message.code) {
339                                case MESSAGE_DO_PAUSE: // This is internal
340                                        // Send message to say we are pausing, TODO consider sender
341                                        message.code = MESSAGE_PAUSING;
342                                        (*trace->per_pkt)(trace, NULL, &message, t);
343                                        // If a hasher thread is running empty input queues so we don't loose data
344                                        if (trace_has_dedicated_hasher(trace)) {
345                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
346                                                // The hasher has stopped by this point, so the queue shouldn't be filling
347                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
348                                                        psize = trace_pread_packet(trace, t, &packet);
349                                                        if (psize > 0) {
350                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
351                                                        } else {
352                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
353                                                        }
354                                                }
355                                        }
356                                        // Send a paused message as a final chance to memory copy any packets
357                                        message.code = MESSAGE_PAUSED;
358                                        (*trace->per_pkt)(trace, NULL, &message, t);
359                                        // Now we do the actual pause, this returns when we are done
360                                        trace_thread_pause(trace, t);
361                                        // Check for new messages as soon as we return
362                                        continue;
363                                case MESSAGE_DO_STOP: // This is internal
364                                        goto stop;
365                        }
366                        (*trace->per_pkt)(trace, NULL, &message, t);
367                        continue;
368                }
369
370                if (trace->perpkt_thread_count == 1) {
371                        if (!packet) {
372                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
373                                        packet = trace_create_packet();
374                        }
375                        assert(packet);
376                        if ((psize = trace_read_packet(trace, packet)) <1) {
377                                break;
378                        }
379                } else {
380                        psize = trace_pread_packet(trace, t, &packet);
381                }
382
383                if (psize > 0) {
384                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
385                        continue;
386                }
387
388                if (psize == -2)
389                        continue; // We have a message
390
391                if (psize < 1) { // consider sending a message
392                        break;
393                }
394
395        }
396
397
398stop:
399        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
400        // Let the per_packet function know we have stopped
401        message.code = MESSAGE_STOPPED;
402        message.sender = NULL;
403        message.additional.uint64 = 0;
404        (*trace->per_pkt)(trace, NULL, &message, t);
405
406        // Free our last packet
407        if (packet)
408                trace_destroy_packet(packet);
409
410       
411        thread_change_state(trace, t, THREAD_FINISHED, true);
412
413        // Notify only after we've defiantly set the state to finished
414        message.code = MESSAGE_PERPKT_ENDED;
415        message.additional.uint64 = 0;
416        trace_send_message_to_reducer(trace, &message);
417
418        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
419        if (trace->format->punregister_thread) {
420                trace->format->punregister_thread(trace, t);
421        }
422        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
423
424        pthread_exit(NULL);
425};
426
427/**
428 * The start point for our single threaded hasher thread, this will read
429 * and hash a packet from a data source and queue it against the correct
430 * core to process it.
431 */
432static void* hasher_start(void *data) {
433        libtrace_t *trace = (libtrace_t *)data;
434        libtrace_thread_t * t;
435        int i;
436        libtrace_packet_t * packet;
437        libtrace_message_t message = {0};
438
439        assert(trace_has_dedicated_hasher(trace));
440        /* Wait until all threads are started and objects are initialised (ring buffers) */
441        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
442        t = &trace->hasher_thread;
443        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
444        printf("Hasher Thread started\n");
445        if (trace->format->pregister_thread) {
446                trace->format->pregister_thread(trace, t, true);
447        }
448        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
449        int pkt_skipped = 0;
450        /* Read all packets in then hash and queue against the correct thread */
451        while (1) {
452                int thread;
453                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
454                        packet = trace_create_packet();
455                assert(packet);
456
457                if (libtrace_halt) // Signal to die has been sent - TODO
458                        break;
459
460                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
461                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
462                        switch(message.code) {
463                                case MESSAGE_DO_PAUSE:
464                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
465                                        thread_change_state(trace, t, THREAD_PAUSED, false);
466                                        pthread_cond_broadcast(&trace->perpkt_cond);
467                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
468                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
469                                        }
470                                        thread_change_state(trace, t, THREAD_RUNNING, false);
471                                        pthread_cond_broadcast(&trace->perpkt_cond);
472                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
473                                        break;
474                                case MESSAGE_DO_STOP:
475                                        // Stop called after pause
476                                        assert(trace->started == false);
477                                        assert(trace->state == STATE_FINSHED);
478                                default:
479                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
480                        }
481                        pkt_skipped = 1;
482                        continue;
483                }
484
485                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
486                        break; /* We are EOF or error'd either way we stop  */
487                }
488
489                /* We are guaranteed to have a hash function i.e. != NULL */
490                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
491                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
492                /* Blocking write to the correct queue - I'm the only writer */
493                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
494                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
495                        pkt_skipped = 0;
496                } else {
497                        pkt_skipped = 1; // Reuse that packet no one read it
498                }
499        }
500
501        /* Broadcast our last failed read to all threads */
502        for (i = 0; i < trace->perpkt_thread_count; i++) {
503                libtrace_packet_t * bcast;
504                printf("Broadcasting error/EOF now the trace is over\n");
505                if (i == trace->perpkt_thread_count - 1) {
506                        bcast = packet;
507                } else {
508                        bcast = trace_create_packet();
509                        bcast->error = packet->error;
510                }
511                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
512                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
513                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
514                        // Unlock early otherwise we could deadlock
515                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
516                } else {
517                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
518                }
519        }
520
521        // We don't need to free the packet
522        thread_change_state(trace, t, THREAD_FINISHED, true);
523
524        // Notify only after we've defiantly set the state to finished
525        message.code = MESSAGE_PERPKT_ENDED;
526        message.additional.uint64 = 0;
527        trace_send_message_to_reducer(trace, &message);
528        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
529        if (trace->format->punregister_thread) {
530                trace->format->punregister_thread(trace, t);
531        }
532        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
533
534        // TODO remove from TTABLE t sometime
535        pthread_exit(NULL);
536};
537
538/**
539 * Moves src into dest(Complete copy) and copies the memory buffer and
540 * its flags from dest into src ready for reuse without needing extra mallocs.
541 */
542static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
543        // Save the passed in buffer status
544        assert(dest->trace == NULL); // Must be a empty packet
545        void * temp_buf = dest->buffer;
546        buf_control_t temp_buf_control = dest->buf_control;
547        // Completely copy StoredPacket into packet
548        memcpy(dest, src, sizeof(libtrace_packet_t));
549        // Set the buffer settings on the returned packet
550        src->buffer = temp_buf;
551        src->buf_control = temp_buf_control;
552        src->trace = NULL;
553}
554
555/* Our simplest case when a thread becomes ready it can obtain an exclusive
556 * lock to read a packet from the underlying trace.
557 */
558inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
559{
560        // We need this to fill the 'first' packet table
561        if (!*packet) {
562                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
563                        *packet = trace_create_packet();
564        }
565        assert(*packet);
566        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
567        /* Read a packet */
568        (*packet)->error = trace_read_packet(libtrace, *packet);
569        // Doing this inside the lock ensures the first packet is always
570        // recorded first
571        if ((*packet)->error > 0)
572                store_first_packet(libtrace, *packet, t);
573
574        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
575        return (*packet)->error;
576}
577
578/**
579 * For the case that we have a dedicated hasher thread
580 * 1. We read a packet from our buffer
581 * 2. Move that into the packet provided (packet)
582 */
583inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
584{
585        if (*packet) // Recycle the old get the new
586                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
587                        trace_destroy_packet(*packet);
588        *packet = libtrace_ringbuffer_read(&t->rbuffer);
589
590        if (*packet) {
591                return (*packet)->error;
592        } else {
593                // This is how we do a notify, we send a message before hand to note that the trace is over etc.
594                // And this will notify the perpkt thread to read that message, this is easiest
595                // since cases like pause can also be dealt with this way without actually
596                // having to be the end of the stream.
597                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
598                return -2;
599        }
600}
601
602/**
603 * Tries to read from our queue and returns 1 if a packet was retrieved
604 */
605static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
606{
607        libtrace_packet_t* retrived_packet;
608
609        /* Lets see if we have one waiting */
610        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
611                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
612                assert(retrived_packet);
613
614                if (*packet) // Recycle the old get the new
615                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
616                                trace_destroy_packet(*packet);
617                *packet = retrived_packet;
618                *ret = (*packet)->error;
619                return 1;
620        }
621        return 0;
622}
623
624/**
625 * Allows us to ensure all threads are finished writing to our threads ring_buffer
626 * before returning EOF/error.
627 */
628inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
629{
630        /* We are waiting for the condition that another thread ends to check
631         * our queue for new data, once all threads end we can go to finished */
632        bool complete = false;
633        int ret;
634
635        do {
636                // Wait for a thread to end
637                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
638
639                // Check before
640                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
641                        complete = true;
642                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
643                        continue;
644                }
645
646                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
647
648                // Check after
649                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
650                        complete = true;
651                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
652                        continue;
653                }
654
655                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
656
657                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
658                if(try_waiting_queue(libtrace, t, packet, &ret))
659                        return ret;
660        } while (!complete);
661
662        // We can only end up here once all threads complete
663        try_waiting_queue(libtrace, t, packet, &ret);
664
665        return ret;
666        // TODO rethink this logic fix bug here
667}
668
669/**
670 * Expects the libtrace_lock to not be held
671 */
672inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
673{
674        thread_change_state(libtrace, t, THREAD_FINISHING, true);
675        return trace_handle_finishing_perpkt(libtrace, packet, t);
676}
677
678/**
679 * This case is much like the dedicated hasher, except that we will become
680 * hasher if we don't have a a packet waiting.
681 *
682 * Note: This is only every used if we have are doing hashing.
683 *
684 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
685 * queue sizes in total are larger than the ring size.
686 *
687 * 1. We read a packet from our buffer
688 * 2. Move that into the packet provided (packet)
689 */
690inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
691{
692        int thread, ret/*, psize*/;
693
694        while (1) {
695                if(try_waiting_queue(libtrace, t, packet, &ret))
696                        return ret;
697                // Can still block here if another thread is writing to a full queue
698                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
699
700                // Its impossible for our own queue to overfill, because no one can write
701                // when we are in the lock
702                if(try_waiting_queue(libtrace, t, packet, &ret)) {
703                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
704                        return ret;
705                }
706
707                // Another thread cannot write a packet because a queue has filled up. Is it ours?
708                if (libtrace->perpkt_queue_full) {
709                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
710                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
711                        continue;
712                }
713
714                if (!*packet) {
715                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
716                                *packet = trace_create_packet();
717                }
718                assert(*packet);
719
720                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
721                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
722                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
723                        if (libtrace_halt)
724                                return 0;
725                        else
726                                return (*packet)->error;
727                }
728
729                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
730                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
731                if (thread == t->perpkt_num) {
732                        // If it's this thread we must be in order because we checked the buffer once we got the lock
733                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
734                        return (*packet)->error;
735                }
736
737                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
738                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
739                                libtrace->perpkt_queue_full = true;
740                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
741                                contention_stats[t->perpkt_num].full_queue_hits++;
742                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
743                        }
744                        *packet = NULL;
745                        libtrace->perpkt_queue_full = false;
746                } else {
747                        /* We can get here if the user closes the thread before natural completion/or error */
748                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
749                }
750                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
751        }
752}
753
754/**
755 * This case is much like the dedicated hasher, except that we will become
756 * hasher if we don't have a packet waiting.
757 *
758 * TODO: You can lose the tail of a trace if the final thread
759 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
760 *
761 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
762 * queue sizes in total are larger than the ring size.
763 *
764 * 1. We read a packet from our buffer
765 * 2. Move that into the packet provided (packet)
766 */
767inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
768{
769        int ret, i, thread/*, psize*/;
770
771        if (t->state == THREAD_FINISHING)
772                return trace_handle_finishing_perpkt(libtrace, packet, t);
773
774        while (1) {
775                // Check if we have packets ready
776                if(try_waiting_queue(libtrace, t, packet, &ret))
777                        return ret;
778
779                // We limit the number of packets we get to the size of the sliding window
780                // such that it is impossible for any given thread to fail to store a packet
781                assert(sem_wait(&libtrace->sem) == 0);
782                /*~~~~Single threaded read of a packet~~~~*/
783                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
784
785                /* Re-check our queue things we might have data waiting */
786                if(try_waiting_queue(libtrace, t, packet, &ret)) {
787                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
788                        assert(sem_post(&libtrace->sem) == 0);
789                        return ret;
790                }
791
792                // TODO put on *proper* condition variable
793                if (libtrace->perpkt_queue_full) {
794                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
795                        assert(sem_post(&libtrace->sem) == 0);
796                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
797                        continue;
798                }
799
800                if (!*packet) {
801                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
802                                *packet = trace_create_packet();
803                }
804                assert(*packet);
805
806                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
807                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
808                        assert(sem_post(&libtrace->sem) == 0);
809                        // Finish this thread ensuring that any data written later by another thread is retrieved also
810                        if (libtrace_halt)
811                                return 0;
812                        else
813                                return trace_finish_perpkt(libtrace, packet, t);
814                }
815                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
816
817                /* ~~~~Multiple threads can run the hasher~~~~ */
818                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
819
820                /* Yes this is correct opposite read lock for a write operation */
821                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
822                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
823                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
824                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
825                *packet = NULL;
826
827                // Always try read any data from the sliding window
828                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
829                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
830                        if (libtrace->perpkt_queue_full) {
831                                // I might be the holdup in which case if I can read my queue I should do that and return
832                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
833                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
834                                        return ret;
835                                }
836                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
837                                continue;
838                        }
839                        // Read greedily as many as we can
840                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
841                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
842                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
843                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
844                                                if (t->perpkt_num == thread)
845                                                {
846                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
847                                                        // before EOF/error we might not have emptied the sliding window
848                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
849                                                        // Its our queue we must have a packet to read out
850                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
851                                                                // We must be able to write this now 100% without fail
852                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
853                                                                assert(sem_post(&libtrace->sem) == 0);
854                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
855                                                                return ret;
856                                                        } else {
857                                                                assert(!"Our queue is full but I cannot read from it??");
858                                                        }
859                                                }
860                                                // Not us we have to give the other threads a chance to write there packets then
861                                                libtrace->perpkt_queue_full = true;
862                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
863                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
864                                                        assert(sem_post(&libtrace->sem) == 0);
865
866                                                contention_stats[t->perpkt_num].full_queue_hits++;
867                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
868                                                // Grab these back
869                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
870                                                        assert(sem_wait(&libtrace->sem) == 0);
871                                                libtrace->perpkt_queue_full = false;
872                                        }
873                                        assert(sem_post(&libtrace->sem) == 0);
874                                        *packet = NULL;
875                                } else {
876                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
877                                        // in the general case (unless the user ends early without proper clean up).
878                                        assert (!"unreachable code??");
879                                }
880                        }
881                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
882                }
883                // Now we go back to checking our queue anyways
884        }
885}
886
887
888/**
889 * For the first packet of each queue we keep a copy and note the system
890 * time it was received at.
891 *
892 * This is used for finding the first packet when playing back a trace
893 * in trace time. And can be used by real time applications to print
894 * results out every XXX seconds.
895 */
896void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
897{
898        if (!t->recorded_first) {
899                struct timeval tv;
900                libtrace_packet_t * dup;
901                // For what it's worth we can call these outside of the lock
902                gettimeofday(&tv, NULL);
903                dup = trace_copy_packet(packet);
904                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
905                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
906                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
907                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
908                // Now update the first
909                libtrace->first_packets.count++;
910                if (libtrace->first_packets.count == 1) {
911                        // We the first entry hence also the first known packet
912                        libtrace->first_packets.first = t->perpkt_num;
913                } else {
914                        // Check if we are newer than the previous 'first' packet
915                        size_t first = libtrace->first_packets.first;
916                        if (trace_get_seconds(dup) <
917                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
918                                libtrace->first_packets.first = t->perpkt_num;
919                }
920                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
921                libtrace_message_t mesg = {0};
922                mesg.code = MESSAGE_FIRST_PACKET;
923                trace_send_message_to_reducer(libtrace, &mesg);
924                t->recorded_first = true;
925        }
926}
927
928/**
929 * Returns 1 if it's certain that the first packet is truly the first packet
930 * rather than a best guess based upon threads that have published so far.
931 * Otherwise 0 is returned.
932 * It's recommended that this result is stored rather than calling this
933 * function again.
934 */
935DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
936{
937        int ret = 0;
938        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
939        if (libtrace->first_packets.count) {
940                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
941                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
942                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
943                        ret = 1;
944                } else {
945                        struct timeval curr_tv;
946                        // If a second has passed since the first entry we will assume this is the very first packet
947                        gettimeofday(&curr_tv, NULL);
948                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
949                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
950                                        ret = 1;
951                                }
952                        }
953                }
954        } else {
955                *packet = NULL;
956                *tv = NULL;
957        }
958        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
959        return ret;
960}
961
962
963DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
964{
965        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
966}
967
968inline static struct timeval usec_to_tv(uint64_t usec)
969{
970        struct timeval tv;
971        tv.tv_sec = usec / 1000000;
972        tv.tv_usec = usec % 1000000;
973        return tv;
974}
975
976/** Similar to delay_tracetime but send messages to all threads periodically */
977static void* keepalive_entry(void *data) {
978        struct timeval prev, next;
979        libtrace_message_t message = {0};
980        libtrace_t *trace = (libtrace_t *)data;
981        uint64_t next_release;
982        fprintf(stderr, "keepalive thread is starting\n");
983
984        gettimeofday(&prev, NULL);
985        message.code = MESSAGE_TICK;
986        while (trace->state != STATE_FINSHED) {
987                fd_set rfds;
988                next_release = tv_to_usec(&prev) + (trace->tick_interval * 1000);
989                gettimeofday(&next, NULL);
990                if (next_release > tv_to_usec(&next)) {
991                        next = usec_to_tv(next_release - tv_to_usec(&next));
992                        // Wait for timeout or a message
993                        FD_ZERO(&rfds);
994                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
995                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
996                                libtrace_message_t msg;
997                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
998                                assert(msg.code == MESSAGE_DO_STOP);
999                                goto done;
1000                        }
1001                }
1002                prev = usec_to_tv(next_release);
1003                if (trace->state == STATE_RUNNING) {
1004                        message.additional.uint64 = tv_to_usec(&prev);
1005                        trace_send_message_to_perpkts(trace, &message);
1006                }
1007        }
1008done:
1009
1010        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1011        return NULL;
1012}
1013
1014/**
1015 * Delays a packets playback so the playback will be in trace time
1016 */
1017static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1018        struct timeval curr_tv, pkt_tv;
1019        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
1020        uint64_t curr_usec;
1021        /* Tracetime we might delay releasing this packet */
1022        if (!t->tracetime_offset_usec) {
1023                libtrace_packet_t * first_pkt;
1024                struct timeval *sys_tv;
1025                int64_t initial_offset;
1026                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1027                assert(first_pkt);
1028                pkt_tv = trace_get_timeval(first_pkt);
1029                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1030                if (stable)
1031                        // 0->1 because 0 is used to mean unset
1032                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1033                next_release = initial_offset;
1034        }
1035        /* next_release == offset */
1036        pkt_tv = trace_get_timeval(packet);
1037        next_release += tv_to_usec(&pkt_tv);
1038        gettimeofday(&curr_tv, NULL);
1039        curr_usec = tv_to_usec(&curr_tv);
1040        if (next_release > curr_usec) {
1041                // We need to wait
1042                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1043                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1044                select(0, NULL, NULL, NULL, &delay_tv);
1045        }
1046}
1047
1048/* Read one packet from the trace into a buffer. Note that this function will
1049 * block until a packet is read (or EOF is reached).
1050 *
1051 * @param libtrace      the libtrace opaque pointer
1052 * @param packet        the packet opaque pointer
1053 * @returns 0 on EOF, negative value on error
1054 *
1055 * Note this is identical to read_packet but calls pread_packet instead of
1056 * read packet in the format.
1057 *
1058 */
1059static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
1060
1061        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
1062        if (trace_is_err(libtrace))
1063                return -1;
1064        if (!libtrace->started) {
1065                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1066                return -1;
1067        }
1068        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1069                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1070                return -1;
1071        }
1072        assert(packet);
1073
1074        if (libtrace->format->read_packet) {
1075                do {
1076                        size_t ret;
1077                        /* Finalise the packet, freeing any resources the format module
1078                         * may have allocated it and zeroing all data associated with it.
1079                         */
1080                        trace_fin_packet(packet);
1081                        /* Store the trace we are reading from into the packet opaque
1082                         * structure */
1083                        packet->trace = libtrace;
1084                        ret=libtrace->format->pread_packet(libtrace, t, packet);
1085                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
1086                                return ret;
1087                        }
1088                        if (libtrace->filter) {
1089                                /* If the filter doesn't match, read another
1090                                 * packet
1091                                 */
1092                                if (!trace_apply_filter(libtrace->filter,packet)){
1093                                        ++libtrace->filtered_packets;
1094                                        continue;
1095                                }
1096                        }
1097                        if (libtrace->snaplen>0) {
1098                                /* Snap the packet */
1099                                trace_set_capture_length(packet,
1100                                                libtrace->snaplen);
1101                        }
1102                        trace_packet_set_order(packet, libtrace->accepted_packets);
1103                        ++libtrace->accepted_packets;
1104                        return ret;
1105                } while(1);
1106        }
1107        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1108        return ~0U;
1109}
1110
1111/**
1112 * Read a packet from the parallel trace
1113 */
1114DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
1115{
1116        int ret;
1117
1118        // Cleanup the packet passed back
1119        if (*packet)
1120                trace_fin_packet(*packet);
1121
1122        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1123                if (!*packet)
1124                        *packet = trace_create_packet();
1125                ret = trace_pread_packet_wrapper(libtrace, t, *packet);
1126        } else if (trace_has_dedicated_hasher(libtrace)) {
1127                ret = trace_pread_packet_hasher_thread(libtrace, t, packet);
1128        } else if (!trace_has_dedicated_hasher(libtrace)) {
1129                /* We don't care about which core a packet goes to */
1130                ret = trace_pread_packet_first_in_first_served(libtrace, t, packet);
1131        } /* else {
1132                ret = trace_pread_packet_hash_locked(libtrace, packet);
1133        }*/
1134
1135        // Formats can also optionally do this internally to ensure the first
1136        // packet is always reported correctly
1137        if (ret > 0) {
1138                store_first_packet(libtrace, *packet, t);
1139                if (libtrace->tracetime)
1140                        delay_tracetime(libtrace, *packet, t);
1141        }
1142
1143        return ret;
1144}
1145
1146/* Starts perpkt threads
1147 * @return threads_started
1148 */
1149static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1150        int i;
1151
1152        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1153                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1154                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
1155        }
1156        return libtrace->perpkt_thread_count;
1157}
1158
1159/* Start an input trace in a parallel fashion, or restart a paused trace.
1160 *
1161 * NOTE: libtrace lock is held for the majority of this function
1162 *
1163 * @param libtrace the input trace to start
1164 * @param global_blob some global data you can share with the new perpkt threads
1165 * @returns 0 on success
1166 */
1167DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
1168{
1169        int i;
1170        sigset_t sig_before, sig_block_all;
1171        assert(libtrace);
1172        if (trace_is_err(libtrace)) {
1173                return -1;
1174        }
1175       
1176        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
1177        if (libtrace->state != STATE_NEW) {
1178                int err = 0;
1179                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1180                if (libtrace->state != STATE_PAUSED) {
1181                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1182                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
1183                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1184                        return -1;
1185                }
1186               
1187                // Update the per_pkt function, or reuse the old one
1188                if (per_pkt)
1189                        libtrace->per_pkt = per_pkt;
1190
1191                assert(libtrace_parallel);
1192                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1193                assert(libtrace->per_pkt);
1194               
1195                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1196                        fprintf(stderr, "Restarting trace pstart_input()\n");
1197                        err = libtrace->format->pstart_input(libtrace);
1198                } else {
1199                        if (libtrace->format->start_input) {
1200                                fprintf(stderr, "Restarting trace start_input()\n");
1201                                err = libtrace->format->start_input(libtrace);
1202                        }
1203                }
1204               
1205                if (err == 0) {
1206                        libtrace->started = true;
1207                        libtrace_change_state(libtrace, STATE_RUNNING, false);
1208                }
1209                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1210                return err;
1211        }
1212
1213        assert(libtrace->state == STATE_NEW);
1214        libtrace_parallel = 1;
1215
1216        // Store the user defined things against the trace
1217        libtrace->global_blob = global_blob;
1218        libtrace->per_pkt = per_pkt;
1219        libtrace->reducer = reducer;
1220
1221        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1222        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1223        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1224        // Grab the lock
1225        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1226
1227        // Set default buffer sizes
1228        if (libtrace->perpkt_buffer_size <= 0)
1229                libtrace->perpkt_buffer_size = 1000;
1230
1231        if (libtrace->perpkt_thread_count <= 0) {
1232                // TODO add BSD support
1233                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1234                if (libtrace->perpkt_thread_count <= 0)
1235                        // Lets just use one
1236                        libtrace->perpkt_thread_count = 1;
1237        }
1238
1239        if(libtrace->packet_freelist_size <= 0)
1240                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
1241
1242        if(libtrace->packet_freelist_size <
1243                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
1244                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1245
1246        libtrace->started = true; // Before we start the threads otherwise we could have issues
1247        libtrace_change_state(libtrace, STATE_RUNNING, false);
1248        /* Disable signals - Pthread signal handling */
1249
1250        sigemptyset(&sig_block_all);
1251
1252        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1253
1254        // If we are using a hasher start it
1255        // If single threaded we don't need a hasher
1256        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1257                libtrace_thread_t *t = &libtrace->hasher_thread;
1258                t->trace = libtrace;
1259                t->ret = NULL;
1260                t->type = THREAD_HASHER;
1261                t->state = THREAD_RUNNING;
1262                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1263                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1264        } else {
1265                libtrace->hasher_thread.type = THREAD_EMPTY;
1266        }
1267        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
1268        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1269        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1270        // This will be applied to every new thread that starts, i.e. they will block all signals
1271        // Lets start a fixed number of reading threads
1272
1273        // For now we never have a dedicated thread for the reducer
1274        // i.e. This main thread is used as the reducer
1275        libtrace->reducer_thread.tid = pthread_self();
1276        libtrace->reducer_thread.type = THREAD_REDUCER;
1277        libtrace->reducer_thread.state = THREAD_RUNNING;
1278        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
1279
1280        /* Ready some storages */
1281        libtrace->first_packets.first = 0;
1282        libtrace->first_packets.count = 0;
1283        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1284        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1285
1286
1287        /* Ready all of our perpkt threads - they are started later */
1288        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1289        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1290                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1291                t->trace = libtrace;
1292                t->ret = NULL;
1293                t->type = THREAD_PERPKT;
1294                t->state = THREAD_RUNNING;
1295                t->user_data = NULL;
1296                // t->tid DONE on create
1297                t->perpkt_num = i;
1298                if (libtrace->hasher)
1299                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
1300                // Depending on the mode vector or deque might be chosen
1301                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1302                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1303                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1304                t->tmp_key = 0;
1305                t->tmp_data = NULL;
1306                t->recorded_first = false;
1307                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1308                t->tracetime_offset_usec = 0;;
1309        }
1310
1311        int threads_started = 0;
1312        /* Setup the trace and start our threads */
1313        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1314                printf("This format has direct support for p's\n");
1315                threads_started = libtrace->format->pstart_input(libtrace);
1316        } else {
1317                if (libtrace->format->start_input) {
1318                        threads_started=libtrace->format->start_input(libtrace);
1319                }
1320        }
1321        if (threads_started == 0)
1322                threads_started = trace_start_perpkt_threads(libtrace);
1323
1324        if (libtrace->tick_interval > 0) {
1325                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1326                libtrace->keepalive_thread.state = THREAD_RUNNING;
1327                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1328                assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
1329        }
1330
1331        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1332                libtrace->perpkt_thread_states[i] = 0;
1333        }
1334        libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started;
1335
1336        // Revert back - Allow signals again
1337        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1338        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1339
1340        if (threads_started < 0)
1341                // Error
1342                return threads_started;
1343
1344        // TODO fix these leaks etc
1345        if (libtrace->perpkt_thread_count != threads_started)
1346                fprintf(stderr, "Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1347
1348
1349        return 0;
1350}
1351
1352/**
1353 * Pauses a trace, this should only be called by the main thread
1354 * 1. Set started = false
1355 * 2. All perpkt threads are paused waiting on a condition var
1356 * 3. Then call ppause on the underlying format if found
1357 * 4. The traces state is paused
1358 *
1359 * Once done you should be able to modify the trace setup and call pstart again
1360 * TODO handle changing thread numbers
1361 */
1362DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1363{
1364        libtrace_thread_t *t;
1365        int i;
1366        assert(libtrace);
1367       
1368        t = get_thread_table(libtrace);
1369        // Check state from within the lock if we are going to change it
1370        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1371        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1372                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1373                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1374                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1375                return -1;
1376        }
1377
1378        libtrace_change_state(libtrace, STATE_PAUSING, false);
1379        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1380
1381        // Special case handle the hasher thread case
1382        if (trace_has_dedicated_hasher(libtrace)) {
1383                fprintf(stderr, "Hasher thread running we deal with this special!\n");
1384                libtrace_message_t message = {0};
1385                message.code = MESSAGE_DO_PAUSE;
1386                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1387                // Wait for it to pause
1388                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1389                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1390                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1391                }
1392                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1393        }
1394
1395        fprintf(stderr, "Sending messages \n");
1396        // Stop threads, skip this one if it's a perpkt
1397        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1398                if (&libtrace->perpkt_threads[i] != t) {
1399                        libtrace_message_t message = {0};
1400                        message.code = MESSAGE_DO_PAUSE;
1401                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1402                        if(trace_has_dedicated_hasher(libtrace)) {
1403                                // The hasher has stopped and other threads have messages waiting therefore
1404                                // If the queues are empty the other threads would have no data
1405                                // So send some NULL packets to simply ask the threads to check there message queues
1406                                // We are the only writer since hasher has paused
1407                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
1408                        }
1409                } else {
1410                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1411                }
1412        }
1413
1414        // Formats must support native message handling if a message is ready
1415        // Approach per Perry's suggestion is a non-blocking read
1416        // followed by a blocking read. XXX STRIP THIS OUT
1417
1418        if (t) {
1419                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1420                // We rely on the user to *not* return before starting the trace again
1421                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1422        }
1423
1424        fprintf(stderr, "Asking threads to pause\n");
1425
1426        // Wait for all threads to pause
1427        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1428        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1429                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1430        }
1431        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1432
1433        fprintf(stderr, "Threads have paused\n");
1434
1435        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1436                libtrace->started = false;
1437                if (libtrace->format->ppause_input)
1438                        libtrace->format->ppause_input(libtrace);
1439                // TODO What happens if we don't have pause input??
1440        } else {
1441                int err;
1442                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1443                err = trace_pause(libtrace);
1444                // We should handle this a bit better
1445                if (err)
1446                        return err;
1447        }
1448
1449        // Only set as paused after the pause has been called on the trace
1450        libtrace_change_state(libtrace, STATE_PAUSED, true);
1451        return 0;
1452}
1453
1454/**
1455 * Stop trace finish prematurely as though it meet an EOF
1456 * This should only be called by the main thread
1457 * 1. Calls ppause
1458 * 2. Sends a message asking for threads to finish
1459 * 3. Releases threads which will pause
1460 */
1461DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1462{
1463        int i, err;
1464        libtrace_message_t message = {0};
1465        assert(libtrace);
1466
1467        // Ensure all threads have paused and the underlying trace format has
1468        // been closed and all packets associated are cleaned up
1469        // Pause will do any state checks for us
1470        err = trace_ppause(libtrace);
1471        if (err)
1472                return err;
1473
1474        // Now send a message asking the threads to stop
1475        // This will be retrieved before trying to read another packet
1476       
1477        message.code = MESSAGE_DO_STOP;
1478        trace_send_message_to_perpkts(libtrace, &message);
1479        if (trace_has_dedicated_hasher(libtrace))
1480                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1481       
1482        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1483                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1484        }
1485
1486        // Now release the threads and let them stop
1487        libtrace_change_state(libtrace, STATE_FINSHED, true);
1488        return 0;
1489}
1490
1491/**
1492 * Set the hasher type along with a selected function, if hardware supports
1493 * that generic type of hashing it will be used otherwise the supplied
1494 * hasher function will be used and passed data when called.
1495 *
1496 * @return 0 if successful otherwise -1 on error
1497 */
1498DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1499        int ret = -1;
1500        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1501                return -1;
1502        }
1503
1504        // Save the requirements
1505        trace->hasher_type = type;
1506        if (hasher) {
1507                trace->hasher = hasher;
1508                trace->hasher_data = data;
1509        } else {
1510                trace->hasher = NULL;
1511                // TODO consider how to handle freeing this
1512                trace->hasher_data = NULL;
1513        }
1514
1515        // Try push this to hardware - NOTE hardware could do custom if
1516        // there is a more efficient way to apply it, in this case
1517        // it will simply grab the function out of libtrace_t
1518        if (trace->format->pconfig_input)
1519                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1520
1521        if (ret == -1) {
1522                // We have to deal with this ourself
1523                // This most likely means single threaded reading of the trace
1524                if (!hasher) {
1525                        switch (type)
1526                        {
1527                                case HASHER_CUSTOM:
1528                                case HASHER_BALANCE:
1529                                        return 0;
1530                                case HASHER_BIDIRECTIONAL:
1531                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1532                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1533                                        toeplitz_init_config(trace->hasher_data, 1);
1534                                        return 0;
1535                                case HASHER_UNIDIRECTIONAL:
1536                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1537                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1538                                        toeplitz_init_config(trace->hasher_data, 0);
1539                                        return 0;
1540                                case HASHER_HARDWARE:
1541                                        return -1;
1542                        }
1543                        return -1;
1544                }
1545        } else {
1546                // The hardware is dealing with this yay
1547                trace->hasher_type = HASHER_HARDWARE;
1548        }
1549
1550        return 0;
1551}
1552
1553// Waits for all threads to finish
1554DLLEXPORT void trace_join(libtrace_t *libtrace) {
1555        int i;
1556
1557        /* Firstly wait for the perpkt threads to finish, since these are
1558         * user controlled */
1559        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1560                //printf("Waiting to join with perpkt #%d\n", i);
1561                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
1562                //printf("Joined with perpkt #%d\n", i);
1563                // So we must do our best effort to empty the queue - so
1564                // the producer (or any other threads) don't block.
1565                libtrace_packet_t * packet;
1566                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
1567                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1568                        if (packet) // This could be NULL iff the perpkt finishes early
1569                                trace_destroy_packet(packet);
1570        }
1571
1572        /* Now the hasher */
1573        // XXX signal it to stop if it hasn't already we should never be in this situation!!
1574        if (trace_has_dedicated_hasher(libtrace)) {
1575                fprintf(stderr, "Waiting to join with the hasher\n");
1576                pthread_join(libtrace->hasher_thread.tid, NULL);
1577                fprintf(stderr, "Joined with the hasher\n");
1578                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
1579        }
1580
1581        // Now that everything is finished nothing can be touching our
1582        // buffers so clean them up
1583        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1584                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
1585                // if they lost timeslice before-during a write
1586                libtrace_packet_t * packet;
1587                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1588                        trace_destroy_packet(packet);
1589                if (libtrace->hasher) {
1590                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1591                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1592                }
1593                // Cannot destroy vector yet, this happens with trace_destroy
1594        }
1595        // TODO consider perpkt threads marking trace as finished before join is called
1596        libtrace_change_state(libtrace, STATE_FINSHED, true);
1597       
1598        // Wait for the tick (keepalive) thread if it has been started
1599        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1600                libtrace_message_t msg = {0};
1601                msg.code = MESSAGE_DO_STOP;
1602                fprintf(stderr, "Waiting to join with the keepalive\n");
1603                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1604                pthread_join(libtrace->keepalive_thread.tid, NULL);
1605                fprintf(stderr, "Joined with with the keepalive\n");
1606        }
1607       
1608        libtrace_change_state(libtrace, STATE_JOINED, true);
1609}
1610
1611DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1612{
1613        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1614        assert(t);
1615        return libtrace_message_queue_count(&t->messages);
1616}
1617
1618DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1619{
1620        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1621        assert(t);
1622        return libtrace_message_queue_get(&t->messages, message);
1623}
1624
1625DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1626{
1627        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1628        assert(t);
1629        return libtrace_message_queue_try_get(&t->messages, message);
1630}
1631
1632/**
1633 * Return backlog indicator
1634 */
1635DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
1636{
1637        libtrace_message_t message = {0};
1638        message.code = MESSAGE_POST_REDUCE;
1639        message.sender = get_thread_descriptor(libtrace);
1640        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
1641}
1642
1643/**
1644 * Return backlog indicator
1645 */
1646DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
1647{
1648        //printf("Sending message code=%d to reducer\n", message->code);
1649        message->sender = get_thread_descriptor(libtrace);
1650        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
1651}
1652
1653/**
1654 *
1655 */
1656DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1657{
1658        //printf("Sending message code=%d to reducer\n", message->code);
1659        message->sender = get_thread_descriptor(libtrace);
1660        return libtrace_message_queue_put(&t->messages, message);
1661}
1662
1663DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1664{
1665        int i;
1666        message->sender = get_thread_descriptor(libtrace);
1667        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1668                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1669        }
1670        //printf("Sending message code=%d to reducer\n", message->code);
1671        return 0;
1672}
1673
1674DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1675        result->key = key;
1676}
1677DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1678        return result->key;
1679}
1680DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
1681        result->value = value;
1682}
1683DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
1684        return result->value;
1685}
1686DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
1687        result->key = key;
1688        result->value = value;
1689}
1690DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
1691        free(*result);
1692        result = NULL;
1693        // TODO automatically back with a free list!!
1694}
1695
1696DLLEXPORT void * trace_get_global(libtrace_t *trace)
1697{
1698        return trace->global_blob;
1699}
1700
1701DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
1702{
1703        if (trace->global_blob && trace->global_blob != data) {
1704                void * ret = trace->global_blob;
1705                trace->global_blob = data;
1706                return ret;
1707        } else {
1708                trace->global_blob = data;
1709                return NULL;
1710        }
1711}
1712
1713DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
1714{
1715        return t->user_data;
1716}
1717
1718DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
1719{
1720        if(t->user_data && t->user_data != data) {
1721                void *ret = t->user_data;
1722                t->user_data = data;
1723                return ret;
1724        } else {
1725                t->user_data = data;
1726                return NULL;
1727        }
1728}
1729
1730/**
1731 * Publish to the reduce queue, return
1732 */
1733DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
1734        libtrace_result_t res;
1735        res.is_packet = 0;
1736        // Who am I???
1737        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1738        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1739        // Now put it into my table
1740        UNUSED static __thread int count = 0;
1741
1742
1743        libtrace_result_set_key_value(&res, key, value);
1744        /*
1745        if (count == 1)
1746                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1747        count = (count+1) %1000;
1748        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1749        */
1750        /*if (count == 1)
1751                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1752        count = (count+1)%1000;*/
1753        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1754                if (libtrace_deque_get_size(&t->deque) >= 800) {
1755                        trace_post_reduce(libtrace);
1756                }
1757                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1758                //      sched_yield();
1759                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1760        } else {
1761                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1762                //      sched_yield();
1763
1764                if (libtrace_vector_get_size(&t->vector) >= 800) {
1765                        trace_post_reduce(libtrace);
1766                }
1767                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1768        }
1769}
1770
1771DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1772        libtrace_result_t res;
1773        // Who am I???
1774        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1775        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1776        // Now put it into my table
1777        UNUSED static __thread int count = 0;
1778
1779        res.is_packet = 1;
1780        libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
1781        /*
1782        if (count == 1)
1783                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1784        count = (count+1) %1000;
1785        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1786        */
1787        /*if (count == 1)
1788                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1789        count = (count+1)%1000;*/
1790        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1791                if (libtrace_deque_get_size(&t->deque) >= 800) {
1792                        trace_post_reduce(libtrace);
1793                }
1794                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1795                //      sched_yield();
1796                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1797        } else {
1798                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1799                //      sched_yield();
1800
1801                if (libtrace_vector_get_size(&t->vector) >= 800) {
1802                        trace_post_reduce(libtrace);
1803                }
1804                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1805        }
1806}
1807
1808
1809static int compareres(const void* p1, const void* p2)
1810{
1811        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
1812                return -1;
1813        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
1814                return 0;
1815        else
1816                return 1;
1817}
1818
1819DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
1820        int i;
1821        int flags = libtrace->reducer_flags; // Hint these aren't a changing
1822
1823        libtrace_vector_empty(results);
1824
1825        /* Here we assume queues are in order ascending order and they want
1826         * the smallest result first. If they are not in order the results
1827         * may not be in order.
1828         */
1829        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1830                int live_count = 0;
1831                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
1832                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
1833                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
1834                int min_queue = -1;
1835
1836                /* Loop through check all are alive (have data) and find the smallest */
1837                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1838                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
1839                        if (libtrace_deque_get_size(v) != 0) {
1840                                libtrace_result_t r;
1841                                libtrace_deque_peek_front(v, (void *) &r);
1842                                live_count++;
1843                                live[i] = 1;
1844                                key[i] = libtrace_result_get_key(&r);
1845                                if (i==0 || min_key > key[i]) {
1846                                        min_key = key[i];
1847                                        min_queue = i;
1848                                }
1849                        } else {
1850                                live[i] = 0;
1851                        }
1852                }
1853
1854                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
1855                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
1856                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
1857                                libtrace->state == STATE_JOINED))) {
1858                        /* Get the minimum queue and then do stuff */
1859                        libtrace_result_t r;
1860
1861                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
1862                        libtrace_vector_push_back(results, &r);
1863
1864                        // We expect the key we read +1 now
1865                        libtrace->expected_key = key[min_queue] + 1;
1866
1867                        // Now update the one we just removed
1868                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
1869                        {
1870                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
1871                                key[min_queue] = libtrace_result_get_key(&r);
1872                                if (key[min_queue] <= min_key) {
1873                                        // We are still the smallest, might be out of order though :(
1874                                        min_key = key[min_queue];
1875                                } else {
1876                                        min_key = key[min_queue]; // Update our minimum
1877                                        // Check all find the smallest again - all are alive
1878                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1879                                                if (live[i] && min_key > key[i]) {
1880                                                        min_key = key[i];
1881                                                        min_queue = i;
1882                                                }
1883                                        }
1884                                }
1885                        } else {
1886                                live[min_queue] = 0;
1887                                live_count--;
1888                                min_key = UINT64_MAX; // Update our minimum
1889                                // Check all find the smallest again - all are alive
1890                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1891                                        // Still not 100% TODO (what if order is wrong or not increasing)
1892                                        if (live[i] && min_key >= key[i]) {
1893                                                min_key = key[i];
1894                                                min_queue = i;
1895                                        }
1896                                }
1897                        }
1898                }
1899        } else { // Queues are not in order - return all results in the queue
1900                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1901                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
1902                }
1903                if (flags & REDUCE_SORT) {
1904                        qsort(results->elements, results->size, results->element_size, &compareres);
1905                }
1906        }
1907        return libtrace_vector_get_size(results);
1908}
1909
1910DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
1911        return packet->order;
1912}
1913
1914DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
1915        return packet->hash;
1916}
1917
1918DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
1919        packet->order = order;
1920}
1921
1922DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
1923        packet->hash = hash;
1924}
1925
1926DLLEXPORT int trace_finished(libtrace_t * libtrace) {
1927        // TODO I don't like using this so much, we could use state!!!
1928        return !(libtrace->perpkt_thread_states[THREAD_RUNNING] || libtrace->perpkt_thread_states[THREAD_FINISHING]);
1929}
1930
1931DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
1932{
1933        UNUSED int ret = -1;
1934        switch (option) {
1935                case TRACE_OPTION_TICK_INTERVAL:
1936                        libtrace->tick_interval = *((int *) value);
1937                        return 1;
1938                case TRACE_OPTION_SET_HASHER:
1939                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
1940                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
1941                        libtrace->perpkt_buffer_size = *((int *) value);
1942                        return 1;
1943                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
1944                        libtrace->packet_freelist_size = *((int *) value);
1945                        return 1;
1946                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1947                        libtrace->perpkt_thread_count = *((int *) value);
1948                        return 1;
1949                case TRACE_DROP_OUT_OF_ORDER:
1950                        if (*((int *) value))
1951                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
1952                        else
1953                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
1954                        return 1;
1955                case TRACE_OPTION_SEQUENTIAL:
1956                        if (*((int *) value))
1957                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
1958                        else
1959                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
1960                        return 1;
1961                case TRACE_OPTION_ORDERED:
1962                        if (*((int *) value))
1963                                libtrace->reducer_flags |= REDUCE_ORDERED;
1964                        else
1965                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
1966                        return 1;
1967                case TRACE_OPTION_TRACETIME:
1968                        if(*((int *) value))
1969                                libtrace->tracetime = 1;
1970                        else
1971                                libtrace->tracetime = 0;
1972                        return 0;
1973        }
1974        return 0;
1975}
1976
1977DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1978        libtrace_packet_t* result;
1979        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1980                result = trace_create_packet();
1981        assert(result);
1982        swap_packets(result, packet); // Move the current packet into our copy
1983        return result;
1984}
1985
1986DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1987        // Try write back the packet
1988        assert(packet);
1989        // Always release any resources this might be holding such as a slot in a ringbuffer
1990        trace_fin_packet(packet);
1991        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1992                /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
1993                //assert(1 == 90);
1994                trace_destroy_packet(packet);
1995        }
1996}
1997
1998DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
1999        if (libtrace->format)
2000                return &libtrace->format->info;
2001        else
2002                return NULL;
2003}
Note: See TracBrowser for help on using the repository browser.