source: lib/trace_parallel.c @ 3296252

Last change on this file since 3296252 was 3296252, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Fixes pausing the trace in the edge case that some threads have already ended.
Tidy up the state system, for both threads and the overall state.
General tidies to the code.

1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101
102
103#define VERBOSE_DEBUGGING 0
104
105extern int libtrace_parallel;
106
107struct multithreading_stats {
108        uint64_t full_queue_hits;
109        uint64_t wait_for_fill_complete_hits;
110} contention_stats[1024];
111
112/**
113 * Returns true if the trace has a dedicated hasher thread, otherwise false.
114 * Only valid once the trace is running.
115 */
116static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
117{
118        assert(libtrace->state != STATE_NEW);
119        return libtrace->hasher_thread.type == THREAD_HASHER;
120}
121
122/**
123 * Changes a thread's state and broadcasts the condition variable. This
124 * should always be done when the lock is held.
125 *
126 * Additionally for perpkt threads the state counts are updated.
127 *
128 * @param trace A pointer to the trace
129 * @param t A pointer to the thread to modify
130 * @param new_state The new state of the thread
131 * @param need_lock Set to true if libtrace_lock is not held, otherwise
132 *        false in the case the lock is currently held by this thread.
133 */
134static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
135        const enum thread_states new_state, const bool need_lock)
136{
137        enum thread_states prev_state;
138        if (need_lock)
139                pthread_mutex_lock(&trace->libtrace_lock);
140        prev_state = t->state;
141        t->state = new_state;
142        if (t->type == THREAD_PERPKT) {
143                --trace->perpkt_thread_states[prev_state];
144                ++trace->perpkt_thread_states[new_state];
145        }
146
147#if VERBOSE_DEBUGGING
148        fprintf(stderr, "Thread %d State changed from %d to %d\n", t->tid,
149                prev_state, t->state);
150#endif
151        if (need_lock)
152                pthread_mutex_unlock(&trace->libtrace_lock);
153        pthread_cond_broadcast(&trace->perpkt_cond);
154}
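
/* Editor's sketch (not part of the original file): the need_lock contract in
 * practice. A caller already holding libtrace_lock passes false; any other
 * caller passes true and lets thread_change_state() take the lock itself.
 */
static void example_pause_point(libtrace_t *trace, libtrace_thread_t *t)
{
        pthread_mutex_lock(&trace->libtrace_lock);
        /* We hold libtrace_lock, so need_lock must be false */
        thread_change_state(trace, t, THREAD_PAUSED, false);
        pthread_mutex_unlock(&trace->libtrace_lock);

        /* The lock is not held here, so need_lock must be true */
        thread_change_state(trace, t, THREAD_RUNNING, true);
}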
155
156/**
157 * Changes the overall traces state and signals the condition.
158 *
159 * @param trace A pointer to the trace
160 * @param new_state The new state of the trace
161 * @param need_lock Set to true if libtrace_lock is not held, otherwise
162 *        false in the case the lock is currently held by this thread.
163 */
164static inline void libtrace_change_state(libtrace_t *trace, 
165        const enum trace_state new_state, const bool need_lock)
166{
167        enum trace_state prev_state;
168        if (need_lock)
169                pthread_mutex_lock(&trace->libtrace_lock);
170        prev_state = trace->state;
171        trace->state = new_state;
172#if VERBOSE_DEBUGGING
173        fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
174                trace->uridata, get_trace_state_name(prev_state),
175                get_trace_state_name(trace->state));
176#endif
177        if (need_lock)
178                pthread_mutex_unlock(&trace->libtrace_lock);
179        pthread_cond_broadcast(&trace->perpkt_cond);
180}
181
182/**
183 * @return True if the format supports parallel threads.
184 */
185static inline bool trace_supports_parallel(libtrace_t *trace)
186{
187        assert(trace);
188        assert(trace->format);
189        if (trace->format->pstart_input)
190                return true;
191        else
192                return false;
193}
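
/* Editor's sketch: a format advertises parallel support simply by providing a
 * pstart_input callback in its format module; trace_supports_parallel() just
 * tests that pointer. The names below are hypothetical:
 *
 *   static struct libtrace_format_t example_format = {
 *           ...
 *           example_start_input,   // start_input: single-threaded start
 *           example_pstart_input,  // pstart_input: non-NULL => parallel capable
 *           ...
 *   };
 */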
194
195DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
196        int i;
197        struct multithreading_stats totals = {0};
198        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
199                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
200                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
201                totals.full_queue_hits += contention_stats[i].full_queue_hits;
202                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
203                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
204        }
205        fprintf(stderr, "\nTotals for perpkt threads\n");
206        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
207        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
208
209        return;
210}
211
212void libtrace_zero_thread(libtrace_thread_t * t) {
213        t->trace = NULL;
214        t->ret = NULL;
215        t->type = THREAD_EMPTY;
216        libtrace_zero_ringbuffer(&t->rbuffer);
217        libtrace_zero_vector(&t->vector);
218        libtrace_zero_deque(&t->deque);
219        t->recorded_first = false;
220        t->perpkt_num = -1;
221}
222
223// Ints are aligned and atomic, so they are safe to read and write at the same time.
224// However writes must be locked; reads need not be (we never read an entry before it is written to the table)
225libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
226        int i = 0;
227        pthread_t tid = pthread_self();
228
229        for (;i<libtrace->perpkt_thread_count ;++i) {
230                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
231                        return &libtrace->perpkt_threads[i];
232        }
233        return NULL;
234}
235
236int get_thread_table_num(libtrace_t *libtrace) {
237        int i = 0;
238        pthread_t tid = pthread_self();
239        for (;i<libtrace->perpkt_thread_count; ++i) {
240                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
241                        return i;
242        }
243        return -1;
244}
245
246static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
247        libtrace_thread_t *ret;
248        if (!(ret = get_thread_table(libtrace))) {
249                pthread_t tid = pthread_self();
250                // Check if we are reducer or something else
251                if (pthread_equal(tid, libtrace->reducer_thread.tid))
252                        ret = &libtrace->reducer_thread;
253                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
254                        ret = &libtrace->hasher_thread;
255                else
256                        ret = NULL;
257        }
258        return ret;
259}
260
261/** Used below in trace_make_results_packets_safe */
262static void do_copy_result_packet(void *data)
263{
264        libtrace_result_t *res = (libtrace_result_t *)data;
265        if (res->is_packet) {
266                // Duplicate the packet in standard malloc'd memory and free the
267                // original
268                libtrace_packet_t *oldpkt, *dup;
269                oldpkt = (libtrace_packet_t *) res->value;
270                dup = trace_copy_packet(oldpkt);
271                res->value = (void *)dup;
272                trace_destroy_packet(oldpkt);
273                fprintf(stderr, "Made a packet safe!!\n");
274        }
275}
276
277/**
278 * Make a safe replacement copy of any result packets that are owned
279 * by the format in the result queue. Used when pausing traces.
280 */ 
281static void trace_make_results_packets_safe(libtrace_t *trace) {
282        libtrace_thread_t *t = get_thread_descriptor(trace);
283        if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
284                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
285        else 
286                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
287}
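
/* Editor's sketch of the same idea for a single result: a queued result may
 * still point into a format-owned buffer (e.g. a capture ring) that becomes
 * invalid once the format is paused, so it is swapped for a malloc'd copy.
 */
static void example_make_result_safe(libtrace_result_t *res)
{
        if (res->is_packet) {
                libtrace_packet_t *old = (libtrace_packet_t *) res->value;
                res->value = (void *) trace_copy_packet(old); /* heap copy */
                trace_destroy_packet(old); /* release the format-owned original */
        }
}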
288
289/**
290 * Holds threads in a paused state, until released by broadcasting
291 * the condition variable.
292 */
293static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
294        trace_make_results_packets_safe(trace);
295        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
296        thread_change_state(trace, t, THREAD_PAUSED, false);
297        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
298                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
299        }
300        thread_change_state(trace, t, THREAD_RUNNING, false);
301        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
302}
303
304/**
305 * This is the entry point for our packet processing threads.
306 */
307static void* perpkt_threads_entry(void *data) {
308        libtrace_t *trace = (libtrace_t *)data;
309        libtrace_thread_t * t;
310        libtrace_message_t message = {0};
311        libtrace_packet_t *packet = NULL;
312
313
314        // Force this thread to wait until trace_pstart has been completed
315        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
316        t = get_thread_table(trace);
317        assert(t);
318        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
319        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
320
321        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
322        // Send a message to say we've started
323
324        message.code = MESSAGE_STARTED;
325        message.sender = t;
326
327        // Let the per_packet function know we have started
328        (*trace->per_pkt)(trace, NULL, &message, t);
329
330
331        for (;;) {
332                int psize;
333
334                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
335                        switch (message.code) {
336                                case MESSAGE_DO_PAUSE: // This is internal
337                                        // Send message to say we are pausing, TODO consider sender
338                                        message.code = MESSAGE_PAUSING;
339                                        (*trace->per_pkt)(trace, NULL, &message, t);
340                                        // If a hasher thread is running, empty the input queues so we don't lose data
341                                        if (trace_has_dedicated_hasher(trace)) {
342                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
343                                                // The hasher has stopped by this point, so the queue shouldn't be filling
344                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
345                                                        psize = trace_pread_packet(trace, &packet);
346                                                        if (psize > 0) {
347                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
348                                                        } else {
349                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
350                                                        }
351                                                }
352                                        }
353                                        // Send a paused message as a final chance to memory copy any packets
354                                        message.code = MESSAGE_PAUSED;
355                                        (*trace->per_pkt)(trace, NULL, &message, t);
356                                        // Now we do the actual pause, this returns when we are done
357                                        trace_thread_pause(trace, t);
358                                        // Check for new messages as soon as we return
359                                        continue;
360                                case MESSAGE_DO_STOP: // This is internal
361                                        goto stop;
362                        }
363                        (*trace->per_pkt)(trace, NULL, &message, t);
364                        continue;
365                }
366
367                if (trace->perpkt_thread_count == 1) {
368                        if (!packet) {
369                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
370                                        packet = trace_create_packet();
371                        }
372                        assert(packet);
373                        if ((psize = trace_read_packet(trace, packet)) <1) {
374                                break;
375                        }
376                } else {
377                        psize = trace_pread_packet(trace, &packet);
378                }
379
380                if (psize > 0) {
381                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
382                        continue;
383                }
384
385                if (psize == -2)
386                        continue; // We have a message
387
388                if (psize < 1) { // consider sending a message
389                        break;
390                }
391
392        }
393
394
395stop:
396        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
397        // Let the per_packet function know we have stopped
398        message.code = MESSAGE_STOPPED;
399        message.sender = NULL;
400        message.additional.uint64 = 0;
401        (*trace->per_pkt)(trace, NULL, &message, t);
402
403        // Free our last packet
404        if (packet)
405                trace_destroy_packet(packet);
406
407       
408        thread_change_state(trace, t, THREAD_FINISHED, true);
409
410        // Notify only after we've definitely set the state to finished
411        message.code = MESSAGE_PERPKT_ENDED;
412        message.additional.uint64 = 0;
413        trace_send_message_to_reducer(trace, &message);
414
415        pthread_exit(NULL);
416}
417
418/**
419 * The entry point for our single-threaded hasher thread; this will read
420 * and hash a packet from a data source and queue it against the correct
421 * core to process it.
422 */
423static void* hasher_start(void *data) {
424        libtrace_t *trace = (libtrace_t *)data;
425        libtrace_thread_t * t;
426        int i;
427        libtrace_packet_t * packet;
428        libtrace_message_t message = {0};
429
430        assert(trace_has_dedicated_hasher(trace));
431        /* Wait until all threads are started and objects are initialised (ring buffers) */
432        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
433        t = &trace->hasher_thread;
434        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
435        printf("Hasher Thread started\n");
436        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
437        int pkt_skipped = 0;
438        /* Read all packets in then hash and queue against the correct thread */
439        while (1) {
440                int thread;
441                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
442                        packet = trace_create_packet();
443                assert(packet);
444
445                if (libtrace_halt) // Signal to die has been sent - TODO
446                        break;
447
448                // Check for messages; we expect MESSAGE_DO_PAUSE (internal messages only)
449                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
450                        switch(message.code) {
451                                case MESSAGE_DO_PAUSE:
452                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
453                                        thread_change_state(trace, t, THREAD_PAUSED, false);
454                                        pthread_cond_broadcast(&trace->perpkt_cond);
455                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
456                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
457                                        }
458                                        thread_change_state(trace, t, THREAD_RUNNING, false);
459                                        pthread_cond_broadcast(&trace->perpkt_cond);
460                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
461                                        break;
462                                case MESSAGE_DO_STOP:
463                                        // Stop called after pause
464                                        assert(trace->started == false);
465                                        assert(trace->state == STATE_FINSHED); break; // Don't fall through to the warning below
466                                default:
467                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
468                        }
469                        pkt_skipped = 1;
470                        continue;
471                }
472
473                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
474                        break; /* We are EOF or error'd either way we stop  */
475                }
476
477                /* We are guaranteed to have a hash function i.e. != NULL */
478                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
479                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
480                /* Blocking write to the correct queue - I'm the only writer */
481                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
482                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
483                        pkt_skipped = 0;
484                } else {
485                        pkt_skipped = 1; // Reuse that packet no one read it
486                }
487        }
488
489        /* Broadcast our last failed read to all threads */
490        for (i = 0; i < trace->perpkt_thread_count; i++) {
491                libtrace_packet_t * bcast;
492                printf("Broadcasting error/EOF now the trace is over\n");
493                if (i == trace->perpkt_thread_count - 1) {
494                        bcast = packet;
495                } else {
496                        bcast = trace_create_packet();
497                        bcast->error = packet->error;
498                }
499                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
500                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
501                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
502                        // Unlock early otherwise we could deadlock
503                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
504                } else {
505                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
506                }
507        }
508
509        // We don't need to free the packet
510        thread_change_state(trace, t, THREAD_FINISHED, true);
511
512        // Notify only after we've definitely set the state to finished
513        message.code = MESSAGE_PERPKT_ENDED;
514        message.additional.uint64 = 0;
515        trace_send_message_to_reducer(trace, &message);
516
517        // TODO remove from TTABLE t sometime
518        pthread_exit(NULL);
519}
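
/* Editor's sketch of a user-supplied hasher matching the call made above,
 * (*trace->hasher)(packet, trace->hasher_data). The exact hasher typedef and
 * how a hasher is registered live outside this file, so treat the signature
 * as an assumption. The caller maps the returned hash onto a thread with
 * % perpkt_thread_count, so any well-spread 64-bit value works.
 */
static uint64_t example_round_robin_hasher(libtrace_packet_t *packet, void *data)
{
        uint64_t *counter = (uint64_t *) data; /* caller-provided state */
        (void) packet; /* a real hasher would hash the flow 5-tuple instead */
        return (*counter)++;
}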
520
521/**
522 * Moves src into dest (complete copy) and moves the memory buffer and
523 * its flags from dest into src, ready for reuse without needing extra mallocs.
524 */
525static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
526        // Save the passed in buffer status
527        assert(dest->trace == NULL); // Must be an empty packet
528        void * temp_buf = dest->buffer;
529        buf_control_t temp_buf_control = dest->buf_control;
530        // Completely copy src into dest
531        memcpy(dest, src, sizeof(libtrace_packet_t));
532        // Set the buffer settings on the returned packet
533        src->buffer = temp_buf;
534        src->buf_control = temp_buf_control;
535        src->trace = NULL;
536}
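
/* Editor's sketch of how swap_packets() keeps the hot path malloc-free: the
 * receiver hands over an empty shell, takes ownership of the full packet,
 * and the sender is left holding the empty buffer for reuse.
 */
static void example_swap_usage(libtrace_packet_t *full)
{
        libtrace_packet_t *shell = trace_create_packet(); /* trace == NULL */
        swap_packets(shell, full);
        /* shell now holds the captured data; full is an empty shell again */
        trace_destroy_packet(shell);
}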
537
538/* Our simplest case: when a thread becomes ready, it can obtain an exclusive
539 * lock to read a packet from the underlying trace.
540 */
541inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
542{
543        // We need this to fill the 'first' packet table
544        libtrace_thread_t *t = get_thread_table(libtrace);
545        if (!*packet) {
546                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
547                        *packet = trace_create_packet();
548        }
549        assert(*packet);
550        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
551        /* Read a packet */
552        (*packet)->error = trace_read_packet(libtrace, *packet);
553        // Doing this inside the lock ensures the first packet is always
554        // recorded first
555        if ((*packet)->error > 0)
556                store_first_packet(libtrace, *packet, t);
557
558        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
559        return (*packet)->error;
560}
561
562/**
563 * For the case that we have a dedicated hasher thread
564 * 1. We read a packet from our buffer
565 * 2. Move that into the packet provided (packet)
566 */
567inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
568{
569        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
570        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
571
572        if (*packet) // Recycle the old, get the new
573                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
574                        trace_destroy_packet(*packet);
575        *packet = libtrace_ringbuffer_read(&t->rbuffer);
576
577        if (*packet) {
578                return (*packet)->error;
579        } else {
580                // This is how we do a notify: a message is sent beforehand to note that the trace is over (etc.),
581                // and the NULL packet tells the perpkt thread to read that message. This is easiest,
582                // since cases like pause can also be dealt with this way without actually
583                // being the end of the stream.
584                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
585                return -2;
586        }
587}
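
/* Editor's note, as a sketch of the caller side: -2 is not an error, it means
 * "a message is waiting on our queue", so the read loop in
 * perpkt_threads_entry() goes back to the message check rather than treating
 * it as EOF:
 *
 *   psize = trace_pread_packet(trace, &packet);
 *   if (psize == -2)
 *           continue;  // drain t->messages first
 *   if (psize < 1)
 *           break;     // genuine EOF or error
 */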
588
589/**
590 * Tries to read from our queue and returns 1 if a packet was retrieved
591 */
592static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
593{
594        libtrace_packet_t* retrieved_packet;
595
596        /* Let's see if we have one waiting */
597        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrieved_packet)) {
598                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
599                assert(retrieved_packet);
600
601                if (*packet) // Recycle the old, get the new
602                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
603                                trace_destroy_packet(*packet);
604                *packet = retrieved_packet;
605                *ret = (*packet)->error;
606                return 1;
607        }
608        return 0;
609}
610
611/**
612 * Allows us to ensure all threads have finished writing to our thread's ring_buffer
613 * before returning EOF/error.
614 */
615inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
616{
617        /* We are waiting for the condition that another thread ends to check
618         * our queue for new data, once all threads end we can go to finished */
619        bool complete = false;
620        int ret;
621
622        do {
623                // Wait for a thread to end
624                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
625
626                // Check before
627                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
628                        complete = true;
629                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
630                        continue;
631                }
632
633                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
634
635                // Check after
636                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
637                        complete = true;
638                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
639                        continue;
640                }
641
642                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
643
644                // Always try to keep our buffer empty, for the unlikely case that more threads than there is buffer space want to write into our queue
645                if(try_waiting_queue(libtrace, t, packet, &ret))
646                        return ret;
647        } while (!complete);
648
649        // We can only end up here once all threads complete
650        try_waiting_queue(libtrace, t, packet, &ret);
651
652        return ret;
653        // TODO rethink this logic fix bug here
654}
655
656/**
657 * Expects the libtrace_lock to not be held
658 */
659inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
660{
661        thread_change_state(libtrace, t, THREAD_FINISHING, true);
662        return trace_handle_finishing_perpkt(libtrace, packet, t);
663}
664
665/**
666 * This case is much like the dedicated hasher, except that we will become
667 * hasher if we don't have a packet waiting.
668 *
669 * Note: This is only ever used if we are doing hashing.
670 *
671 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
672 * queue sizes in total are larger than the ring size.
673 *
674 * 1. We read a packet from our buffer
675 * 2. Move that into the packet provided (packet)
676 */
677inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
678{
679        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
680        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
681        int thread, ret/*, psize*/;
682
683        while (1) {
684                if(try_waiting_queue(libtrace, t, packet, &ret))
685                        return ret;
686                // Can still block here if another thread is writing to a full queue
687                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
688
689        // It's impossible for our own queue to overfill, because no one can write
690        // to it while we hold the lock
691                if(try_waiting_queue(libtrace, t, packet, &ret)) {
692                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
693                        return ret;
694                }
695
696                // Another thread cannot write a packet because a queue has filled up. Is it ours?
697                if (libtrace->perpkt_queue_full) {
698                        contention_stats[this_thread].wait_for_fill_complete_hits++;
699                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
700                        continue;
701                }
702
703                if (!*packet) {
704                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
705                                *packet = trace_create_packet();
706                }
707                assert(*packet);
708
709                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
710                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
711                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
712                        if (libtrace_halt)
713                                return 0;
714                        else
715                                return (*packet)->error;
716                }
717
718                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
719                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
720                if (thread == this_thread) {
721                        // If it's this thread we must be in order because we checked the buffer once we got the lock
722                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
723                        return (*packet)->error;
724                }
725
726                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
727                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
728                                libtrace->perpkt_queue_full = true;
729                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
730                                contention_stats[this_thread].full_queue_hits++;
731                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
732                        }
733                        *packet = NULL;
734                        libtrace->perpkt_queue_full = false;
735                } else {
736                        /* We can get here if the user closes the thread before natural completion/or error */
737                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
738                }
739                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
740        }
741}
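
/* Editor's sketch of the dispatch arithmetic used above: the packet's hash is
 * reduced to a thread index with a simple modulo, so every thread computes
 * the same mapping from hash to perpkt thread.
 */
static int example_thread_for_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
{
        trace_packet_set_hash(packet, (*libtrace->hasher)(packet, libtrace->hasher_data));
        return trace_packet_get_hash(packet) % libtrace->perpkt_thread_count;
}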
742
743/**
744 * This case is much like the dedicated hasher, except that we will become
745 * hasher if we don't have a packet waiting.
746 *
747 * TODO: You can lose the tail of a trace if the final thread
748 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
749 *
750 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
751 * queue sizes in total are larger than the ring size.
752 *
753 * 1. We read a packet from our buffer
754 * 2. Move that into the packet provided (packet)
755 */
756inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
757{
758        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
759        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
760        int ret, i, thread/*, psize*/;
761
762        if (t->state == THREAD_FINISHING)
763                return trace_handle_finishing_perpkt(libtrace, packet, t);
764
765        while (1) {
766                // Check if we have packets ready
767                if(try_waiting_queue(libtrace, t, packet, &ret))
768                        return ret;
769
770                // We limit the number of packets we get to the size of the sliding window
771                // such that it is impossible for any given thread to fail to store a packet
772                assert(sem_wait(&libtrace->sem) == 0);
773                /*~~~~Single threaded read of a packet~~~~*/
774                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
775
776                /* Re-check our queue; we might have data waiting */
777                if(try_waiting_queue(libtrace, t, packet, &ret)) {
778                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
779                        assert(sem_post(&libtrace->sem) == 0);
780                        return ret;
781                }
782
783                // TODO put on *proper* condition variable
784                if (libtrace->perpkt_queue_full) {
785                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
786                        assert(sem_post(&libtrace->sem) == 0);
787                        contention_stats[this_thread].wait_for_fill_complete_hits++;
788                        continue;
789                }
790
791                if (!*packet) {
792                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
793                                *packet = trace_create_packet();
794                }
795                assert(*packet);
796
797                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
798                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
799                        assert(sem_post(&libtrace->sem) == 0);
800                        // Finish this thread ensuring that any data written later by another thread is retrieved also
801                        if (libtrace_halt)
802                                return 0;
803                        else
804                                return trace_finish_perpkt(libtrace, packet, t);
805                }
806                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
807
808                /* ~~~~Multiple threads can run the hasher~~~~ */
809                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
810
811                /* Yes this is correct: we take the opposite (read) lock for a write operation */
812                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
813                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
814                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
815                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
816                *packet = NULL;
817
818                // Always try read any data from the sliding window
819                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
820                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
821                        if (libtrace->perpkt_queue_full) {
822                                // I might be the holdup, in which case if I can read my queue I should do that and return
823                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
824                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
825                                        return ret;
826                                }
827                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
828                                continue;
829                        }
830                        // Read greedily as many as we can
831                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
832                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
833                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
834                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
835                                                if (this_thread == thread)
836                                                {
837                                                        // TODO think about this case more, because if this were to happen on the last read
838                                                        // before EOF/error we might not have emptied the sliding window and must stop early
839                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
840                                                        // It's our queue, so we must have a packet to read out
841                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
842                                                                // We must be able to write this now 100% without fail
843                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
844                                                                assert(sem_post(&libtrace->sem) == 0);
845                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
846                                                                return ret;
847                                                        } else {
848                                                                assert(!"Our queue is full but I cannot read from it??");
849                                                        }
850                                                }
851                                                // Not us; we have to give the other threads a chance to write their packets
852                                                libtrace->perpkt_queue_full = true;
853                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
854                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read their packets
855                                                        assert(sem_post(&libtrace->sem) == 0);
856
857                                                contention_stats[this_thread].full_queue_hits++;
858                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
859                                                // Grab these back
860                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Reacquire the semaphores we just released
861                                                        assert(sem_wait(&libtrace->sem) == 0);
862                                                libtrace->perpkt_queue_full = false;
863                                        }
864                                        assert(sem_post(&libtrace->sem) == 0);
865                                        *packet = NULL;
866                                } else {
867                                        // Cannot write to a queue if no one's waiting (I think this is unreachable
868                                        // in the general case, unless the user ends early without proper cleanup).
869                                        assert (!"unreachable code??");
870                                }
871                        }
872                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
873                }
874                // Now we go back to checking our queue anyways
875        }
876}
877
878
879/**
880 * For the first packet of each queue we keep a copy and note the system
881 * time at which it was received.
882 *
883 * This is used for finding the first packet when playing back a trace
884 * in trace time, and can be used by real-time applications to print
885 * results out every XXX seconds.
886 */
887void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
888{
889        if (!t->recorded_first) {
890                struct timeval tv;
891                libtrace_packet_t * dup;
892                // For what it's worth we can call these outside of the lock
893                gettimeofday(&tv, NULL);
894                dup = trace_copy_packet(packet);
895                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
896                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
897                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
898                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
899                // Now update the first
900                libtrace->first_packets.count++;
901                if (libtrace->first_packets.count == 1) {
902                        // We are the first entry, hence also the first known packet
903                        libtrace->first_packets.first = t->perpkt_num;
904                } else {
905                        // Check if we are earlier than the previous 'first' packet
906                        size_t first = libtrace->first_packets.first;
907                        if (trace_get_seconds(dup) <
908                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
909                                libtrace->first_packets.first = t->perpkt_num;
910                }
911                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
912                libtrace_message_t mesg = {0};
913                mesg.code = MESSAGE_FIRST_PACKET;
914                trace_send_message_to_reducer(libtrace, &mesg);
915                t->recorded_first = true;
916        }
917}
918
919/**
920 * Returns 1 if it's certain that the first packet is truly the first packet
921 * rather than a best guess based upon threads that have published so far.
922 * Otherwise 0 is returned.
923 * It's recommended that this result is stored rather than calling this
924 * function again.
925 */
926DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
927{
928        int ret = 0;
929        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
930        if (libtrace->first_packets.count) {
931                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
932                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
933                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
934                        ret = 1;
935                } else {
936                        struct timeval curr_tv;
937                        // If a second has passed since the first entry we will assume this is the very first packet
938                        gettimeofday(&curr_tv, NULL);
939                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
940                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
941                                        ret = 1;
942                                }
943                        }
944                }
945        } else {
946                *packet = NULL;
947                *tv = NULL;
948        }
949        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
950        return ret;
951}
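
/* Editor's sketch of a consumer: on MESSAGE_FIRST_PACKET a reducer can ask
 * for the current best guess. A return of 0 means a thread that has not yet
 * published could still own an earlier packet, so don't cache the answer as
 * final until this returns 1.
 */
static void example_on_first_packet(libtrace_t *trace)
{
        libtrace_packet_t *first;
        struct timeval *sys_tv;
        if (retrive_first_packet(trace, &first, &sys_tv) == 1 && first) {
                printf("First packet: trace time %f, system time %ld.%06ld\n",
                       trace_get_seconds(first),
                       (long) sys_tv->tv_sec, (long) sys_tv->tv_usec);
        }
}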
952
953
954DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
955{
956        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
957}
958
959inline static struct timeval usec_to_tv(uint64_t usec)
960{
961        struct timeval tv;
962        tv.tv_sec = usec / 1000000;
963        tv.tv_usec = usec % 1000000;
964        return tv;
965}
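
/* Editor's sketch: these helpers reduce interval arithmetic to plain integer
 * maths, e.g. computing the next keepalive release time as done below.
 */
static uint64_t example_next_release(struct timeval *prev, uint64_t interval_ms)
{
        return tv_to_usec(prev) + interval_ms * 1000; /* absolute usec time */
}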
966
967/** Similar to delay_tracetime but sends messages to all threads periodically */
968static void* keepalive_entry(void *data) {
969        struct timeval prev, next;
970        libtrace_message_t message = {0};
971        libtrace_t *trace = (libtrace_t *)data;
972        uint64_t next_release;
973        fprintf(stderr, "keepalive thread is starting\n");
974
975        gettimeofday(&prev, NULL);
976        message.code = MESSAGE_TICK;
977        while (trace->state != STATE_FINSHED) {
978                fd_set rfds;
979                next_release = tv_to_usec(&prev) + (trace->tick_interval * 1000);
980                gettimeofday(&next, NULL);
981                if (next_release > tv_to_usec(&next)) {
982                        next = usec_to_tv(next_release - tv_to_usec(&next));
983                        // Wait for timeout or a message
984                        FD_ZERO(&rfds);
985                        FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
986                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
987                                libtrace_message_t msg;
988                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
989                                assert(msg.code == MESSAGE_DO_STOP);
990                                goto done;
991                        }
992                }
993                prev = usec_to_tv(next_release);
994                if (trace->state == STATE_RUNNING) {
995                        message.additional.uint64 = tv_to_usec(&prev);
996                        trace_send_message_to_perpkts(trace, &message);
997                }
998        }
999done:
1000
1001        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1002        return NULL;
1003}
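
/* Editor's sketch of a per_pkt callback consuming these ticks; the tick
 * carries the release time in usec in message.additional.uint64, letting an
 * application flush periodic results even when no packets arrive.
 * flush_interval_results() is a hypothetical helper:
 *
 *   case MESSAGE_TICK: {
 *           uint64_t now_usec = message->additional.uint64;
 *           flush_interval_results(global, now_usec);
 *           break;
 *   }
 */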
1004
1005/**
1006 * Delays a packet's playback so that playback happens in trace time
1007 */
1008static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1009        struct timeval curr_tv, pkt_tv;
1010        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
1011        uint64_t curr_usec;
1012        /* Tracetime we might delay releasing this packet */
1013        if (!t->tracetime_offset_usec) {
1014                libtrace_packet_t * first_pkt;
1015                struct timeval *sys_tv;
1016                int64_t initial_offset;
1017                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1018                assert(first_pkt);
1019                pkt_tv = trace_get_timeval(first_pkt);
1020                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1021                if (stable)
1022                        // 0->1 because 0 is used to mean unset
1023                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1024                next_release = initial_offset;
1025        }
1026        /* next_release == offset */
1027        pkt_tv = trace_get_timeval(packet);
1028        next_release += tv_to_usec(&pkt_tv);
1029        gettimeofday(&curr_tv, NULL);
1030        curr_usec = tv_to_usec(&curr_tv);
1031        if (next_release > curr_usec) {
1032                // We need to wait
1033                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1034                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1035                select(0, NULL, NULL, NULL, &delay_tv);
1036        }
1037}
1038
1039/* Read one packet from the trace into a buffer. Note that this function will
1040 * block until a packet is read (or EOF is reached).
1041 *
1042 * @param libtrace      the libtrace opaque pointer
1043 * @param packet        the packet opaque pointer
1044 * @returns 0 on EOF, negative value on error
1045 *
1046 * Note this is identical to read_packet but calls the format's pread_packet
1047 * instead of its read_packet.
1048 *
1049 */
1050static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
1051
1052        assert(libtrace && "You called trace_pread_packet() with a NULL libtrace parameter!\n");
1053        if (trace_is_err(libtrace))
1054                return -1;
1055        if (!libtrace->started) {
1056                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1057                return -1;
1058        }
1059        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1060                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1061                return -1;
1062        }
1063        assert(packet);
1064
1065        if (libtrace->format->read_packet) {
1066                do {
1067                        size_t ret;
1068                        /* Finalise the packet, freeing any resources the format module
1069                         * may have allocated it and zeroing all data associated with it.
1070                         */
1071                        trace_fin_packet(packet);
1072                        /* Store the trace we are reading from into the packet opaque
1073                         * structure */
1074                        packet->trace = libtrace;
1075                        ret=libtrace->format->pread_packet(libtrace,packet);
1076                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
1077                                return ret;
1078                        }
1079                        if (libtrace->filter) {
1080                                /* If the filter doesn't match, read another
1081                                 * packet
1082                                 */
1083                                if (!trace_apply_filter(libtrace->filter,packet)){
1084                                        ++libtrace->filtered_packets;
1085                                        continue;
1086                                }
1087                        }
1088                        if (libtrace->snaplen>0) {
1089                                /* Snap the packet */
1090                                trace_set_capture_length(packet,
1091                                                libtrace->snaplen);
1092                        }
1093                        trace_packet_set_order(packet, libtrace->accepted_packets);
1094                        ++libtrace->accepted_packets;
1095                        return ret;
1096                } while(1);
1097        }
1098        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1099        return ~0U;
1100}
1101
1102/**
1103 * Read a packet from the parallel trace
1104 */
1105DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
1106{
1107        int ret;
1108        libtrace_thread_t *t = get_thread_table(libtrace);
1109
1110        // Cleanup the packet passed back
1111        if (*packet)
1112                trace_fin_packet(*packet);
1113
1114        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1115                if (!*packet)
1116                        *packet = trace_create_packet();
1117                ret = trace_pread_packet_wrapper(libtrace, *packet);
1118        } else if (trace_has_dedicated_hasher(libtrace)) {
1119                ret = trace_pread_packet_hasher_thread(libtrace, packet);
1120        } else if (!trace_has_dedicated_hasher(libtrace)) {
1121                /* We don't care about which core a packet goes to */
1122                ret = trace_pread_packet_first_in_first_served(libtrace, packet);
1123        } /* else {
1124                ret = trace_pread_packet_hash_locked(libtrace, packet);
1125        }*/
1126
1127        // Formats can also optionally do this internally to ensure the first
1128        // packet is always reported correctly
1129        if (ret > 0) {
1130                store_first_packet(libtrace, *packet, t);
1131                if (libtrace->tracetime)
1132                        delay_tracetime(libtrace, *packet, t);
1133        }
1134
1135        return ret;
1136}
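
/* Editor's sketch of the loop a perpkt thread effectively runs around this
 * function (compare perpkt_threads_entry above): one packet pointer is
 * reused across calls so the freelist can recycle buffers. Assumes it is
 * called from a registered perpkt thread.
 */
static void example_read_loop(libtrace_t *trace)
{
        libtrace_packet_t *packet = NULL;
        int psize;
        for (;;) {
                psize = trace_pread_packet(trace, &packet);
                if (psize == -2)
                        continue; /* a message is waiting, not a packet */
                if (psize < 1)
                        break; /* EOF or error */
                /* ... process the packet here ... */
        }
        if (packet)
                trace_destroy_packet(packet);
}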
1137
1138/* Starts perpkt threads
1139 * @return the number of threads started
1140 */
1141static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1142        int i;
1143
1144        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1145                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1146                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
1147        }
1148        return libtrace->perpkt_thread_count;
1149}
1150
1151/* Start an input trace in a parallel fashion, or restart a paused trace.
1152 *
1153 * NOTE: libtrace lock is held for the majority of this function
1154 *
1155 * @param libtrace the input trace to start
1156 * @param global_blob some global data you can share with the new perpkt threads
1157 * @returns 0 on success
1158 */
1159DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
1160{
1161        int i;
1162        sigset_t sig_before, sig_block_all;
1163        assert(libtrace);
1164        if (trace_is_err(libtrace)) {
1165                return -1;
1166        }
1167        // NOTE: Until the trace is started we won't have a libtrace_lock initialised
1168        if (libtrace->state != STATE_NEW) {
1169                int err = 0;
1170                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1171                if (libtrace->state != STATE_PAUSED) {
1172                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1173                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
1174                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1175                        return -1;
1176                }
1177               
1178                // Update the per_pkt function, or reuse the old one
1179                if (per_pkt)
1180                        libtrace->per_pkt = per_pkt;
1181
1182                assert(libtrace_parallel);
1183                assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1184                assert(libtrace->per_pkt);
1185               
1186                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1187                        fprintf(stderr, "Restarting trace pstart_input()\n");
1188                        err = libtrace->format->pstart_input(libtrace);
1189                } else {
1190                        if (libtrace->format->start_input) {
1191                                fprintf(stderr, "Restarting trace start_input()\n");
1192                                err = libtrace->format->start_input(libtrace);
1193                        }
1194                }
1195               
1196                if (err == 0) {
1197                        libtrace->started = true;
1198                        libtrace_change_state(libtrace, STATE_RUNNING, false);
1199                }
1200                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1201                return err;
1202        }
1203
1204        assert(libtrace->state == STATE_NEW);
1205        libtrace_parallel = 1;
1206
1207        // Store the user defined things against the trace
1208        libtrace->global_blob = global_blob;
1209        libtrace->per_pkt = per_pkt;
1210        libtrace->reducer = reducer;
1211
1212        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1213        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1214        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1215        // Grab the lock
1216        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1217
1218        // Set default buffer sizes
1219        if (libtrace->perpkt_buffer_size <= 0)
1220                libtrace->perpkt_buffer_size = 1000;
1221
1222        if (libtrace->perpkt_thread_count <= 0) {
1223                // TODO add BSD support
1224                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1225                if (libtrace->perpkt_thread_count <= 0)
1226                        // Let's just use one
1227                        libtrace->perpkt_thread_count = 1;
1228        }
1229
1230        if(libtrace->packet_freelist_size <= 0)
1231                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
1232
1233        if(libtrace->packet_freelist_size <
1234                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
1235                fprintf(stderr, "WARNING: packet_freelist_size is smaller than (perpkt_buffer_size + 1) * perpkt_thread_count; deadlocks may occur\n");
1236
1237        libtrace->started = true; // Set before the threads start, otherwise they could race on this flag
1238        libtrace_change_state(libtrace, STATE_RUNNING, false);
1239        /* Disable signals - Pthread signal handling */
1240
1241        sigemptyset(&sig_block_all);
1242
1243        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1244
1245        // If we are using a hasher start it
1246        // If single threaded we don't need a hasher
1247        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1248                libtrace_thread_t *t = &libtrace->hasher_thread;
1249                t->trace = libtrace;
1250                t->ret = NULL;
1251                t->type = THREAD_HASHER;
1252                t->state = THREAD_RUNNING;
1253                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1254                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1255        } else {
1256                libtrace->hasher_thread.type = THREAD_EMPTY;
1257        }
1258        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
1259        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1260        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1261        // This will be applied to every new thread that starts, i.e. they will block all signals
1262        // Let's start a fixed number of reading threads
1263
1264        // For now we never have a dedicated thread for the reducer
1265        // i.e. This main thread is used as the reducer
1266        libtrace->reducer_thread.tid = pthread_self();
1267        libtrace->reducer_thread.type = THREAD_REDUCER;
1268        libtrace->reducer_thread.state = THREAD_RUNNING;
1269        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
1270
1271        /* Ready some storage */
1272        libtrace->first_packets.first = 0;
1273        libtrace->first_packets.count = 0;
1274        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1275        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct __packet_storage_magic_type));
1276
1277
1278        /* Ready all of our perpkt threads - they are started later */
1279        libtrace->perpkt_threads = calloc(libtrace->perpkt_thread_count, sizeof(libtrace_thread_t));
1280        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1281                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1282                t->trace = libtrace;
1283                t->ret = NULL;
1284                t->type = THREAD_PERPKT;
1285                t->state = THREAD_RUNNING;
1286                t->user_data = NULL;
1287                // t->tid is set when the thread is created
1288                t->perpkt_num = i;
1289                if (libtrace->hasher)
1290                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
1291                // Depending on the reducer mode, either the vector or the deque will be used
1292                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1293                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1294                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1295                t->tmp_key = 0;
1296                t->tmp_data = NULL;
1297                t->recorded_first = false;
1298                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1299                t->tracetime_offset_usec = 0;
1300        }
1301
1302        int threads_started = 0;
1303        /* Setup the trace and start our threads */
1304        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1305                printf("This format has direct support for parallel input\n");
1306                threads_started = libtrace->format->pstart_input(libtrace);
1307        } else {
1308                if (libtrace->format->start_input) {
1309                        threads_started=libtrace->format->start_input(libtrace);
1310                }
1311        }
1312        if (threads_started == 0)
1313                threads_started = trace_start_perpkt_threads(libtrace);
1314
1315        if (libtrace->tick_interval > 0) {
1316                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1317                libtrace->keepalive_thread.state = THREAD_RUNNING;
1318                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1319                assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
1320        }
1321
1322        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1323                libtrace->perpkt_thread_states[i] = 0;
1324        }
1325        libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started;
1326
1327        // Revert back - Allow signals again
1328        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1329        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1330
1331        if (threads_started < 0)
1332                // Error
1333                return threads_started;
1334
1335        // TODO fix these leaks etc
1336        if (libtrace->perpkt_thread_count != threads_started)
1337                fprintf(stderr, "WARNING: started threads (%d) not equal to requested (%d)\n", threads_started, libtrace->perpkt_thread_count);
1338
1339
1340        return 0;
1341}
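/* Illustrative sketch (not part of the original source): the expected
 * lifecycle around trace_pstart(). per_pkt_cb and reducer_cb are
 * hypothetical user callbacks matching fn_per_pkt and fn_reducer.
 *
 *      if (trace_pstart(trace, NULL, per_pkt_cb, reducer_cb) == 0) {
 *              // this (main) thread now acts as the reducer ...
 *              trace_join(trace); // wait for the perpkt threads to finish
 *      }
 */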
1342
1343/**
1344 * Pauses a trace; this should only be called by the main thread.
1345 * 1. Sets started = false
1346 * 2. All perpkt threads are paused, waiting on a condition variable
1347 * 3. ppause is then called on the underlying format, if present
1348 * 4. The trace's state is set to paused
1349 *
1350 * Once done you should be able to modify the trace setup and call pstart again
1351 * TODO handle changing thread numbers
1352 */
1353DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1354{
1355        libtrace_thread_t *t;
1356        int i;
1357        assert(libtrace);
1358       
1359        t = get_thread_table(libtrace);
1360        // Check state from within the lock if we are going to change it
1361        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1362        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1363                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1364                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1365                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1366                return -1;
1367        }
1368
1369        libtrace_change_state(libtrace, STATE_PAUSING, false);
1370        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1371
1372        // Special case handle the hasher thread case
1373        if (trace_has_dedicated_hasher(libtrace)) {
1374                fprintf(stderr, "Hasher thread is running; handling this as a special case\n");
1375                libtrace_message_t message = {0};
1376                message.code = MESSAGE_DO_PAUSE;
1377                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1378                // Wait for it to pause
1379                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1380                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1381                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1382                }
1383                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1384        }
1385
1386        fprintf(stderr, "Sending messages\n");
1387        // Stop threads, skip this one if it's a perpkt
1388        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1389                if (&libtrace->perpkt_threads[i] != t) {
1390                        libtrace_message_t message = {0};
1391                        message.code = MESSAGE_DO_PAUSE;
1392                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1393                        if(trace_has_dedicated_hasher(libtrace)) {
1394                                // The hasher has stopped, but the other threads may have messages waiting.
1395                                // If their queues are empty those threads would block with no data,
1396                                // so send a NULL packet simply to make each thread check its message queue.
1397                                // We are the only writer since the hasher has paused.
1398                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
1399                        }
1400                } else {
1401                        fprintf(stderr, "Perpkt threads should not be used to pause a trace; this could cause any number of problems!\n");
1402                }
1403        }
1404
1405        // Formats must support native message handling if a message is ready
1406        // The approach, per Perry's suggestion, is a non-blocking read
1407        // followed by a blocking read. XXX STRIP THIS OUT
1408
1409        if (t) {
1410                // A perpkt thread is doing the pausing; fake an extra paused thread
1411                // We rely on the user to *not* return before starting the trace again
1412                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1413        }
1414
1415        fprintf(stderr, "Asking threads to pause\n");
1416
1417        // Wait for all threads to pause
1418        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1419        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1420                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1421        }
1422        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1423
1424        fprintf(stderr, "Threads have paused\n");
1425
1426        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1427                libtrace->started = false;
1428                if (libtrace->format->ppause_input)
1429                        libtrace->format->ppause_input(libtrace);
1430                // TODO What happens if we don't have pause input??
1431        } else {
1432                int err;
1433                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1434                err = trace_pause(libtrace);
1435                // We should handle this a bit better
1436                if (err)
1437                        return err;
1438        }
1439
1440        // Only set as paused after the pause has been called on the trace
1441        libtrace_change_state(libtrace, STATE_PAUSED, true);
1442        return 0;
1443}
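/* Illustrative sketch (not part of the original source): pausing to
 * reconfigure and then restarting. Passing NULL for per_pkt on restart
 * reuses the previously registered callback, as handled in trace_pstart().
 *
 *      if (trace_ppause(trace) == 0) {
 *              // modify the trace setup here, e.g. via trace_parallel_config()
 *              trace_pstart(trace, NULL, NULL, NULL);
 *      }
 */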
1444
1445/**
1446 * Stops a trace, finishing prematurely as though it met an EOF.
1447 * This should only be called by the main thread.
1448 * 1. Calls ppause
1449 * 2. Sends a message asking the threads to finish
1450 * 3. Releases the threads, which will then pause
1451 */
1452DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1453{
1454        int err;
1455        libtrace_message_t message = {0};
1456        assert(libtrace);
1457
1458        // Ensure all threads have paused and the underlying trace format has
1459        // been closed and all packets associated are cleaned up
1460        // Pause will do any state checks for us
1461        err = trace_ppause(libtrace);
1462        if (err)
1463                return err;
1464
1465        // Now send a message asking the threads to stop
1466        // This will be retrieved before trying to read another packet
1467       
1468        message.code = MESSAGE_DO_STOP;
1469        trace_send_message_to_perpkts(libtrace, &message);
1470        if (trace_has_dedicated_hasher(libtrace))
1471                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1472       
1476
1477        // Now release the threads and let them stop
1478        libtrace_change_state(libtrace, STATE_FINSHED, true);
1479        return 0;
1480}
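/* Illustrative sketch (not part of the original source): stopping early
 * from the main thread and then reaping the threads.
 *
 *      trace_pstop(trace);
 *      trace_join(trace);
 */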
1481
1482/**
1483 * Sets the hasher type along with a selected function. If the hardware
1484 * supports that generic type of hashing it will be used; otherwise the
1485 * supplied hasher function will be used, and passed data when called.
1486 *
1487 * @return 0 if successful, otherwise -1 on error
1488 */
1489DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1490        int ret = -1;
1491        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1492                return -1;
1493        }
1494
1495        // Save the requirements
1496        trace->hasher_type = type;
1497        if (hasher) {
1498                trace->hasher = hasher;
1499                trace->hasher_data = data;
1500        } else {
1501                trace->hasher = NULL;
1502                // TODO consider how to handle freeing this
1503                trace->hasher_data = NULL;
1504        }
1505
1506        // Try to push this to hardware - NOTE the hardware could do custom if
1507        // there is a more efficient way to apply it; in this case
1508        // it will simply grab the function out of libtrace_t
1509        if (trace->format->pconfig_input)
1510                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1511
1512        if (ret == -1) {
1513                // We have to deal with this ourselves
1514                // This most likely means single-threaded reading of the trace
1515                if (!hasher) {
1516                        switch (type)
1517                        {
1518                                case HASHER_CUSTOM:
1519                                case HASHER_BALANCE:
1520                                        return 0;
1521                                case HASHER_BIDIRECTIONAL:
1522                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1523                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1524                                        toeplitz_init_config(trace->hasher_data, 1);
1525                                        return 0;
1526                                case HASHER_UNIDIRECTIONAL:
1527                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1528                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1529                                        toeplitz_init_config(trace->hasher_data, 0);
1530                                        return 0;
1531                                case HASHER_HARDWARE:
1532                                        return -1;
1533                        }
1534                        return -1;
1535                }
1536        } else {
1537                // The hardware is dealing with this for us
1538                trace->hasher_type = HASHER_HARDWARE;
1539        }
1540
1541        return 0;
1542}
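/* Illustrative sketch (not part of the original source): requesting the
 * built-in bidirectional Toeplitz hasher before starting the trace.
 * Passing a NULL function lets libtrace choose an implementation, or
 * push the hashing to hardware where the format supports it.
 *
 *      if (trace_set_hasher(trace, HASHER_BIDIRECTIONAL, NULL, NULL) == -1) {
 *              // handle the error
 *      }
 */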
1543
1544// Waits for all threads to finish
1545DLLEXPORT void trace_join(libtrace_t *libtrace) {
1546        int i;
1547
1548        /* Firstly wait for the perpkt threads to finish, since these are
1549         * user controlled */
1550        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1551                //printf("Waiting to join with perpkt #%d\n", i);
1552                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
1553                //printf("Joined with perpkt #%d\n", i);
1554                // So we must do our best effort to empty the queue - so
1555                // the producer (or any other threads) don't block.
1556                libtrace_packet_t * packet;
1557                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
1558                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1559                        if (packet) // This could be NULL if the perpkt finished early
1560                                trace_destroy_packet(packet);
1561        }
1562
1563        /* Now the hasher */
1564        // XXX signal it to stop if it hasn't already - we should never be in this situation!
1565        if (trace_has_dedicated_hasher(libtrace)) {
1566                fprintf(stderr, "Waiting to join with the hasher\n");
1567                pthread_join(libtrace->hasher_thread.tid, NULL);
1568                fprintf(stderr, "Joined with the hasher\n");
1569                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
1570        }
1571
1572        // Now that everything is finished nothing can be touching our
1573        // buffers so clean them up
1574        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1575                // It's possible one packet got added by the reducer (or one per any other thread) since we cleaned up,
1576                // if they lost their timeslice before or during a write
1577                libtrace_packet_t * packet;
1578                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1579                        trace_destroy_packet(packet);
1580                if (libtrace->hasher) {
1581                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1582                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1583                }
1584                // Cannot destroy vector yet, this happens with trace_destroy
1585        }
1586        // TODO consider perpkt threads marking trace as finished before join is called
1587        libtrace_change_state(libtrace, STATE_FINSHED, true);
1588       
1589        // Wait for the tick (keepalive) thread if it has been started
1590        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1591                libtrace_message_t msg = {0};
1592                msg.code = MESSAGE_DO_STOP;
1593                fprintf(stderr, "Waiting to join with the keepalive\n");
1594                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1595                pthread_join(libtrace->keepalive_thread.tid, NULL);
1596                fprintf(stderr, "Joined with the keepalive\n");
1597        }
1598       
1599        libtrace_change_state(libtrace, STATE_JOINED, true);
1600}
1601
1602DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1603{
1604        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1605        assert(t);
1606        return libtrace_message_queue_count(&t->messages);
1607}
1608
1609DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1610{
1611        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1612        assert(t);
1613        return libtrace_message_queue_get(&t->messages, message);
1614}
1615
1616DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1617{
1618        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1619        assert(t);
1620        return libtrace_message_queue_try_get(&t->messages, message);
1621}
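/* Illustrative sketch (not part of the original source): draining any
 * pending messages from within a thread without blocking, using only the
 * accessors defined above. Acting on MESSAGE_DO_STOP is an assumed
 * example of handling a message code.
 *
 *      libtrace_message_t mesg;
 *      while (libtrace_thread_get_message_count(trace) > 0) {
 *              libtrace_thread_get_message(trace, &mesg);
 *              if (mesg.code == MESSAGE_DO_STOP) {
 *                      // stop processing
 *              }
 *      }
 */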
1622
1623/**
1624 * Notifies the reducer that results are pending; returns a backlog indicator
1625 */
1626DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
1627{
1628        libtrace_message_t message = {0};
1629        message.code = MESSAGE_POST_REDUCE;
1630        message.sender = get_thread_descriptor(libtrace);
1631        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
1632}
1633
1634/**
1635 * Sends a message to the reducer thread; returns a backlog indicator
1636 */
1637DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
1638{
1639        //printf("Sending message code=%d to reducer\n", message->code);
1640        message->sender = get_thread_descriptor(libtrace);
1641        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
1642}
1643
1644/**
1645 * Sends a message to the given thread; returns a backlog indicator
1646 */
1647DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1648{
1649        //printf("Sending message code=%d to reducer\n", message->code);
1650        message->sender = get_thread_descriptor(libtrace);
1651        return libtrace_message_queue_put(&t->messages, message);
1652}
1653
1654DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1655{
1656        int i;
1657        message->sender = get_thread_descriptor(libtrace);
1658        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1659                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1660        }
1661        //printf("Sending message code=%d to reducer\n", message->code);
1662        return 0;
1663}
1664
1665DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1666        result->key = key;
1667}
1668DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1669        return result->key;
1670}
1671DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
1672        result->value = value;
1673}
1674DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
1675        return result->value;
1676}
1677DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
1678        result->key = key;
1679        result->value = value;
1680}
1681DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
1682        free(*result);
1683        *result = NULL;
1684        // TODO automatically back this with a free list!!
1685}
1686
1687DLLEXPORT void * trace_get_global(libtrace_t *trace)
1688{
1689        return trace->global_blob;
1690}
1691
1692DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
1693{
1694        if (trace->global_blob && trace->global_blob != data) {
1695                void * ret = trace->global_blob;
1696                trace->global_blob = data;
1697                return ret;
1698        } else {
1699                trace->global_blob = data;
1700                return NULL;
1701        }
1702}
1703
1704DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
1705{
1706        return t->user_data;
1707}
1708
1709DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
1710{
1711        if(t->user_data && t->user_data != data) {
1712                void *ret = t->user_data;
1713                t->user_data = data;
1714                return ret;
1715        } else {
1716                t->user_data = data;
1717                return NULL;
1718        }
1719}
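/* Illustrative sketch (not part of the original source): lazily
 * allocating per-thread storage from a perpkt callback. "struct
 * my_counts" is a hypothetical user-defined type.
 *
 *      struct my_counts *c = trace_get_tls(t);
 *      if (!c) {
 *              c = calloc(1, sizeof(struct my_counts));
 *              trace_set_tls(t, c);
 *      }
 *      c->packets++;
 */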
1720
1721/**
1722 * Publishes a result to the reduce queue
1723 */
1724DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
1725        libtrace_result_t res;
1726        res.is_packet = 0;
1727        // Who am I???
1728        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1729        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1730        // Now put it into my table
1731        UNUSED static __thread int count = 0;
1732
1733
1734        libtrace_result_set_key_value(&res, key, value);
1735        /*
1736        if (count == 1)
1737                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1738        count = (count+1) %1000;
1739        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1740        */
1741        /*if (count == 1)
1742                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1743        count = (count+1)%1000;*/
1744        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1745                if (libtrace_deque_get_size(&t->deque) >= 800) {
1746                        trace_post_reduce(libtrace);
1747                }
1748                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1749                //      sched_yield();
1750                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1751        } else {
1752                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1753                //      sched_yield();
1754
1755                if (libtrace_vector_get_size(&t->vector) >= 800) {
1756                        trace_post_reduce(libtrace);
1757                }
1758                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1759        }
1760}
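/* Illustrative sketch (not part of the original source): publishing a
 * keyed result from a perpkt thread. Keying by the packet's order is an
 * assumed convention that suits the REDUCE_ORDERED/REDUCE_SEQUENTIAL
 * modes; my_result is a hypothetical heap-allocated value.
 *
 *      trace_publish_result(trace, trace_packet_get_order(packet), my_result);
 */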
1761
1762DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1763        libtrace_result_t res;
1764        // Who am I???
1765        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1766        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1767        // Now put it into my table
1768        UNUSED static __thread int count = 0;
1769
1770        res.is_packet = 1;
1771        libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
1772        /*
1773        if (count == 1)
1774                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1775        count = (count+1) %1000;
1776        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1777        */
1778        /*if (count == 1)
1779                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1780        count = (count+1)%1000;*/
1781        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1782                if (libtrace_deque_get_size(&t->deque) >= 800) {
1783                        trace_post_reduce(libtrace);
1784                }
1785                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1786                //      sched_yield();
1787                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1788        } else {
1789                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1790                //      sched_yield();
1791
1792                if (libtrace_vector_get_size(&t->vector) >= 800) {
1793                        trace_post_reduce(libtrace);
1794                }
1795                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1796        }
1797}
1798
1799
1800static int compareres(const void* p1, const void* p2)
1801{
1802        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
1803                return -1;
1804        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
1805                return 0;
1806        else
1807                return 1;
1808}
1809
1810DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
1811        int i;
1812        int flags = libtrace->reducer_flags; // Hint that these aren't changing
1813
1814        libtrace_vector_empty(results);
1815
1816        /* Here we assume the queues are in ascending order and the caller wants
1817         * the smallest result first. If they are not in order, the results
1818         * may not be in order either.
1819         */
1820        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1821                int live_count = 0;
1822                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
1823                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
1824                uint64_t min_key = UINT64_MAX;
1825                int min_queue = -1;
1826
1827                /* Loop through, check all are alive (have data) and find the smallest */
1828                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1829                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
1830                        if (libtrace_deque_get_size(v) != 0) {
1831                                libtrace_result_t r;
1832                                libtrace_deque_peek_front(v, (void *) &r);
1833                                live_count++;
1834                                live[i] = 1;
1835                                key[i] = libtrace_result_get_key(&r);
1836                                if (i==0 || min_key > key[i]) {
1837                                        min_key = key[i];
1838                                        min_queue = i;
1839                                }
1840                        } else {
1841                                live[i] = 0;
1842                        }
1843                }
1844
1845                /* Now remove the smallest and loop - as a special case, if all threads have joined we always flush what's left */
1846                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
1847                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
1848                                libtrace->state == STATE_JOINED))) {
1849                        /* Pop the front result from the minimum queue */
1850                        libtrace_result_t r;
1851
1852                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
1853                        libtrace_vector_push_back(results, &r);
1854
1855                        // We expect the key we read +1 now
1856                        libtrace->expected_key = key[min_queue] + 1;
1857
1858                        // Now update the one we just removed
1859                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
1860                        {
1861                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
1862                                key[min_queue] = libtrace_result_get_key(&r);
1863                                if (key[min_queue] <= min_key) {
1864                                        // We are still the smallest, might be out of order though :(
1865                                        min_key = key[min_queue];
1866                                } else {
1867                                        min_key = key[min_queue]; // Update our minimum
1868                                        // Check all find the smallest again - all are alive
1869                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1870                                                if (live[i] && min_key > key[i]) {
1871                                                        min_key = key[i];
1872                                                        min_queue = i;
1873                                                }
1874                                        }
1875                                }
1876                        } else {
1877                                live[min_queue] = 0;
1878                                live_count--;
1879                                min_key = UINT64_MAX; // Update our minimum
1880                                // Check all find the smallest again - all are alive
1881                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1882                                        // Still not 100% TODO (what if order is wrong or not increasing)
1883                                        if (live[i] && min_key >= key[i]) {
1884                                                min_key = key[i];
1885                                                min_queue = i;
1886                                        }
1887                                }
1888                        }
1889                }
1890        } else { // Queues are not in order - return all results in the queue
1891                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1892                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
1893                }
1894                if (flags & REDUCE_SORT) {
1895                        qsort(results->elements, results->size, results->element_size, &compareres);
1896                }
1897        }
1898        return libtrace_vector_get_size(results);
1899}
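/* Illustrative sketch (not part of the original source): draining
 * results from the reducer (main) thread. libtrace_vector_get() is an
 * assumed element accessor matching the vector API used above.
 *
 *      libtrace_vector_t results;
 *      libtrace_vector_init(&results, sizeof(libtrace_result_t));
 *      int i, n = trace_get_results(trace, &results);
 *      for (i = 0; i < n; i++) {
 *              libtrace_result_t r;
 *              libtrace_vector_get(&results, i, (void *) &r);
 *              // consume libtrace_result_get_key(&r) / libtrace_result_get_value(&r)
 *      }
 */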
1900
1901DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
1902        return packet->order;
1903}
1904
1905DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
1906        return packet->hash;
1907}
1908
1909DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
1910        packet->order = order;
1911}
1912
1913DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
1914        packet->hash = hash;
1915}
1916
1917DLLEXPORT int trace_finished(libtrace_t * libtrace) {
1918        // TODO I don't like using this so much, we could use state!!!
1919        return !(libtrace->perpkt_thread_states[THREAD_RUNNING] || libtrace->perpkt_thread_states[THREAD_FINISHING]);
1920}
1921
1922DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
1923{
1924        UNUSED int ret = -1;
1925        switch (option) {
1926                case TRACE_OPTION_TICK_INTERVAL:
1927                        libtrace->tick_interval = *((int *) value);
1928                        return 1;
1929                case TRACE_OPTION_SET_HASHER:
1930                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
1931                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
1932                        libtrace->perpkt_buffer_size = *((int *) value);
1933                        return 1;
1934                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
1935                        libtrace->packet_freelist_size = *((int *) value);
1936                        return 1;
1937                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1938                        libtrace->perpkt_thread_count = *((int *) value);
1939                        return 1;
1940                case TRACE_DROP_OUT_OF_ORDER:
1941                        if (*((int *) value))
1942                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
1943                        else
1944                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
1945                        return 1;
1946                case TRACE_OPTION_SEQUENTIAL:
1947                        if (*((int *) value))
1948                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
1949                        else
1950                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
1951                        return 1;
1952                case TRACE_OPTION_ORDERED:
1953                        if (*((int *) value))
1954                                libtrace->reducer_flags |= REDUCE_ORDERED;
1955                        else
1956                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
1957                        return 1;
1958                case TRACE_OPTION_TRACETIME:
1959                        if (*((int *) value))
1960                                libtrace->tracetime = 1;
1961                        else
1962                                libtrace->tracetime = 0;
1963                        return 1;
1964        }
1965        return 0;
1966}
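/* Illustrative sketch (not part of the original source): configuring
 * the trace before trace_pstart(). As read back above, every option
 * takes a pointer to an int.
 *
 *      int threads = 4, ordered = 1;
 *      trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &threads);
 *      trace_parallel_config(trace, TRACE_OPTION_ORDERED, &ordered);
 */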
1967
1968DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1969        libtrace_packet_t* result;
1970        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1971                result = trace_create_packet();
1972        assert(result);
1973        swap_packets(result, packet); // Move the current packet into our copy
1974        return result;
1975}
1976
1977DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1978        // Try write back the packet
1979        assert(packet);
1980        // Always release any resources this might be holding such as a slot in a ringbuffer
1981        trace_fin_packet(packet);
1982        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1983                /* We couldn't, oh well, let's just destroy it - XXX consider non-managed formats, i.e. ring buffers losing packets and jamming up :( */
1984                //assert(1 == 90);
1985                trace_destroy_packet(packet);
1986        }
1987}
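/* Illustrative sketch (not part of the original source): the assumed
 * pairing of the two calls above. A perpkt thread keeps a safe copy of
 * the current packet with trace_result_packet() and publishes it; the
 * reducer hands it back with trace_free_result_packet() once consumed.
 *
 *      // in a perpkt thread:
 *      libtrace_packet_t *copy = trace_result_packet(trace, packet);
 *      trace_publish_packet(trace, copy);
 *
 *      // in the reducer, after consuming the result:
 *      trace_free_result_packet(trace, copy);
 */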
1988
1989DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
1990        if (libtrace->format)
1991                return &libtrace->format->info;
1992        else
1993                return NULL;
1994}