source: lib/trace_parallel.c @ fac8c46

Last change on this file was fac8c46, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Tidies up the pausing so that it now works as expected and a trace can easily be paused and restarted.
Ensures that packets will not be lost if pause is called on a file: any queued packets will be read (a message is sent allowing the user to drop these packets if they are unwanted).
Differentiates packets from other results in the queues to the reducer/reporter, and makes a copy of the packets in result queues when pausing.

  • This is needed to ensure that bad memory isn't referenced if a zero-copy trace is paused by closing sockets/associated data, as in the case of ring:.

Fixes up the re-starting of traces which hadn't been finished, to account for different configurations.
Adds a 'state' to libtrace to handle the state of parallel traces, rather than hacking around the existing 'started' boolean. Also provides two levels of checks to remain consistent with existing code that checks 'started'.

Various other bug fixes and tidy ups.
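
As a rough usage sketch of the pause/restart behaviour this commit enables (per_pkt_cb and reducer_cb are placeholder user callbacks and the URI is illustrative, not part of the commit):

    libtrace_t *trace = trace_create("ring:eth0");
    trace_pstart(trace, NULL, per_pkt_cb, reducer_cb);  /* start perpkt threads */
    /* ... later, from the main thread ... */
    trace_ppause(trace);                                /* threads drain queued packets, then block */
    trace_pstart(trace, NULL, per_pkt_cb, reducer_cb);  /* resume from STATE_PAUSED */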

/*
 * This file is part of libtrace
 *
 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
 * New Zealand.
 *
 * All rights reserved.
 *
 * This code has been developed by the University of Waikato WAND
 * research group. For further information please see http://www.wand.net.nz/
 *
 * libtrace is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * libtrace is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with libtrace; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 * $Id$
 *
 */


#define _GNU_SOURCE
#include "common.h"
#include "config.h"
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#ifndef WIN32
#include <sys/socket.h>
#endif
#include <stdarg.h>
#include <sys/param.h>

#ifdef HAVE_LIMITS_H
#  include <limits.h>
#endif

#ifdef HAVE_SYS_LIMITS_H
#  include <sys/limits.h>
#endif

#ifdef HAVE_NET_IF_ARP_H
#  include <net/if_arp.h>
#endif

#ifdef HAVE_NET_IF_H
#  include <net/if.h>
#endif

#ifdef HAVE_NETINET_IN_H
#  include <netinet/in.h>
#endif

#ifdef HAVE_NET_ETHERNET_H
#  include <net/ethernet.h>
#endif

#ifdef HAVE_NETINET_IF_ETHER_H
#  include <netinet/if_ether.h>
#endif

#include <time.h>
#ifdef WIN32
#include <sys/timeb.h>
#endif

#include "libtrace.h"
#include "libtrace_int.h"

#ifdef HAVE_PCAP_BPF_H
#  include <pcap-bpf.h>
#else
#  ifdef HAVE_NET_BPF_H
#    include <net/bpf.h>
#  endif
#endif


#include "libtrace_int.h"
#include "format_helper.h"
#include "rt_protocol.h"
#include "hash_toeplitz.h"

#include <pthread.h>
#include <signal.h>


extern int libtrace_parallel;

struct multithreading_stats {
        uint64_t full_queue_hits;
        uint64_t wait_for_fill_complete_hits;
} contention_stats[1024];

inline int trace_has_dedicated_hasher(libtrace_t * libtrace);

/**
 * @return True if the format supports parallel threads.
 */
static inline bool trace_supports_parallel(libtrace_t *trace)
{
        assert(trace);
        assert(trace->format);
        if (trace->format->pstart_input)
                return true;
        else
                return false;
        //return trace->format->pstart_input;
}

DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
        int i;
        struct multithreading_stats totals = {0};
        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
                totals.full_queue_hits += contention_stats[i].full_queue_hits;
                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
        }
        fprintf(stderr, "\nTotals for perpkt threads\n");
        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);

        return;
}

inline void libtrace_zero_thread(libtrace_thread_t * t) {
        t->trace = NULL;
        t->ret = NULL;
        t->type = THREAD_EMPTY;
        libtrace_zero_ringbuffer(&t->rbuffer);
        libtrace_zero_vector(&t->vector);
        libtrace_zero_deque(&t->deque);
        t->recorded_first = false;
        t->perpkt_num = -1;
}

// Ints are aligned, so an int is atomic and safe to read and write at the same time.
// However, writes must be locked; reads need not be (we never try to read an entry before it is written to the table).
libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
        int i = 0;
        pthread_t tid = pthread_self();

        for (;i<libtrace->perpkt_thread_count ;++i) {
                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
                        return &libtrace->perpkt_threads[i];
        }
        return NULL;
}

int get_thread_table_num(libtrace_t *libtrace) {
        int i = 0;
        pthread_t tid = pthread_self();
        for (;i<libtrace->perpkt_thread_count; ++i) {
                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
                        return i;
        }
        return -1;
}

static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
        libtrace_thread_t *ret;
        if (!(ret = get_thread_table(libtrace))) {
                pthread_t tid = pthread_self();
                // Check if we are the reducer or something else
                if (pthread_equal(tid, libtrace->reducer_thread.tid))
                        ret = &libtrace->reducer_thread;
                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
                        ret = &libtrace->hasher_thread;
                else
                        ret = NULL;
        }
        return ret;
}

/** Used below in trace_make_results_packets_safe */
static void do_copy_result_packet(void *data)
{
        libtrace_result_t *res = (libtrace_result_t *)data;
        if (res->is_packet) {
                // Duplicate the packet in standard malloc'd memory and free the
                // original
                libtrace_packet_t *oldpkt, *dup;
                oldpkt = (libtrace_packet_t *) res->value;
                dup = trace_copy_packet(oldpkt);
                res->value = (void *)dup;
                trace_destroy_packet(oldpkt);
                fprintf(stderr, "Made a packet safe!!\n");
        }
}

/**
 * Make a safe replacement copy of any result packets that are owned
 * by the format in the result queue. Used when pausing traces.
 */
static void trace_make_results_packets_safe(libtrace_t *trace) {
        libtrace_thread_t *t = get_thread_descriptor(trace);
        if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
        else
                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
}

/**
 * Holds threads in a paused state, until released by broadcasting
 * on the condition variable.
 */
static void trace_thread_pause(libtrace_t *trace) {
        printf("Pausing thread #%d\n", get_thread_table_num(trace));
        trace_make_results_packets_safe(trace);
        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
        trace->perpkts_pausing++;
        pthread_cond_broadcast(&trace->perpkt_cond);
        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
        }
        trace->perpkts_pausing--;
        pthread_cond_broadcast(&trace->perpkt_cond);
        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
        printf("Releasing thread #%d\n", get_thread_table_num(trace));
}

/**
 * This is the entry point for our packet processing threads.
 */
static void* perpkt_threads_entry(void *data) {
        libtrace_t *trace = (libtrace_t *)data;
        libtrace_thread_t * t;
        libtrace_message_t message = {0};
        libtrace_packet_t *packet = NULL;

        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
        t = get_thread_table(trace);
        assert(t);
        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);

        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
        // Send a message to say we've started

        message.code = MESSAGE_STARTED;
        message.sender = t;
        message.additional = NULL;

        // Let the per_packet function know we have started
        (*trace->per_pkt)(trace, NULL, &message, t);


        for (;;) {
                int psize;

                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
                        switch (message.code) {
                                case MESSAGE_DO_PAUSE: // This is internal
                                        // Send a message to say we are pausing, TODO consider sender
                                        message.code = MESSAGE_PAUSING;
                                        (*trace->per_pkt)(trace, NULL, &message, t);
                                        // If a hasher thread is running, empty the input queues so we don't lose data
                                        if (trace_has_dedicated_hasher(trace)) {
                                                fprintf(stderr, "Trace is using a hasher thread, emptying queues\n");
                                                // The hasher has stopped by this point, so the queue shouldn't be filling
                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
                                                        psize = trace_pread_packet(trace, &packet);
                                                        if (psize > 0) {
                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
                                                        } else {
                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
                                                        }
                                                }
                                        }
                                        // Send a paused message as a final chance to memory copy any packets
                                        message.code = MESSAGE_PAUSED;
                                        (*trace->per_pkt)(trace, NULL, &message, t);
                                        // Now we do the actual pause, this returns when we are done
                                        trace_thread_pause(trace);
                                        // Check for new messages as soon as we return
                                        continue;
                                case MESSAGE_DO_STOP: // This is internal
                                        goto stop;
                        }
                        (*trace->per_pkt)(trace, NULL, &message, t);
                        continue;
                }

                if (trace->perpkt_thread_count == 1) {
                        if (!packet) {
                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
                                        packet = trace_create_packet();
                        }
                        assert(packet);
                        if ((psize = trace_read_packet(trace, packet)) <1) {
                                break;
                        }
                } else {
                        psize = trace_pread_packet(trace, &packet);
                }

                if (psize > 0) {
                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
                        continue;
                }

                if (psize == -2)
                        continue; // We have a message

                if (psize < 1) { // consider sending a message
                        break;
                }

        }


stop:
        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
        // Let the per_packet function know we have stopped
        message.code = MESSAGE_STOPPED;
        message.sender = message.additional = NULL;
        (*trace->per_pkt)(trace, NULL, &message, t);

        // And we're at the end, free the memory
        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
        t->state = THREAD_FINISHED;
        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);

        // Notify only after we've definitely set the state to finished
        message.code = MESSAGE_PERPKT_ENDED;
        message.additional = NULL;
        trace_send_message_to_reducer(trace, &message);

        pthread_exit(NULL);
};

/** True if the trace has a dedicated hasher thread, otherwise false */
inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
{
        return libtrace->hasher_thread.type == THREAD_HASHER;
}

/**
 * The start point for our single threaded hasher thread, this will read
 * and hash a packet from a data source and queue it against the correct
 * core to process it.
 */
static void* hasher_start(void *data) {
        libtrace_t *trace = (libtrace_t *)data;
        libtrace_thread_t * t;
        int i;
        libtrace_packet_t * packet;
        libtrace_message_t message = {0};

        assert(trace_has_dedicated_hasher(trace));
        /* Wait until all threads are started and objects are initialised (ring buffers) */
        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
        t = &trace->hasher_thread;
        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
        printf("Hasher Thread started\n");
        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
        int pkt_skipped = 0;
        /* Read all packets in, then hash and queue against the correct thread */
        while (1) {
                int thread;
                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
                        packet = trace_create_packet();
                assert(packet);

                if (libtrace_halt) // Signal to die has been sent - TODO
                        break;

                // Check for messages that we expect: MESSAGE_DO_PAUSE (internal messages only)
                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
                        switch(message.code) {
                                case MESSAGE_DO_PAUSE:
                                        fprintf(stderr, "Pausing hasher thread\n");
                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
                                        trace->perpkts_pausing++;
                                        pthread_cond_broadcast(&trace->perpkt_cond);
                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
                                        }
                                        // trace->perpkts_pausing--; // Don't do this, the pausing thread will do this for us
                                        pthread_cond_broadcast(&trace->perpkt_cond);
                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
                                        fprintf(stdout, "Mapper resuming\n");
                                        break;
                                case MESSAGE_DO_STOP:
                                        // Stop called after pause
                                        assert(trace->started == false);
                                        assert(trace->state == STATE_FINSHED);
                                default:
                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
                        }
                        pkt_skipped = 1;
                        continue;
                }

                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
                        break; /* We are EOF or error'd, either way we stop */
                }

                /* We are guaranteed to have a hash function i.e. != NULL */
                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
                /* Blocking write to the correct queue - I'm the only writer */
                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
                        pkt_skipped = 0;
                } else {
                        pkt_skipped = 1; // Reuse that packet, no one read it
                }
        }

        /* Broadcast our last failed read to all threads */
        for (i = 0; i < trace->perpkt_thread_count; i++) {
                libtrace_packet_t * bcast;
                printf("Broadcasting error/EOF now the trace is over\n");
                if (i == trace->perpkt_thread_count - 1) {
                        bcast = packet;
                } else {
                        bcast = trace_create_packet();
                        bcast->error = packet->error;
                }
                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
                        // Unlock early otherwise we could deadlock
                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
                } else {
                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
                }
        }
        // We don't need to free the packet

        // And we're at the end, free the memory
        t->state = THREAD_FINISHED;

        // Notify only after we've definitely set the state to finished
        message.code = MESSAGE_PERPKT_ENDED;
        message.additional = NULL;
        trace_send_message_to_reducer(trace, &message);

        // TODO remove from TTABLE t sometime
        pthread_exit(NULL);
};

/**
 * Moves src into dest (complete copy) and copies the memory buffer and
 * its flags from dest into src, ready for reuse without needing extra mallocs.
 */
static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
        // Save the passed in buffer status
        assert(dest->trace == NULL); // Must be an empty packet
        void * temp_buf = dest->buffer;
        buf_control_t temp_buf_control = dest->buf_control;
        // Completely copy StoredPacket into packet
        memcpy(dest, src, sizeof(libtrace_packet_t));
        // Set the buffer settings on the returned packet
        src->buffer = temp_buf;
        src->buf_control = temp_buf_control;
        src->trace = NULL;
}
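/* Illustrative note (based on the doc comment above, not original source):
 * given an empty dest, e.g. fresh from trace_create_packet(), and a src that
 * holds a real packet, after swap_packets(dest, src) dest owns the packet
 * while src holds dest's spare buffer, so src can be reused for the next
 * read without another malloc.
 */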

/* Our simplest case: when a thread becomes ready it can obtain an exclusive
 * lock to read a packet from the underlying trace.
 */
inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
{
        // We need this to fill the 'first' packet table
        libtrace_thread_t *t = get_thread_table(libtrace);
        if (!*packet) {
                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
                        *packet = trace_create_packet();
        }
        assert(*packet);
        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
        /* Read a packet */
        (*packet)->error = trace_read_packet(libtrace, *packet);
        // Doing this inside the lock ensures the first packet is always
        // recorded first
        store_first_packet(libtrace, *packet, t);

        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        return (*packet)->error;
}

/**
 * For the case that we have a dedicated hasher thread:
 * 1. We read a packet from our buffer
 * 2. Move that into the packet provided (packet)
 */
inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
{
        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];

        if (*packet) // Recycle the old, get the new
                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
                        trace_destroy_packet(*packet);
        *packet = libtrace_ringbuffer_read(&t->rbuffer);

        if (*packet) {
                return (*packet)->error;
        } else {
                // This is how we do a notify: a message is sent beforehand to note that the trace is over etc.
                // The NULL packet then tells the perpkt thread to read that message. This is easiest
                // since cases like pause can also be dealt with this way without actually
                // having to be the end of the stream.
                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
                return -2;
        }
}

/**
 * Tries to read from our queue and returns 1 if a packet was retrieved
 */
static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
{
        libtrace_packet_t* retrieved_packet;

        /* Let's see if we have one waiting */
        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrieved_packet)) {
                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
                assert(retrieved_packet);

                if (*packet) // Recycle the old, get the new
                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
                                trace_destroy_packet(*packet);
                *packet = retrieved_packet;
                *ret = (*packet)->error;
                return 1;
        }
        return 0;
}

/**
 * Allows us to ensure all threads have finished writing to our thread's ring_buffer
 * before returning EOF/error.
 */
inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
{
        /* We are waiting for the condition that another thread ends to check
         * our queue for new data; once all threads end we can go to finished */
        bool complete = false;
        int ret;

        do {
                // Wait for a thread to end
                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);

                // Check before
                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
                        complete = true;
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        continue;
                }

                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);

                // Check after
                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
                        complete = true;
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        continue;
                }

                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);

                // Always try to keep our buffer empty for the unlikely case that more threads than buffer space want to write into our queue
                if(try_waiting_queue(libtrace, t, packet, &ret))
                        return ret;
        } while (!complete);

        // We can only end up here once all threads complete
        try_waiting_queue(libtrace, t, packet, &ret);

        return ret;
        // TODO rethink this logic, fix bug here
}

/**
 * Expects the libtrace_lock to not be held
 */
inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
{
        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
        t->state = THREAD_FINISHING;
        libtrace->perpkts_finishing++;
        pthread_cond_broadcast(&libtrace->perpkt_cond);
        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        return trace_handle_finishing_perpkt(libtrace, packet, t);
}

/**
 * This case is much like the dedicated hasher, except that we will become
 * the hasher if we don't have a packet waiting.
 *
 * Note: This is only ever used if we are doing hashing.
 *
 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
 * queue sizes in total are larger than the ring size.
 *
 * 1. We read a packet from our buffer
 * 2. Move that into the packet provided (packet)
 */
inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
{
        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
        int thread, ret/*, psize*/;

        while (1) {
                if(try_waiting_queue(libtrace, t, packet, &ret))
                        return ret;
                // Can still block here if another thread is writing to a full queue
                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);

                // It's impossible for our own queue to overfill, because no one can write
                // while we are in the lock
                if(try_waiting_queue(libtrace, t, packet, &ret)) {
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        return ret;
                }

                // Another thread cannot write a packet because a queue has filled up. Is it ours?
                if (libtrace->perpkt_queue_full) {
                        contention_stats[this_thread].wait_for_fill_complete_hits++;
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        continue;
                }

                if (!*packet) {
                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
                                *packet = trace_create_packet();
                }
                assert(*packet);

                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        if (libtrace_halt)
                                return 0;
                        else
                                return (*packet)->error;
                }

                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
                if (thread == this_thread) {
                        // If it's this thread we must be in order because we checked the buffer once we got the lock
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        return (*packet)->error;
                }

                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
                                libtrace->perpkt_queue_full = true;
                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                                contention_stats[this_thread].full_queue_hits++;
                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
                        }
                        *packet = NULL;
                        libtrace->perpkt_queue_full = false;
                } else {
                        /* We can get here if the user closes the thread before natural completion/or error */
                        assert (!"packet_hash_locked() The user terminated the trace in an abnormal manner");
                }
                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        }
}

/**
 * This case is much like the dedicated hasher, except that we will become
 * the hasher if we don't have a packet waiting.
 *
 * TODO: You can lose the tail of a trace if the final thread
 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
 *
 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
 * queue sizes in total are larger than the ring size.
 *
 * 1. We read a packet from our buffer
 * 2. Move that into the packet provided (packet)
 */
inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
{
        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
        int ret, i, thread/*, psize*/;

        if (t->state == THREAD_FINISHING)
                return trace_handle_finishing_perpkt(libtrace, packet, t);

        while (1) {
                // Check if we have packets ready
                if(try_waiting_queue(libtrace, t, packet, &ret))
                        return ret;

                // We limit the number of packets we get to the size of the sliding window
                // such that it is impossible for any given thread to fail to store a packet
                assert(sem_wait(&libtrace->sem) == 0);
                /*~~~~Single threaded read of a packet~~~~*/
                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);

                /* Re-check our queue; we might have data waiting */
                if(try_waiting_queue(libtrace, t, packet, &ret)) {
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        assert(sem_post(&libtrace->sem) == 0);
                        return ret;
                }

                // TODO put on *proper* condition variable
                if (libtrace->perpkt_queue_full) {
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        assert(sem_post(&libtrace->sem) == 0);
                        contention_stats[this_thread].wait_for_fill_complete_hits++;
                        continue;
                }

                if (!*packet) {
                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
                                *packet = trace_create_packet();
                }
                assert(*packet);

                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                        assert(sem_post(&libtrace->sem) == 0);
                        // Finish this thread, ensuring that any data written later by another thread is retrieved also
                        if (libtrace_halt)
                                return 0;
                        else
                                return trace_finish_perpkt(libtrace, packet, t);
                }
                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);

                /* ~~~~Multiple threads can run the hasher~~~~ */
                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));

                /* Yes this is correct: the opposite (read) lock for a write operation */
                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
                *packet = NULL;

                // Always try to read any data from the sliding window
                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
                        if (libtrace->perpkt_queue_full) {
                                // I might be the holdup, in which case if I can read my queue I should do that and return
                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
                                        return ret;
                                }
                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
                                continue;
                        }
                        // Read greedily as many as we can
                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
                                                if (this_thread == thread)
                                                {
                                                        // TODO think about this case more, because we would have to stop early if this were to happen on the last read
                                                        // before EOF/error; we might not have emptied the sliding window
                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
                                                        // It's our queue, so we must have a packet to read out
                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
                                                                // We must be able to write this now 100% without fail
                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
                                                                assert(sem_post(&libtrace->sem) == 0);
                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
                                                                return ret;
                                                        } else {
                                                                assert(!"Our queue is full but I cannot read from it??");
                                                        }
                                                }
                                                // Not us; we have to give the other threads a chance to write their packets then
                                                libtrace->perpkt_queue_full = true;
                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read their packets
                                                        assert(sem_post(&libtrace->sem) == 0);

                                                contention_stats[this_thread].full_queue_hits++;
                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
                                                // Grab these back
                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Re-acquire the semaphores we released
                                                        assert(sem_wait(&libtrace->sem) == 0);
                                                libtrace->perpkt_queue_full = false;
                                        }
                                        assert(sem_post(&libtrace->sem) == 0);
                                        *packet = NULL;
                                } else {
                                        // Cannot write to a queue if no one is waiting (I think this is unreachable)
                                        // in the general case (unless the user ends early without proper clean up).
                                        assert (!"unreachable code??");
                                }
                        }
                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
                }
                // Now we go back to checking our queue anyway
        }
}


/**
 * For the first packet of each queue we keep a copy and note the system
 * time it was received at.
 *
 * This is used for finding the first packet when playing back a trace
 * in trace time. And can be used by real time applications to print
 * results out every XXX seconds.
 */
inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
{
        if (!t->recorded_first) {
                struct timeval tv;
                libtrace_packet_t * dup;
                // For what it's worth we can call these outside of the lock
                gettimeofday(&tv, NULL);
                dup = trace_copy_packet(packet);
                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
                // Now update the first
                libtrace->first_packets.count++;
                if (libtrace->first_packets.count == 1) {
                        // We are the first entry, hence also the first known packet
                        libtrace->first_packets.first = t->perpkt_num;
                } else {
                        // Check if we are earlier than the previous 'first' packet
                        size_t first = libtrace->first_packets.first;
                        if (trace_get_seconds(dup) <
                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
                                libtrace->first_packets.first = t->perpkt_num;
                }
                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
                libtrace_message_t mesg = {0};
                mesg.code = MESSAGE_FIRST_PACKET;
                mesg.additional = NULL;
                trace_send_message_to_reducer(libtrace, &mesg);
                t->recorded_first = true;
        }
}

/**
 * Returns 1 if it's certain that the first packet is truly the first packet
 * rather than a best guess based upon threads that have published so far.
 * Otherwise 0 is returned.
 * It's recommended that this result is stored rather than calling this
 * function again.
 */
DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
{
        int ret = 0;
        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
        if (libtrace->first_packets.count) {
                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
                if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {
                        ret = 1;
                } else {
                        struct timeval curr_tv;
                        // If a second has passed since the first entry we will assume this is the very first packet
                        gettimeofday(&curr_tv, NULL);
                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
                                        ret = 1;
                                }
                        }
                }
        } else {
                *packet = NULL;
                *tv = NULL;
        }
        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
        return ret;
}
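/* A minimal usage sketch (assuming the API above; the local variable names
 * are illustrative only):
 *
 *   libtrace_packet_t *first;
 *   struct timeval *sys_tv;
 *   int stable = retrive_first_packet(trace, &first, &sys_tv);
 *   if (first)
 *           printf("first packet seen at %f (stable=%d)\n",
 *                  trace_get_seconds(first), stable);
 *
 * As the doc comment notes, once stable == 1 the result should be cached
 * rather than calling this function again.
 */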


DLLEXPORT inline uint64_t tv_to_usec(struct timeval *tv)
{
        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
}

inline static struct timeval usec_to_tv(uint64_t usec)
{
        struct timeval tv;
        tv.tv_sec = usec / 1000000;
        tv.tv_usec = usec % 1000000;
        return tv;
}
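/* Worked example: tv = {.tv_sec = 2, .tv_usec = 500000} gives
 * tv_to_usec(&tv) == 2500000, and usec_to_tv(2500000) returns
 * {.tv_sec = 2, .tv_usec = 500000}; the two functions are exact inverses
 * whenever tv_usec < 1000000.
 */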


/**
 * Delays a packet's playback so the playback will be in trace time
 */
static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
        struct timeval curr_tv, pkt_tv;
        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
        uint64_t curr_usec;
        /* In tracetime mode we might delay releasing this packet */
        if (!t->tracetime_offset_usec) {
                libtrace_packet_t * first_pkt;
                struct timeval *sys_tv;
                int64_t initial_offset;
                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
                assert(first_pkt);
                pkt_tv = trace_get_timeval(first_pkt);
                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
                if (stable)
                        // 0->1 because 0 is used to mean unset
                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
                next_release = initial_offset;
        }
        /* next_release == offset */
        pkt_tv = trace_get_timeval(packet);
        next_release += tv_to_usec(&pkt_tv);
        gettimeofday(&curr_tv, NULL);
        curr_usec = tv_to_usec(&curr_tv);
        if (next_release > curr_usec) {
                // We need to wait
                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
                select(0, NULL, NULL, NULL, &delay_tv);
        }
}
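/* Worked example of the offset arithmetic above (illustrative values only):
 * if the first packet is stamped 100.0s trace time and arrived at 5000.0s
 * system time, the offset is 4900.0s, i.e. 4900000000 usec. A later packet
 * stamped 100.5s then has next_release = 4900000000 + 100500000 =
 * 5000500000 usec, so we sleep via select() until system time reaches
 * 5000.5s before releasing it.
 */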

/* Read one packet from the trace into a buffer. Note that this function will
 * block until a packet is read (or EOF is reached).
 *
 * @param libtrace      the libtrace opaque pointer
 * @param packet        the packet opaque pointer
 * @returns 0 on EOF, negative value on error
 *
 * Note this is identical to read_packet but calls pread_packet instead of
 * read_packet in the format.
 *
 */
static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {

        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
        if (trace_is_err(libtrace))
                return -1;
        if (!libtrace->started) {
                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
                return -1;
        }
        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
                return -1;
        }
        assert(packet);

        if (libtrace->format->read_packet) {
                do {
                        size_t ret;
                        /* Finalise the packet, freeing any resources the format module
                         * may have allocated it and zeroing all data associated with it.
                         */
                        trace_fin_packet(packet);
                        /* Store the trace we are reading from into the packet opaque
                         * structure */
                        packet->trace = libtrace;
                        ret=libtrace->format->pread_packet(libtrace,packet);
                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
                                return ret;
                        }
                        if (libtrace->filter) {
                                /* If the filter doesn't match, read another
                                 * packet
                                 */
                                if (!trace_apply_filter(libtrace->filter,packet)){
                                        ++libtrace->filtered_packets;
                                        continue;
                                }
                        }
                        if (libtrace->snaplen>0) {
                                /* Snap the packet */
                                trace_set_capture_length(packet,
                                                libtrace->snaplen);
                        }
                        trace_packet_set_order(packet, libtrace->accepted_packets);
                        ++libtrace->accepted_packets;
                        return ret;
                } while(1);
        }
        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
        return ~0U;
}

/**
 * Read a packet from the parallel trace
 */
DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
{
        int ret;
        libtrace_thread_t *t = get_thread_table(libtrace);

        // Cleanup the packet passed back
        if (*packet)
                trace_fin_packet(*packet);

        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
                if (!*packet)
                        *packet = trace_create_packet();
                ret = trace_pread_packet_wrapper(libtrace, *packet);
        } else if (trace_has_dedicated_hasher(libtrace)) {
                ret = trace_pread_packet_hasher_thread(libtrace, packet);
        } else if (!trace_has_dedicated_hasher(libtrace)) {
                /* We don't care about which core a packet goes to */
                ret = trace_pread_packet_first_in_first_served(libtrace, packet);
        } /* else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
                ret = trace_pread_packet_sliding_window(libtrace, packet);
        } else {
                ret = trace_pread_packet_hash_locked(libtrace, packet);
        }*/

        // Formats can also optionally do this internally to ensure the first
        // packet is always reported correctly
        if (ret > 0) {
                store_first_packet(libtrace, *packet, t);
                if (libtrace->tracetime)
                        delay_tracetime(libtrace, *packet, t);
        }

        return ret;
}
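/* A minimal read-loop sketch (assuming the conventions used in
 * perpkt_threads_entry() above; trace, t and per_pkt are as set up by
 * trace_pstart()):
 *
 *   libtrace_packet_t *packet = NULL;
 *   int psize;
 *   for (;;) {
 *           psize = trace_pread_packet(trace, &packet);
 *           if (psize == -2)
 *                   continue;        // A message, not a packet, is waiting
 *           if (psize < 1)
 *                   break;           // EOF (0) or error (negative)
 *           packet = (*trace->per_pkt)(trace, packet, NULL, t);
 *   }
 */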
1045
1046/* Starts perpkt threads
1047 * @return threads_started
1048 */
1049static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1050        int i;
1051
1052        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1053                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1054                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
1055        }
1056        return libtrace->perpkt_thread_count;
1057}
1058
1059/* Start an input trace in a parallel fashion.
1060 *
1061 * @param libtrace the input trace to start
1062 * @param global_blob some global data you can share with the new perpkt threads
1063 * @returns 0 on success
1064 */
1065DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
1066{
1067        int i;
1068        sigset_t sig_before, sig_block_all;
1069
1070        assert(libtrace);
1071        if (trace_is_err(libtrace))
1072                return -1;
1073        if (libtrace->state == STATE_PAUSED) {
1074                assert (libtrace->perpkts_pausing != 0);
1075                fprintf(stderr, "Restarting trace\n");
1076                int err;
1077                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1078                        printf("This format has direct support for p's\n");
1079                        err = libtrace->format->pstart_input(libtrace);
1080                } else {
1081                        if (libtrace->format->start_input) {
1082                                err = libtrace->format->start_input(libtrace);
1083                        }
1084                }
1085                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1086                libtrace->started = true;
1087                libtrace->state = STATE_RUNNING;
1088                assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1089                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1090                return 0;
1091        }
1092
1093        assert(libtrace->state == STATE_NEW);
1094        libtrace_parallel = 1;
1095
1096        // Store the user defined things against the trace
1097        libtrace->global_blob = global_blob;
1098        libtrace->per_pkt = per_pkt;
1099        libtrace->reducer = reducer;
1100        libtrace->perpkts_finishing = 0;
1101        // libtrace->hasher = &rand_hash; /* Hasher now set via option */
1102
1103        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1104        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1105        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1106        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1107
1108        // Set default buffer sizes
1109        if (libtrace->perpkt_buffer_size <= 0)
1110                libtrace->perpkt_buffer_size = 1000;
1111
1112        if (libtrace->perpkt_thread_count <= 0)
1113                libtrace->perpkt_thread_count = 2; // XXX scale to system
1114
1115        if(libtrace->packet_freelist_size <= 0)
1116                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
1117
1118        if(libtrace->packet_freelist_size <
1119                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
1120                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1121
1122        libtrace->started=true; // Before we start the threads otherwise we could have issues
1123        libtrace->state = STATE_RUNNING;
1124        /* Disable signals - Pthread signal handling */
1125
1126        sigemptyset(&sig_block_all);
1127
1128        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1129
1130        // If we are using a hasher start it
1131        if (libtrace->hasher && libtrace->hasher_thread.type == THREAD_HASHER) {
1132                libtrace_thread_t *t = &libtrace->hasher_thread;
1133                t->trace = libtrace;
1134                t->ret = NULL;
1135                t->type = THREAD_HASHER;
1136                t->state = THREAD_RUNNING;
1137                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1138                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1139        } else {
1140                libtrace->hasher_thread.type = THREAD_EMPTY;
1141        }
1142        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
1143        libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1144        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1145        // This will be applied to every new thread that starts, i.e. they will block all signals
1146        // Lets start a fixed number of reading threads

        // For now we never have a dedicated thread for the reducer,
        // i.e. this main thread is used as the reducer
        libtrace->reducer_thread.tid = pthread_self();
        libtrace->reducer_thread.type = THREAD_REDUCER;
        libtrace->reducer_thread.state = THREAD_RUNNING;
        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));

        /* Initialise storage for each thread's first packet */
        libtrace->first_packets.first = 0;
        libtrace->first_packets.count = 0;
        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct __packet_storage_magic_type));


        /* Start all of our perpkt threads */
        libtrace->perpkt_threads = calloc(libtrace->perpkt_thread_count, sizeof(libtrace_thread_t));
        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
                t->trace = libtrace;
                t->ret = NULL;
                t->type = THREAD_PERPKT;
                t->state = THREAD_RUNNING;
                t->user_data = NULL;
                // t->tid is set by pthread_create below
                t->perpkt_num = i;
                if (libtrace->hasher)
                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
                // Depending on the mode, either the vector or the deque will be used
                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
                t->tmp_key = 0;
                t->tmp_data = NULL;
                t->recorded_first = false;
                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
                t->tracetime_offset_usec = 0;
        }

        int threads_started = 0;
        /* Setup the trace and start our threads */
        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
                printf("This format has native parallel support\n");
                threads_started = libtrace->format->pstart_input(libtrace);
        } else {
                if (libtrace->format->start_input) {
                        threads_started = libtrace->format->start_input(libtrace);
                }
        }
        if (threads_started == 0)
                threads_started = trace_start_perpkt_threads(libtrace);


        // Restore the original signal mask - allow signals again
        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);

        if (threads_started < 0)
                // Error
                return threads_started;

        // TODO fix these leaks etc
        if (libtrace->perpkt_thread_count != threads_started)
                printf("Warning: started %d threads but %d were requested\n", threads_started, libtrace->perpkt_thread_count);


        return 0;
}
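
/* Usage sketch (not part of the library): starting a parallel trace.
 * my_per_pkt and my_reducer are hypothetical user callbacks; see libtrace.h
 * for the exact prototypes trace_pstart() expects.
 *
 *     libtrace_t *trace = trace_create("pcapfile:trace.pcap");
 *     if (trace_pstart(trace, NULL, my_per_pkt, my_reducer) < 0)
 *         fprintf(stderr, "Failed to start: %s\n", trace_get_err(trace).problem);
 */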

/**
 * Pauses a trace; this should only be called by the main thread.
 * 1. Sets started = false
 * 2. Waits until all perpkt threads are paused on a condition variable
 * 3. Calls ppause on the underlying format, if it has one
 * 4. Returns with perpkts_pausing set to perpkt_count (used when restarting so we can reuse the threads)
 *
 * Once done you should be able to modify the trace setup and call pstart again
 * TODO handle changing thread numbers
 */
DLLEXPORT int trace_ppause(libtrace_t *libtrace)
{
        libtrace_thread_t *t;
        int i;
        assert(libtrace);
        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
                return -1;
        }

        t = get_thread_table(libtrace);

        // Set pausing
        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
        libtrace->state = STATE_PAUSING;
        pthread_cond_broadcast(&libtrace->perpkt_cond);
        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);

        // Special case: a dedicated hasher thread must be paused first
        if (trace_has_dedicated_hasher(libtrace)) {
                fprintf(stderr, "Hasher thread is running, handling it as a special case\n");
                libtrace_message_t message = {0};
                message.code = MESSAGE_DO_PAUSE;
                message.additional = NULL;
                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
                // Wait for it to pause
                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
                while (1 != libtrace->perpkts_pausing) {
                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
                }
                libtrace->perpkts_pausing--; // Do this on the hasher's behalf
                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        }

        fprintf(stderr, "Sending messages\n");
        // Ask the threads to pause, skipping this one if it's a perpkt thread
        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
                if (&libtrace->perpkt_threads[i] != t) {
                        libtrace_message_t message = {0};
                        message.code = MESSAGE_DO_PAUSE;
                        message.additional = NULL;
                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
                        if (trace_has_dedicated_hasher(libtrace)) {
                                // The hasher has stopped and the other threads have messages waiting.
                                // If the queues are empty those threads would block and never see them,
                                // so send NULL packets simply to make the threads check their message queues.
                                // We are the only writer since the hasher has paused.
                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
                        }
                } else {
                        fprintf(stderr, "A perpkt thread should not be used to pause a trace; this could cause any number of problems!\n");
                }
        }

        // Formats must support native message handling if a message is ready
        // Approach, per Perry's suggestion, is a non-blocking read
        // followed by a blocking read. XXX STRIP THIS OUT

        if (t) {
                // A perpkt thread is doing the pausing, so fake an extra paused thread.
                // We rely on the user not to return before starting the trace again.
                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
                libtrace->perpkts_pausing++;
                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        }

        fprintf(stderr, "Asking threads to pause\n");

        // Wait for all threads to pause
        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
        while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
        }
        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);

        fprintf(stderr, "Threads have paused\n");

        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
                libtrace->started = false;
                if (libtrace->format->ppause_input)
                        libtrace->format->ppause_input(libtrace);
                // TODO What happens if we don't have ppause_input?
        } else {
                int err;
                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
                err = trace_pause(libtrace);
                // We should handle this a bit better
                if (err)
                        return err;
        }

        // Only set as paused after pause has been called on the underlying trace
        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
        libtrace->state = STATE_PAUSED;
        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        return 0;
}
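
/* Usage sketch: pausing a running parallel trace from the main thread,
 * tweaking an option, then resuming. Assumes the trace was started with
 * trace_pstart() and that my_per_pkt/my_reducer are the hypothetical
 * callbacks it was started with.
 *
 *     if (trace_ppause(trace) == 0) {
 *         int tracetime = 1;
 *         trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &tracetime);
 *         trace_pstart(trace, NULL, my_per_pkt, my_reducer);
 *     }
 */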

/**
 * Stops a trace, finishing prematurely as though it met EOF.
 * This should only be called by the main thread.
 * 1. Calls ppause
 * 2. Sends a message asking the threads to finish
 *
 */
DLLEXPORT int trace_pstop(libtrace_t *libtrace)
{
        libtrace_message_t message = {0};
        assert(libtrace);
        if (!libtrace->started) {
                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
                return -1;
        }

        // Ensure all threads have paused and the underlying trace format has
        // been closed and all associated packets are cleaned up
        trace_ppause(libtrace);

        // Now send a message asking the threads to stop;
        // this will be retrieved before trying to read another packet
        message.code = MESSAGE_DO_STOP;
        message.additional = NULL;
        trace_send_message_to_perpkts(libtrace, &message);
        if (trace_has_dedicated_hasher(libtrace))
                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);

        // Now release the threads and let them stop
        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
        libtrace->state = STATE_FINSHED;
        assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        return 0;
}
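
/* Usage sketch: stopping a live parallel trace early. trace_pstop() asks
 * the threads to finish; trace_join() below then waits for them.
 *
 *     trace_pstop(trace);
 *     trace_join(trace);
 *     trace_destroy(trace);
 */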

/**
 * Sets the hasher type, along with an optional hasher function. If the
 * hardware supports that generic type of hashing it will be used; otherwise
 * the supplied hasher function will be used, and passed data when called.
 *
 * @return 0 if successful, otherwise -1 on error
 */
DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
        int ret = -1;
        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
                return -1;
        }

        // Save the requirements
        trace->hasher_type = type;
        if (hasher) {
                trace->hasher = hasher;
                trace->hasher_data = data;
        } else {
                trace->hasher = NULL;
                // TODO consider how to handle freeing this
                trace->hasher_data = NULL;
        }

        // Try to push this to hardware - NOTE hardware could even do custom if
        // there is a more efficient way to apply it, in which case
        // it will simply grab the function out of libtrace_t
        if (trace->format->pconfig_input)
                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);

        if (ret == -1) {
                // We have to deal with this ourselves
                // This most likely means single threaded reading of the trace
                if (!hasher) {
                        switch (type)
                        {
                                case HASHER_CUSTOM:
                                case HASHER_BALANCE:
                                        return 0;
                                case HASHER_BIDIRECTIONAL:
                                        trace->hasher = &toeplitz_hash_packet;
                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
                                        toeplitz_init_config(trace->hasher_data, 1);
                                        return 0;
                                case HASHER_UNIDIRECTIONAL:
                                        trace->hasher = &toeplitz_hash_packet;
                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
                                        toeplitz_init_config(trace->hasher_data, 0);
                                        return 0;
                                case HASHER_HARDWARE:
                                        return -1;
                        }
                        return -1;
                }
        } else {
                // The hardware is dealing with this for us
                trace->hasher_type = HASHER_HARDWARE;
        }

        return 0;
}
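
/* Usage sketch: requesting a bidirectional flow hash before calling pstart.
 * If the format can hash in hardware it will; otherwise the built-in
 * software Toeplitz hasher configured above is used.
 *
 *     if (trace_set_hasher(trace, HASHER_BIDIRECTIONAL, NULL, NULL) == -1)
 *         fprintf(stderr, "Failed to set hasher\n");
 */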

// Waits for all threads to finish
DLLEXPORT void trace_join(libtrace_t *libtrace) {
        int i;

        /* First wait for the perpkt threads to finish, since these are
         * user controlled */
        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
                //printf("Waiting to join with perpkt #%d\n", i);
                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
                //printf("Joined with perpkt #%d\n", i);
                // We must make a best effort to empty the queue, so that
                // the producer (or any other threads) don't block.
                libtrace_packet_t *packet;
                // Mark that we are no longer accepting packets
                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
                libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important: we are finished before we empty the buffer
                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
                while (libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
                        if (packet) // This could be NULL iff the perpkt finishes early
                                trace_destroy_packet(packet);
        }

        /* Now the hasher */
        // XXX signal it to stop
        if (trace_has_dedicated_hasher(libtrace)) {
                fprintf(stderr, "Waiting to join with the hasher\n");
                pthread_join(libtrace->hasher_thread.tid, NULL);
                fprintf(stderr, "Joined with the hasher\n");
                libtrace->hasher_thread.state = THREAD_FINISHED;
        }

        // Now that everything is finished nothing can be touching our
        // buffers, so clean them up
        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
                // It's possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up,
                // if that thread lost its timeslice before or during a write
                libtrace_packet_t *packet;
                while (libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
                        trace_destroy_packet(packet);
                if (libtrace->hasher) {
                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
                }
                // Cannot destroy the vector yet; this happens in trace_destroy
        }

        // Mark this as done for now
        libtrace->joined = true;
}

// Avoid this extra allocation overhead where possible: results can be placed
// directly into the storage structure by the publish functions
DLLEXPORT libtrace_result_t *trace_create_result()
{
        libtrace_result_t *result = malloc(sizeof(libtrace_result_t));
        assert(result);
        result->key = 0;
        result->value = NULL;
        // TODO automatically back with a free list!!
        return result;
}

DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
{
        libtrace_thread_t * t = get_thread_descriptor(libtrace);
        assert(t);
        return libtrace_message_queue_count(&t->messages);
}

DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
{
        libtrace_thread_t * t = get_thread_descriptor(libtrace);
        assert(t);
        return libtrace_message_queue_get(&t->messages, message);
}

DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
{
        libtrace_thread_t * t = get_thread_descriptor(libtrace);
        assert(t);
        return libtrace_message_queue_try_get(&t->messages, message);
}
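
/* Usage sketch: draining this thread's message queue without blocking,
 * e.g. between packets inside a perpkt thread. Only the functions defined
 * above are used; the actual handling is left as a stub.
 *
 *     libtrace_message_t msg;
 *     while (libtrace_thread_get_message_count(trace) > 0) {
 *         libtrace_thread_get_message(trace, &msg);
 *         // ... act on msg.code, e.g. MESSAGE_DO_PAUSE or MESSAGE_DO_STOP ...
 *     }
 */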

/**
 * Asks the reducer to collect pending results; returns the backlog indicator
 * from the reducer's message queue.
 */
DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
{
        libtrace_message_t message = {0};
        message.code = MESSAGE_POST_REDUCE;
        message.additional = NULL;
        message.sender = get_thread_descriptor(libtrace);
        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
}

/**
 * Sends a message to the reducer; returns the backlog indicator
 */
DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
{
        message->sender = get_thread_descriptor(libtrace);
        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
}

/**
 * Sends a message to the given thread; returns the backlog indicator
 */
DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
{
        message->sender = get_thread_descriptor(libtrace);
        return libtrace_message_queue_put(&t->messages, message);
}

DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
{
        int i;
        message->sender = get_thread_descriptor(libtrace);
        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
        }
        return 0;
}

DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
        result->key = key;
}
DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
        return result->key;
}
DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
        result->value = value;
}
DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
        return result->value;
}
DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
        result->key = key;
        result->value = value;
}
DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
        free(*result);
        *result = NULL; // Clear the caller's pointer, not just our local copy
        // TODO automatically back with a free list!!
}

DLLEXPORT void * trace_get_global(libtrace_t *trace)
{
        return trace->global_blob;
}

DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
{
        if (trace->global_blob && trace->global_blob != data) {
                void * ret = trace->global_blob;
                trace->global_blob = data;
                return ret;
        } else {
                trace->global_blob = data;
                return NULL;
        }
}

DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
{
        return t->user_data;
}

DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
{
        if (t->user_data && t->user_data != data) {
                void *ret = t->user_data;
                t->user_data = data;
                return ret;
        } else {
                t->user_data = data;
                return NULL;
        }
}

/**
 * Note: This function grabs a lock and expects trace_update_inprogress_result
 * to be called to release the lock.
 *
 * Expected to be used in trace-time situations to allow a result to be pending
 * publication, so that the reducer can take it before the thread publishes it
 * itself. For example, publishing a result every second even though a queue
 * hasn't processed a packet (or is overloaded) and hasn't published yet.
 *
 * Currently this only supports a single temporary result,
 * so if a key differs from the current temporary result, the existing
 * one will be published and NULL returned.
 */
DLLEXPORT void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key)
{
        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];

        assert(pthread_spin_lock(&t->tmp_spinlock) == 0);
        if (t->tmp_key != key) {
                if (t->tmp_data) {
                        //printf("publishing data key=%"PRIu64"\n", t->tmp_key);
                        trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
                }
                t->tmp_data = NULL;
                t->tmp_key = key;
        }
        return t->tmp_data;
}

/**
 * Updates a temporary result and releases the lock previously grabbed by trace_retrive_inprogress_result
 */
DLLEXPORT void trace_update_inprogress_result(libtrace_t *libtrace, uint64_t key, void * value)
{
        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
        if (t->tmp_key != key) {
                if (t->tmp_data) {
                        printf("BAD: publishing data key=%"PRIu64"\n", t->tmp_key);
                        trace_publish_result(libtrace, t->tmp_key, t->tmp_data);
                }
                t->tmp_key = key;
        }
        t->tmp_data = value;
        assert(pthread_spin_unlock(&t->tmp_spinlock) == 0);
}
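
/* Usage sketch: accumulating a per-second counter in the temporary result
 * slot. trace_retrive_inprogress_result() takes the spinlock and
 * trace_update_inprogress_result() releases it, so the calls must always
 * be paired. The key (a hypothetical one-second bin) is up to the caller.
 *
 *     uint64_t *cnt = trace_retrive_inprogress_result(trace, seconds);
 *     if (!cnt)
 *         cnt = calloc(1, sizeof(uint64_t)); // freed later by the reducer
 *     (*cnt)++;
 *     trace_update_inprogress_result(trace, seconds, cnt);
 */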

/**
 * Publishes a result to the reduce queue.
 */
DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
        libtrace_result_t res;
        res.is_packet = 0;
        // Find out which thread we are
        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];

        // Now put the result into our queue
        libtrace_result_set_key_value(&res, key, value);
        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
                if (libtrace_deque_get_size(&t->deque) >= 800) {
                        trace_post_reduce(libtrace);
                }
                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
        } else {
                if (libtrace_vector_get_size(&t->vector) >= 800) {
                        trace_post_reduce(libtrace);
                }
                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
        }
}
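
/* Usage sketch: publishing a heap-allocated result from a perpkt thread.
 * With REDUCE_SEQUENTIAL/REDUCE_ORDERED the key (here a hypothetical
 * per-second bin) should be non-decreasing within each thread.
 *
 *     uint64_t *cnt = malloc(sizeof(uint64_t));
 *     *cnt = my_count; // hypothetical value computed by this thread
 *     trace_publish_result(trace, seconds, cnt);
 */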

DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
        libtrace_result_t res;
        // Find out which thread we are
        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];

        // Now put the packet into our queue, keyed by its order
        res.is_packet = 1;
        libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
                if (libtrace_deque_get_size(&t->deque) >= 800) {
                        trace_post_reduce(libtrace);
                }
                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
        } else {
                if (libtrace_vector_get_size(&t->vector) >= 800) {
                        trace_post_reduce(libtrace);
                }
                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
        }
}


static int compareres(const void* p1, const void* p2)
{
        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
                return -1;
        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
                return 0;
        else
                return 1;
}

/* Retrieves all results with the requested key from the temporary result
 * holding zone.
 */
DLLEXPORT int trace_get_results_check_temp(libtrace_t *libtrace, libtrace_vector_t *results, uint64_t key)
{
        int i;

        libtrace_vector_empty(results);
        // Check all of the temp queues
        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
                libtrace_result_t r = {0,0};
                assert(pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
                if (libtrace->perpkt_threads[i].tmp_key == key) {
                        libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);
                        libtrace->perpkt_threads[i].tmp_data = NULL;
                        printf("Found in temp queue\n");
                }
                assert(pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
                if (libtrace_result_get_value(&r)) {
                        // Got a result still in temporary storage
                        printf("Publishing from temp queue\n");
                        libtrace_vector_push_back(results, &r);
                } else {
                        // This might be waiting on the actual queue
                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
                        if (libtrace_deque_peek_front(v, (void *) &r) &&
                                        libtrace_result_get_value(&r)) {
                                assert(libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);
                                printf("Found in real queue\n");
                                libtrace_vector_push_back(results, &r);
                        } // else no data (probably means no packets)
                        else {
                                printf("Result missing in real queue\n");
                        }
                }
        }
        return libtrace_vector_get_size(results);
}

DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
        int i;
        int flags = libtrace->reducer_flags; // Hint: these don't change while we are reading

        libtrace_vector_empty(results);

        /* Here we assume the queues are in ascending order and the caller wants
         * the smallest result first. If they are not in order the results
         * may not be in order either.
         */
        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
                int live_count = 0;
                bool live[libtrace->perpkt_thread_count]; // Set if a queue is alive
                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
                uint64_t min_key = UINT64_MAX;
                int min_queue = -1;

                /* Loop through and check that all are alive (have data) and find the smallest */
                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
                        if (libtrace_deque_get_size(v) != 0) {
                                libtrace_result_t r;
                                libtrace_deque_peek_front(v, (void *) &r);
                                live_count++;
                                live[i] = 1;
                                key[i] = libtrace_result_get_key(&r);
                                if (i == 0 || min_key > key[i]) {
                                        min_key = key[i];
                                        min_queue = i;
                                }
                        } else {
                                live[i] = 0;
                        }
                }

                /* Now remove the smallest and loop - special case: if all threads have joined we always flush what's left */
                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
                                libtrace->joined))) {
                        /* Pop from the minimum queue */
                        libtrace_result_t r;

                        assert(libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
                        libtrace_vector_push_back(results, &r);

                        // We now expect the key we just read, plus one
                        libtrace->expected_key = key[min_queue] + 1;

                        // Now update the queue we just removed from
                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque))
                        {
                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
                                key[min_queue] = libtrace_result_get_key(&r);
                                if (key[min_queue] <= min_key) {
                                        // We are still the smallest, might be out of order though :(
                                        min_key = key[min_queue];
                                } else {
                                        min_key = key[min_queue]; // Update our minimum
                                        // Check all queues to find the smallest again - all are alive
                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
                                                if (live[i] && min_key > key[i]) {
                                                        min_key = key[i];
                                                        min_queue = i;
                                                }
                                        }
                                }
                        } else {
                                live[min_queue] = 0;
                                live_count--;
                                min_key = UINT64_MAX; // Reset our minimum
                                // Check all queues to find the smallest again - not all are alive now
                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
                                        // Still not 100% TODO (what if order is wrong or not increasing)
                                        if (live[i] && min_key >= key[i]) {
                                                min_key = key[i];
                                                min_queue = i;
                                        }
                                }
                        }
                }
        } else { // Queues are not in order - return all results in the queue
                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
                }
                if (flags & REDUCE_SORT) {
                        qsort(results->elements, results->size, results->element_size, &compareres);
                }
        }
        return libtrace_vector_get_size(results);
}
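
/* Usage sketch: a reducer polling for results. The vector must be
 * initialised with the result element size before the first call, just as
 * the perpkt threads do for their own storage above.
 *
 *     libtrace_vector_t results;
 *     libtrace_vector_init(&results, sizeof(libtrace_result_t));
 *     while (!trace_finished(trace)) {
 *         int n = trace_get_results(trace, &results);
 *         // ... read the n results via libtrace_result_get_key()/get_value() ...
 *     }
 */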

DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
        return packet->order;
}

DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
        return packet->hash;
}

DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
        packet->order = order;
}

DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
        packet->hash = hash;
}

DLLEXPORT int trace_finished(libtrace_t * libtrace) {
        int i;
        int b = 0;
        // TODO I don't like using this so much
        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
                        b++;
        }
        //assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
        return !b;
}

DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
{
        switch (option) {
                case TRACE_OPTION_SET_HASHER:
                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
                        libtrace->perpkt_buffer_size = *((int *) value);
                        return 1;
                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
                        libtrace->packet_freelist_size = *((int *) value);
                        return 1;
                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
                        libtrace->perpkt_thread_count = *((int *) value);
                        return 1;
                case TRACE_DROP_OUT_OF_ORDER:
                        if (*((int *) value))
                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
                        else
                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
                        return 1;
                case TRACE_OPTION_SEQUENTIAL:
                        if (*((int *) value))
                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
                        else
                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
                        return 1;
                case TRACE_OPTION_ORDERED:
                        if (*((int *) value))
                                libtrace->reducer_flags |= REDUCE_ORDERED;
                        else
                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
                        return 1;
                case TRACE_OPTION_USE_DEDICATED_HASHER:
                        if (*((int *) value))
                                libtrace->hasher_thread.type = THREAD_HASHER;
                        else
                                libtrace->hasher_thread.type = THREAD_EMPTY;
                        return 1;
                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
                        if (*((int *) value))
                                libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
                        else
                                libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
                        return 1;
                case TRACE_OPTION_TRACETIME:
                        if (*((int *) value))
                                libtrace->tracetime = 1;
                        else
                                libtrace->tracetime = 0;
                        return 0;
        }
        return 0;
}
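
/* Usage sketch: configuring a parallel trace before pstart. Each option
 * takes a pointer to an int-sized value.
 *
 *     int threads = 4, ordered = 1;
 *     trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &threads);
 *     trace_parallel_config(trace, TRACE_OPTION_ORDERED, &ordered);
 */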

DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
        libtrace_packet_t* result;
        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
                result = trace_create_packet();
        assert(result);
        swap_packets(result, packet); // Move the current packet into our copy
        return result;
}

DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
        // Try to write the packet back to the freelist
        assert(packet);
        // Always release any resources this might be holding, such as a slot in a ringbuffer
        trace_fin_packet(packet);
        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
                /* We couldn't, oh well, just destroy it - XXX consider non-managed formats, i.e. ring buffers losing packets and jamming up :( */
                trace_destroy_packet(packet);
        }
}
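
/* Usage sketch: keeping a packet beyond the per-packet callback by swapping
 * it into a copy drawn from the freelist, then handing the copy back once
 * the reducer is done with it.
 *
 *     libtrace_packet_t *mine = trace_result_packet(trace, packet);
 *     // ... use mine after the callback has returned ...
 *     trace_free_result_packet(trace, mine);
 */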