source: lib/trace_parallel.c @ 18961094

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 18961094 was 18961094, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Removes the DEDICATED HASHER option, since this is always automatically inferred from the settings.

  • Property mode set to 100644
File size: 66.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101
102extern int libtrace_parallel;
103
104struct multithreading_stats {
105        uint64_t full_queue_hits;
106        uint64_t wait_for_fill_complete_hits;
107} contention_stats[1024];
108
109inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
110
111/**
112 * @return True if the format supports parallel threads.
113 */
114static inline bool trace_supports_parallel(libtrace_t *trace)
115{
116        assert(trace);
117        assert(trace->format);
118        if (trace->format->pstart_input)
119                return true;
120        else
121                return false;
122        //return trace->format->pstart_input;
123}
124
125DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
126        int i;
127        struct multithreading_stats totals = {0};
128        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
129                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
130                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
131                totals.full_queue_hits += contention_stats[i].full_queue_hits;
132                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
133                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
134        }
135        fprintf(stderr, "\nTotals for perpkt threads\n");
136        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
137        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
138
139        return;
140}
141
142void libtrace_zero_thread(libtrace_thread_t * t) {
143        t->trace = NULL;
144        t->ret = NULL;
145        t->type = THREAD_EMPTY;
146        libtrace_zero_ringbuffer(&t->rbuffer);
147        libtrace_zero_vector(&t->vector);
148        libtrace_zero_deque(&t->deque);
149        t->recorded_first = false;
150        t->perpkt_num = -1;
151}
152
153// Ints are aligned int is atomic so safe to read and write at same time
154// However write must be locked, read doesn't (We never try read before written to table)
155libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
156        int i = 0;
157        pthread_t tid = pthread_self();
158
159        for (;i<libtrace->perpkt_thread_count ;++i) {
160                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
161                        return &libtrace->perpkt_threads[i];
162        }
163        return NULL;
164}
165
166int get_thread_table_num(libtrace_t *libtrace) {
167        int i = 0;
168        pthread_t tid = pthread_self();
169        for (;i<libtrace->perpkt_thread_count; ++i) {
170                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
171                        return i;
172        }
173        return -1;
174}
175
176static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
177        libtrace_thread_t *ret;
178        if (!(ret = get_thread_table(libtrace))) {
179                pthread_t tid = pthread_self();
180                // Check if we are reducer or something else
181                if (pthread_equal(tid, libtrace->reducer_thread.tid))
182                        ret = &libtrace->reducer_thread;
183                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
184                        ret = &libtrace->hasher_thread;
185                else
186                        ret = NULL;
187        }
188        return ret;
189}
190
191/** Used below in trace_make_results_packets_safe*/
192static void do_copy_result_packet(void *data)
193{
194        libtrace_result_t *res = (libtrace_result_t *)data;
195        if (res->is_packet) {
196                // Duplicate the packet in standard malloc'd memory and free the
197                // original
198                libtrace_packet_t *oldpkt, *dup;
199                oldpkt = (libtrace_packet_t *) res->value;
200                dup = trace_copy_packet(oldpkt);
201                res->value = (void *)dup;
202                trace_destroy_packet(oldpkt);
203                fprintf(stderr, "Made a packet safe!!\n");
204        }
205}
206
207/**
208 * Make a safe replacement copy of any result packets that are owned
209 * by the format in the result queue. Used when pausing traces.
210 */ 
211static void trace_make_results_packets_safe(libtrace_t *trace) {
212        libtrace_thread_t *t = get_thread_descriptor(trace);
213        if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
214                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
215        else 
216                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
217}
218
219/**
220 * Holds threads in a paused state, until released by broadcasting
221 * the condition mutex.
222 */
223static void trace_thread_pause(libtrace_t *trace) {
224        printf("Pausing thread #%d\n", get_thread_table_num(trace));
225        trace_make_results_packets_safe(trace);
226        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
227        trace->perpkts_pausing++;
228        pthread_cond_broadcast(&trace->perpkt_cond);
229        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
230                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
231        }
232        trace->perpkts_pausing--;
233        pthread_cond_broadcast(&trace->perpkt_cond);
234        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
235        printf("Releasing thread #%d\n", get_thread_table_num(trace));
236}
237
238/**
239 * The is the entry point for our packet processing threads.
240 */
241static void* perpkt_threads_entry(void *data) {
242        libtrace_t *trace = (libtrace_t *)data;
243        libtrace_thread_t * t;
244        libtrace_message_t message = {0};
245        libtrace_packet_t *packet = NULL;
246
247        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
248        t = get_thread_table(trace);
249        assert(t);
250        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
251        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
252
253        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
254        // Send a message to say we've started
255
256        message.code = MESSAGE_STARTED;
257        message.sender = t;
258
259        // Let the per_packet function know we have started
260        (*trace->per_pkt)(trace, NULL, &message, t);
261
262
263        for (;;) {
264                int psize;
265
266                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
267                        switch (message.code) {
268                                case MESSAGE_DO_PAUSE: // This is internal
269                                        // Send message to say we are pausing, TODO consider sender
270                                        message.code = MESSAGE_PAUSING;
271                                        (*trace->per_pkt)(trace, NULL, &message, t);
272                                        // If a hasher thread is running empty input queues so we don't loose data
273                                        if (trace_has_dedicated_hasher(trace)) {
274                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
275                                                // The hasher has stopped by this point, so the queue shouldn't be filling
276                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
277                                                        psize = trace_pread_packet(trace, &packet);
278                                                        if (psize > 0) {
279                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
280                                                        } else {
281                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
282                                                        }
283                                                }
284                                        }
285                                        // Send a paused message as a final chance to memory copy any packets
286                                        message.code = MESSAGE_PAUSED;
287                                        (*trace->per_pkt)(trace, NULL, &message, t);
288                                        // Now we do the actual pause, this returns when we are done
289                                        trace_thread_pause(trace);
290                                        // Check for new messages as soon as we return
291                                        continue;
292                                case MESSAGE_DO_STOP: // This is internal
293                                        goto stop;
294                        }
295                        (*trace->per_pkt)(trace, NULL, &message, t);
296                        continue;
297                }
298
299                if (trace->perpkt_thread_count == 1) {
300                        if (!packet) {
301                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
302                                        packet = trace_create_packet();
303                        }
304                        assert(packet);
305                        if ((psize = trace_read_packet(trace, packet)) <1) {
306                                break;
307                        }
308                } else {
309                        psize = trace_pread_packet(trace, &packet);
310                }
311
312                if (psize > 0) {
313                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
314                        continue;
315                }
316
317                if (psize == -2)
318                        continue; // We have a message
319
320                if (psize < 1) { // consider sending a message
321                        break;
322                }
323
324        }
325
326
327stop:
328        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
329        // Let the per_packet function know we have stopped
330        message.code = MESSAGE_STOPPED;
331        message.sender = NULL;
332        message.additional.uint64 = 0;
333        (*trace->per_pkt)(trace, NULL, &message, t);
334
335        // Free our last packet
336        if (packet)
337                trace_destroy_packet(packet);
338
339        // And we're at the end free the memories
340        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
341        t->state = THREAD_FINISHED;
342        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
343
344        // Notify only after we've defiantly set the state to finished
345        message.code = MESSAGE_PERPKT_ENDED;
346        message.additional.uint64 = 0;
347        trace_send_message_to_reducer(trace, &message);
348
349        pthread_exit(NULL);
350};
351
352/** True if trace has dedicated hasher thread otherwise false */
353inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
354{
355        return libtrace->hasher_thread.type == THREAD_HASHER;
356}
357
358/**
359 * The start point for our single threaded hasher thread, this will read
360 * and hash a packet from a data source and queue it against the correct
361 * core to process it.
362 */
363static void* hasher_start(void *data) {
364        libtrace_t *trace = (libtrace_t *)data;
365        libtrace_thread_t * t;
366        int i;
367        libtrace_packet_t * packet;
368        libtrace_message_t message = {0};
369
370        assert(trace_has_dedicated_hasher(trace));
371        /* Wait until all threads are started and objects are initialised (ring buffers) */
372        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
373        t = &trace->hasher_thread;
374        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
375        printf("Hasher Thread started\n");
376        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
377        int pkt_skipped = 0;
378        /* Read all packets in then hash and queue against the correct thread */
379        while (1) {
380                int thread;
381                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
382                        packet = trace_create_packet();
383                assert(packet);
384
385                if (libtrace_halt) // Signal to die has been sent - TODO
386                        break;
387
388                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
389                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
390                        switch(message.code) {
391                                case MESSAGE_DO_PAUSE:
392                                        fprintf(stderr, "Pausing hasher thread\n");
393                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
394                                        trace->perpkts_pausing++;
395                                        pthread_cond_broadcast(&trace->perpkt_cond);
396                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
397                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
398                                        }
399                                        // trace->perpkts_pausing--; // Don't do this the pausing thread will do this for us
400                                        pthread_cond_broadcast(&trace->perpkt_cond);
401                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
402                                        fprintf(stdout, "Mapper resuming\n");
403                                        break;
404                                case MESSAGE_DO_STOP:
405                                        // Stop called after pause
406                                        assert(trace->started == false);
407                                        assert(trace->state == STATE_FINSHED);
408                                default:
409                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
410                        }
411                        pkt_skipped = 1;
412                        continue;
413                }
414
415                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
416                        break; /* We are EOF or error'd either way we stop  */
417                }
418
419                /* We are guaranteed to have a hash function i.e. != NULL */
420                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
421                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
422                /* Blocking write to the correct queue - I'm the only writer */
423                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
424                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
425                        pkt_skipped = 0;
426                } else {
427                        pkt_skipped = 1; // Reuse that packet no one read it
428                }
429        }
430
431        /* Broadcast our last failed read to all threads */
432        for (i = 0; i < trace->perpkt_thread_count; i++) {
433                libtrace_packet_t * bcast;
434                printf("Broadcasting error/EOF now the trace is over\n");
435                if (i == trace->perpkt_thread_count - 1) {
436                        bcast = packet;
437                } else {
438                        bcast = trace_create_packet();
439                        bcast->error = packet->error;
440                }
441                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
442                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
443                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
444                        // Unlock early otherwise we could deadlock
445                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
446                } else {
447                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
448                }
449        }
450        // We dont need to free packet
451
452        // And we're at the end free the memories
453        t->state = THREAD_FINISHED;
454
455        // Notify only after we've defiantly set the state to finished
456        message.code = MESSAGE_PERPKT_ENDED;
457        message.additional.uint64 = 0;
458        trace_send_message_to_reducer(trace, &message);
459
460        // TODO remove from TTABLE t sometime
461        pthread_exit(NULL);
462};
463
464/**
465 * Moves src into dest(Complete copy) and copies the memory buffer and
466 * its flags from dest into src ready for reuse without needing extra mallocs.
467 */
468static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
469        // Save the passed in buffer status
470        assert(dest->trace == NULL); // Must be a empty packet
471        void * temp_buf = dest->buffer;
472        buf_control_t temp_buf_control = dest->buf_control;
473        // Completely copy StoredPacket into packet
474        memcpy(dest, src, sizeof(libtrace_packet_t));
475        // Set the buffer settings on the returned packet
476        src->buffer = temp_buf;
477        src->buf_control = temp_buf_control;
478        src->trace = NULL;
479}
480
481/* Our simplest case when a thread becomes ready it can obtain a exclusive
482 * lock to read a packet from the underlying trace.
483 */
484inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
485{
486        // We need this to fill the 'first' packet table
487        libtrace_thread_t *t = get_thread_table(libtrace);
488        if (!*packet) {
489                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
490                        *packet = trace_create_packet();
491        }
492        assert(*packet);
493        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
494        /* Read a packet */
495        (*packet)->error = trace_read_packet(libtrace, *packet);
496        // Doing this inside the lock ensures the first packet is always
497        // recorded first
498        if ((*packet)->error > 0)
499                store_first_packet(libtrace, *packet, t);
500
501        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
502        return (*packet)->error;
503}
504
505/**
506 * For the case that we have a dedicated hasher thread
507 * 1. We read a packet from our buffer
508 * 2. Move that into the packet provided (packet)
509 */
510inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
511{
512        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
513        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
514
515        if (*packet) // Recycle the old get the new
516                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
517                        trace_destroy_packet(*packet);
518        *packet = libtrace_ringbuffer_read(&t->rbuffer);
519
520        if (*packet) {
521                return (*packet)->error;
522        } else {
523                // This is how we do a notify, we send a message before hand to note that the trace is over etc.
524                // And this will notify the perpkt thread to read that message, this is easiest
525                // since cases like pause can also be dealt with this way without actually
526                // having to be the end of the stream.
527                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
528                return -2;
529        }
530}
531
532/**
533 * Tries to read from our queue and returns 1 if a packet was retrieved
534 */
535static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
536{
537        libtrace_packet_t* retrived_packet;
538
539        /* Lets see if we have one waiting */
540        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
541                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
542                assert(retrived_packet);
543
544                if (*packet) // Recycle the old get the new
545                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
546                                trace_destroy_packet(*packet);
547                *packet = retrived_packet;
548                *ret = (*packet)->error;
549                return 1;
550        }
551        return 0;
552}
553
554/**
555 * Allows us to ensure all threads are finished writing to our threads ring_buffer
556 * before returning EOF/error.
557 */
558inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
559{
560        /* We are waiting for the condition that another thread ends to check
561         * our queue for new data, once all threads end we can go to finished */
562        bool complete = false;
563        int ret;
564
565        do {
566                // Wait for a thread to end
567                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
568
569                // Check before
570                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
571                        complete = true;
572                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
573                        continue;
574                }
575
576                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
577
578                // Check after
579                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
580                        complete = true;
581                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
582                        continue;
583                }
584
585                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
586
587                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
588                if(try_waiting_queue(libtrace, t, packet, &ret))
589                        return ret;
590        } while (!complete);
591
592        // We can only end up here once all threads complete
593        try_waiting_queue(libtrace, t, packet, &ret);
594
595        return ret;
596        // TODO rethink this logic fix bug here
597}
598
599/**
600 * Expects the libtrace_lock to not be held
601 */
602inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
603{
604        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
605        t->state = THREAD_FINISHING;
606        libtrace->perpkts_finishing++;
607        pthread_cond_broadcast(&libtrace->perpkt_cond);
608        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
609        return trace_handle_finishing_perpkt(libtrace, packet, t);
610}
611
612/**
613 * This case is much like the dedicated hasher, except that we will become
614 * hasher if we don't have a a packet waiting.
615 *
616 * Note: This is only every used if we have are doing hashing.
617 *
618 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
619 * queue sizes in total are larger than the ring size.
620 *
621 * 1. We read a packet from our buffer
622 * 2. Move that into the packet provided (packet)
623 */
624inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
625{
626        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
627        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
628        int thread, ret/*, psize*/;
629
630        while (1) {
631                if(try_waiting_queue(libtrace, t, packet, &ret))
632                        return ret;
633                // Can still block here if another thread is writing to a full queue
634                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
635
636                // Its impossible for our own queue to overfill, because no one can write
637                // when we are in the lock
638                if(try_waiting_queue(libtrace, t, packet, &ret)) {
639                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
640                        return ret;
641                }
642
643                // Another thread cannot write a packet because a queue has filled up. Is it ours?
644                if (libtrace->perpkt_queue_full) {
645                        contention_stats[this_thread].wait_for_fill_complete_hits++;
646                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
647                        continue;
648                }
649
650                if (!*packet) {
651                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
652                                *packet = trace_create_packet();
653                }
654                assert(*packet);
655
656                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
657                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
658                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
659                        if (libtrace_halt)
660                                return 0;
661                        else
662                                return (*packet)->error;
663                }
664
665                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
666                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
667                if (thread == this_thread) {
668                        // If it's this thread we must be in order because we checked the buffer once we got the lock
669                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
670                        return (*packet)->error;
671                }
672
673                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
674                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
675                                libtrace->perpkt_queue_full = true;
676                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
677                                contention_stats[this_thread].full_queue_hits++;
678                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
679                        }
680                        *packet = NULL;
681                        libtrace->perpkt_queue_full = false;
682                } else {
683                        /* We can get here if the user closes the thread before natural completion/or error */
684                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
685                }
686                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
687        }
688}
689
690/**
691 * This case is much like the dedicated hasher, except that we will become
692 * hasher if we don't have a a packet waiting.
693 *
694 * TODO: You can loose the tail of a trace if the final thread
695 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
696 *
697 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
698 * queue sizes in total are larger than the ring size.
699 *
700 * 1. We read a packet from our buffer
701 * 2. Move that into the packet provided (packet)
702 */
703inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
704{
705        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
706        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
707        int ret, i, thread/*, psize*/;
708
709        if (t->state == THREAD_FINISHING)
710                return trace_handle_finishing_perpkt(libtrace, packet, t);
711
712        while (1) {
713                // Check if we have packets ready
714                if(try_waiting_queue(libtrace, t, packet, &ret))
715                        return ret;
716
717                // We limit the number of packets we get to the size of the sliding window
718                // such that it is impossible for any given thread to fail to store a packet
719                assert(sem_wait(&libtrace->sem) == 0);
720                /*~~~~Single threaded read of a packet~~~~*/
721                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
722
723                /* Re-check our queue things we might have data waiting */
724                if(try_waiting_queue(libtrace, t, packet, &ret)) {
725                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
726                        assert(sem_post(&libtrace->sem) == 0);
727                        return ret;
728                }
729
730                // TODO put on *proper* condition variable
731                if (libtrace->perpkt_queue_full) {
732                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
733                        assert(sem_post(&libtrace->sem) == 0);
734                        contention_stats[this_thread].wait_for_fill_complete_hits++;
735                        continue;
736                }
737
738                if (!*packet) {
739                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
740                                *packet = trace_create_packet();
741                }
742                assert(*packet);
743
744                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
745                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
746                        assert(sem_post(&libtrace->sem) == 0);
747                        // Finish this thread ensuring that any data written later by another thread is retrieved also
748                        if (libtrace_halt)
749                                return 0;
750                        else
751                                return trace_finish_perpkt(libtrace, packet, t);
752                }
753                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
754
755                /* ~~~~Multiple threads can run the hasher~~~~ */
756                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
757
758                /* Yes this is correct opposite read lock for a write operation */
759                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
760                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
761                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
762                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
763                *packet = NULL;
764
765                // Always try read any data from the sliding window
766                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
767                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
768                        if (libtrace->perpkt_queue_full) {
769                                // I might be the holdup in which case if I can read my queue I should do that and return
770                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
771                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
772                                        return ret;
773                                }
774                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
775                                continue;
776                        }
777                        // Read greedily as many as we can
778                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
779                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
780                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
781                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
782                                                if (this_thread == thread)
783                                                {
784                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
785                                                        // before EOF/error we might not have emptied the sliding window
786                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
787                                                        // Its our queue we must have a packet to read out
788                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
789                                                                // We must be able to write this now 100% without fail
790                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
791                                                                assert(sem_post(&libtrace->sem) == 0);
792                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
793                                                                return ret;
794                                                        } else {
795                                                                assert(!"Our queue is full but I cannot read from it??");
796                                                        }
797                                                }
798                                                // Not us we have to give the other threads a chance to write there packets then
799                                                libtrace->perpkt_queue_full = true;
800                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
801                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
802                                                        assert(sem_post(&libtrace->sem) == 0);
803
804                                                contention_stats[this_thread].full_queue_hits++;
805                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
806                                                // Grab these back
807                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
808                                                        assert(sem_wait(&libtrace->sem) == 0);
809                                                libtrace->perpkt_queue_full = false;
810                                        }
811                                        assert(sem_post(&libtrace->sem) == 0);
812                                        *packet = NULL;
813                                } else {
814                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
815                                        // in the general case (unless the user ends early without proper clean up).
816                                        assert (!"unreachable code??");
817                                }
818                        }
819                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
820                }
821                // Now we go back to checking our queue anyways
822        }
823}
824
825
826/**
827 * For the first packet of each queue we keep a copy and note the system
828 * time it was received at.
829 *
830 * This is used for finding the first packet when playing back a trace
831 * in trace time. And can be used by real time applications to print
832 * results out every XXX seconds.
833 */
834void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
835{
836        if (!t->recorded_first) {
837                struct timeval tv;
838                libtrace_packet_t * dup;
839                // For what it's worth we can call these outside of the lock
840                gettimeofday(&tv, NULL);
841                dup = trace_copy_packet(packet);
842                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
843                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
844                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
845                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
846                // Now update the first
847                libtrace->first_packets.count++;
848                if (libtrace->first_packets.count == 1) {
849                        // We the first entry hence also the first known packet
850                        libtrace->first_packets.first = t->perpkt_num;
851                } else {
852                        // Check if we are newer than the previous 'first' packet
853                        size_t first = libtrace->first_packets.first;
854                        if (trace_get_seconds(dup) <
855                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
856                                libtrace->first_packets.first = t->perpkt_num;
857                }
858                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
859                libtrace_message_t mesg = {0};
860                mesg.code = MESSAGE_FIRST_PACKET;
861                trace_send_message_to_reducer(libtrace, &mesg);
862                t->recorded_first = true;
863        }
864}
865
866/**
867 * Returns 1 if it's certain that the first packet is truly the first packet
868 * rather than a best guess based upon threads that have published so far.
869 * Otherwise 0 is returned.
870 * It's recommended that this result is stored rather than calling this
871 * function again.
872 */
873DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
874{
875        int ret = 0;
876        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
877        if (libtrace->first_packets.count) {
878                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
879                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
880                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
881                        ret = 1;
882                } else {
883                        struct timeval curr_tv;
884                        // If a second has passed since the first entry we will assume this is the very first packet
885                        gettimeofday(&curr_tv, NULL);
886                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
887                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
888                                        ret = 1;
889                                }
890                        }
891                }
892        } else {
893                *packet = NULL;
894                *tv = NULL;
895        }
896        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
897        return ret;
898}
899
900
901DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
902{
903        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
904}
905
906inline static struct timeval usec_to_tv(uint64_t usec)
907{
908        struct timeval tv;
909        tv.tv_sec = usec / 1000000;
910        tv.tv_usec = usec % 1000000;
911        return tv;
912}
913
914/** Similar to delay_tracetime but send messages to all threads periodically */
915static void* keepalive_entry(void *data) {
916        struct timeval prev, next;
917        libtrace_message_t message = {0};
918        libtrace_t *trace = (libtrace_t *)data;
919        uint64_t next_release;
920        fprintf(stderr, "keepalive thread is starting\n");
921
922        gettimeofday(&prev, NULL);
923        message.code = MESSAGE_TICK;
924        while (trace->state != STATE_FINSHED) {
925                fd_set rfds;
926                next_release = tv_to_usec(&prev) + (trace->tick_interval * 1000);
927                gettimeofday(&next, NULL);
928                if (next_release > tv_to_usec(&next)) {
929                        next = usec_to_tv(next_release - tv_to_usec(&next));
930                        // Wait for timeout or a message
931                        FD_ZERO(&rfds);
932                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
933                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
934                                libtrace_message_t msg;
935                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
936                                assert(msg.code == MESSAGE_DO_STOP);
937                                goto done;
938                        }
939                }
940                prev = usec_to_tv(next_release);
941                if (trace->state == STATE_RUNNING) {
942                        message.additional.uint64 = tv_to_usec(&prev);
943                        trace_send_message_to_perpkts(trace, &message);
944                }
945        }
946done:
947        fprintf(stderr, "keepalive thread is finishing\n");
948        trace->keepalive_thread.state = THREAD_FINISHED;
949        return NULL;
950}
951
952/**
953 * Delays a packets playback so the playback will be in trace time
954 */
955static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
956        struct timeval curr_tv, pkt_tv;
957        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
958        uint64_t curr_usec;
959        /* Tracetime we might delay releasing this packet */
960        if (!t->tracetime_offset_usec) {
961                libtrace_packet_t * first_pkt;
962                struct timeval *sys_tv;
963                int64_t initial_offset;
964                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
965                assert(first_pkt);
966                pkt_tv = trace_get_timeval(first_pkt);
967                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
968                if (stable)
969                        // 0->1 because 0 is used to mean unset
970                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
971                next_release = initial_offset;
972        }
973        /* next_release == offset */
974        pkt_tv = trace_get_timeval(packet);
975        next_release += tv_to_usec(&pkt_tv);
976        gettimeofday(&curr_tv, NULL);
977        curr_usec = tv_to_usec(&curr_tv);
978        if (next_release > curr_usec) {
979                // We need to wait
980                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
981                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
982                select(0, NULL, NULL, NULL, &delay_tv);
983        }
984}
985
986/* Read one packet from the trace into a buffer. Note that this function will
987 * block until a packet is read (or EOF is reached).
988 *
989 * @param libtrace      the libtrace opaque pointer
990 * @param packet        the packet opaque pointer
991 * @returns 0 on EOF, negative value on error
992 *
993 * Note this is identical to read_packet but calls pread_packet instead of
994 * read packet in the format.
995 *
996 */
997static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
998
999        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
1000        if (trace_is_err(libtrace))
1001                return -1;
1002        if (!libtrace->started) {
1003                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1004                return -1;
1005        }
1006        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1007                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1008                return -1;
1009        }
1010        assert(packet);
1011
1012        if (libtrace->format->read_packet) {
1013                do {
1014                        size_t ret;
1015                        /* Finalise the packet, freeing any resources the format module
1016                         * may have allocated it and zeroing all data associated with it.
1017                         */
1018                        trace_fin_packet(packet);
1019                        /* Store the trace we are reading from into the packet opaque
1020                         * structure */
1021                        packet->trace = libtrace;
1022                        ret=libtrace->format->pread_packet(libtrace,packet);
1023                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
1024                                return ret;
1025                        }
1026                        if (libtrace->filter) {
1027                                /* If the filter doesn't match, read another
1028                                 * packet
1029                                 */
1030                                if (!trace_apply_filter(libtrace->filter,packet)){
1031                                        ++libtrace->filtered_packets;
1032                                        continue;
1033                                }
1034                        }
1035                        if (libtrace->snaplen>0) {
1036                                /* Snap the packet */
1037                                trace_set_capture_length(packet,
1038                                                libtrace->snaplen);
1039                        }
1040                        trace_packet_set_order(packet, libtrace->accepted_packets);
1041                        ++libtrace->accepted_packets;
1042                        return ret;
1043                } while(1);
1044        }
1045        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1046        return ~0U;
1047}
1048
1049/**
1050 * Read a packet from the parallel trace
1051 */
1052DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
1053{
1054        int ret;
1055        libtrace_thread_t *t = get_thread_table(libtrace);
1056
1057        // Cleanup the packet passed back
1058        if (*packet)
1059                trace_fin_packet(*packet);
1060
1061        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1062                if (!*packet)
1063                        *packet = trace_create_packet();
1064                ret = trace_pread_packet_wrapper(libtrace, *packet);
1065        } else if (trace_has_dedicated_hasher(libtrace)) {
1066                ret = trace_pread_packet_hasher_thread(libtrace, packet);
1067        } else if (!trace_has_dedicated_hasher(libtrace)) {
1068                /* We don't care about which core a packet goes to */
1069                ret = trace_pread_packet_first_in_first_served(libtrace, packet);
1070        } /* else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
1071                ret = trace_pread_packet_sliding_window(libtrace, packet);
1072        } else {
1073                ret = trace_pread_packet_hash_locked(libtrace, packet);
1074        }*/
1075
1076        // Formats can also optionally do this internally to ensure the first
1077        // packet is always reported correctly
1078        if (ret > 0) {
1079                store_first_packet(libtrace, *packet, t);
1080                if (libtrace->tracetime)
1081                        delay_tracetime(libtrace, *packet, t);
1082        }
1083
1084        return ret;
1085}
1086
1087/* Starts perpkt threads
1088 * @return threads_started
1089 */
1090static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1091        int i;
1092
1093        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1094                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1095                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
1096        }
1097        return libtrace->perpkt_thread_count;
1098}
1099
1100/* Start an input trace in a parallel fashion.
1101 *
1102 * @param libtrace the input trace to start
1103 * @param global_blob some global data you can share with the new perpkt threads
1104 * @returns 0 on success
1105 */
1106DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
1107{
1108        int i;
1109        sigset_t sig_before, sig_block_all;
1110
1111        assert(libtrace);
1112        if (trace_is_err(libtrace))
1113                return -1;
1114        if (libtrace->state == STATE_PAUSED) {
1115                assert (libtrace->perpkts_pausing != 0);
1116                fprintf(stderr, "Restarting trace\n");
1117                UNUSED int err;
1118                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1119                        printf("This format has direct support for p's\n");
1120                        err = libtrace->format->pstart_input(libtrace);
1121                } else {
1122                        if (libtrace->format->start_input) {
1123                                err = libtrace->format->start_input(libtrace);
1124                        }
1125                }
1126                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1127                libtrace->started = true;
1128                libtrace->state = STATE_RUNNING;
1129                assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1130                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1131                return 0;
1132        }
1133
1134        assert(libtrace->state == STATE_NEW);
1135        libtrace_parallel = 1;
1136
1137        // Store the user defined things against the trace
1138        libtrace->global_blob = global_blob;
1139        libtrace->per_pkt = per_pkt;
1140        libtrace->reducer = reducer;
1141        libtrace->perpkts_finishing = 0;
1142
1143        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1144        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1145        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1146        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1147
1148        // Set default buffer sizes
1149        if (libtrace->perpkt_buffer_size <= 0)
1150                libtrace->perpkt_buffer_size = 1000;
1151
1152        if (libtrace->perpkt_thread_count <= 0) {
1153                // TODO add BSD support
1154                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1155                if (libtrace->perpkt_thread_count <= 0)
1156                        // Lets just use one
1157                        libtrace->perpkt_thread_count = 1;
1158        }
1159
1160        if(libtrace->packet_freelist_size <= 0)
1161                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
1162
1163        if(libtrace->packet_freelist_size <
1164                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
1165                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1166
1167        libtrace->started=true; // Before we start the threads otherwise we could have issues
1168        libtrace->state = STATE_RUNNING;
1169        /* Disable signals - Pthread signal handling */
1170
1171        sigemptyset(&sig_block_all);
1172
1173        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1174
1175        // If we are using a hasher start it
1176        // If single threaded we don't need a hasher
1177        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) {
1178                libtrace_thread_t *t = &libtrace->hasher_thread;
1179                t->trace = libtrace;
1180                t->ret = NULL;
1181                t->type = THREAD_HASHER;
1182                t->state = THREAD_RUNNING;
1183                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1184                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1185        } else {
1186                libtrace->hasher_thread.type = THREAD_EMPTY;
1187        }
1188        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
1189        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1190        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1191        // This will be applied to every new thread that starts, i.e. they will block all signals
1192        // Lets start a fixed number of reading threads
1193
1194        // For now we never have a dedicated thread for the reducer
1195        // i.e. This main thread is used as the reducer
1196        libtrace->reducer_thread.tid = pthread_self();
1197        libtrace->reducer_thread.type = THREAD_REDUCER;
1198        libtrace->reducer_thread.state = THREAD_RUNNING;
1199        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
1200
1201        /* Ready some storages */
1202        libtrace->first_packets.first = 0;
1203        libtrace->first_packets.count = 0;
1204        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1205        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1206
1207
1208        /* Start all of our perpkt threads */
1209        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1210        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1211                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1212                t->trace = libtrace;
1213                t->ret = NULL;
1214                t->type = THREAD_PERPKT;
1215                t->state = THREAD_RUNNING;
1216                t->user_data = NULL;
1217                // t->tid DONE on create
1218                t->perpkt_num = i;
1219                if (libtrace->hasher)
1220                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
1221                // Depending on the mode vector or deque might be chosen
1222                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1223                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1224                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1225                t->tmp_key = 0;
1226                t->tmp_data = NULL;
1227                t->recorded_first = false;
1228                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1229                t->tracetime_offset_usec = 0;;
1230        }
1231
1232        int threads_started = 0;
1233        /* Setup the trace and start our threads */
1234        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1235                printf("This format has direct support for p's\n");
1236                threads_started = libtrace->format->pstart_input(libtrace);
1237        } else {
1238                if (libtrace->format->start_input) {
1239                        threads_started=libtrace->format->start_input(libtrace);
1240                }
1241        }
1242        if (threads_started == 0)
1243                threads_started = trace_start_perpkt_threads(libtrace);
1244
1245        if (libtrace->tick_interval > 0) {
1246                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1247                libtrace->keepalive_thread.state = THREAD_RUNNING;
1248                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1249                assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
1250        }
1251
1252        // Revert back - Allow signals again
1253        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1254        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1255
1256        if (threads_started < 0)
1257                // Error
1258                return threads_started;
1259
1260        // TODO fix these leaks etc
1261        if (libtrace->perpkt_thread_count != threads_started)
1262                printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1263
1264
1265        return 0;
1266}
1267
1268/**
1269 * Pauses a trace, this should only be called by the main thread
1270 * 1. Set started = false
1271 * 2. All perpkt threads are paused waiting on a condition var
1272 * 3. Then call ppause on the underlying format if found
1273 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
1274 *
1275 * Once done you should be able to modify the trace setup and call pstart again
1276 * TODO handle changing thread numbers
1277 */
1278DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1279{
1280        libtrace_thread_t *t;
1281        int i;
1282        assert(libtrace);
1283        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1284                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1285                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1286                return -1;
1287        }
1288
1289        t = get_thread_table(libtrace);
1290
1291        // Set pausing
1292        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1293        libtrace->state = STATE_PAUSING;
1294        pthread_cond_broadcast(&libtrace->perpkt_cond);
1295        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1296
1297        // Special case handle the hasher thread case
1298        if (trace_has_dedicated_hasher(libtrace)) {
1299                fprintf(stderr, "Hasher thread running we deal with this special!\n");
1300                libtrace_message_t message = {0};
1301                message.code = MESSAGE_DO_PAUSE;
1302                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1303                // Wait for it to pause
1304                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1305                while (1 != libtrace->perpkts_pausing) {
1306                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1307                }
1308                libtrace->perpkts_pausing--; // Do this on the hasher's behalf
1309                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1310        }
1311
1312        fprintf(stderr, "Sending messages \n");
1313        // Stop threads, skip this one if its a perpkt
1314        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1315                if (&libtrace->perpkt_threads[i] != t) {
1316                        libtrace_message_t message = {0};
1317                        message.code = MESSAGE_DO_PAUSE;
1318                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1319                        if(trace_has_dedicated_hasher(libtrace)) {
1320                                // The hasher has stopped and other threads have messages waiting therefore
1321                                // If the queues are empty the other threads would have no data
1322                                // So send some NULL packets to simply ask the threads to check there message queues
1323                                // We are the only writer since hasher has paused
1324                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
1325                        }
1326                } else {
1327                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1328                }
1329        }
1330
1331        // Formats must support native message handling if a message is ready
1332        // Approach per Perry's suggestion is a non-blocking read
1333        // followed by a blocking read. XXX STRIP THIS OUT
1334
1335        if (t) {
1336                // A perpkt is doing the pausing interesting fake a extra thread paused
1337                // We rely on the user to not return before starting the trace again
1338                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1339                libtrace->perpkts_pausing++;
1340                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1341        }
1342
1343        fprintf(stderr, "Asking threads to pause\n");
1344
1345        // Wait for all threads to pause
1346        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1347        while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
1348                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1349        }
1350        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1351
1352        fprintf(stderr, "Threads have paused\n");
1353
1354        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1355                libtrace->started = false;
1356                if (libtrace->format->ppause_input)
1357                        libtrace->format->ppause_input(libtrace);
1358                // TODO What happens if we don't have pause input??
1359        } else {
1360                int err;
1361                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1362                err = trace_pause(libtrace);
1363                // We should handle this a bit better
1364                if (err)
1365                        return err;
1366        }
1367
1368        // Only set as paused after the pause has been called on the trace
1369        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1370        libtrace->state = STATE_PAUSED;
1371        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1372        return 0;
1373}
1374
1375/**
1376 * Stop trace finish prematurely as though it meet an EOF
1377 * This should only be called by the main thread
1378 * 1. Calls ppause
1379 * 2. Sends a message asking for threads to finish
1380 *
1381 */
1382DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1383{
1384        int i;
1385        libtrace_message_t message = {0};
1386        assert(libtrace);
1387        if (!libtrace->started) {
1388                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
1389                return -1;
1390        }
1391
1392        // Ensure all threads have paused and the underlying trace format has
1393        // been closed and all packets associated are cleaned up
1394        trace_ppause(libtrace);
1395
1396        // Now send a message asking the threads to stop
1397        // This will be retrieved before trying to read another packet
1398       
1399        message.code = MESSAGE_DO_STOP;
1400        trace_send_message_to_perpkts(libtrace, &message);
1401        if (trace_has_dedicated_hasher(libtrace))
1402                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1403       
1404        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1405                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1406        }
1407
1408        // Now release the threads and let them stop
1409        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1410        libtrace->state = STATE_FINSHED;
1411        assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1412        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1413        return 0;
1414}
1415
1416/**
1417 * Set the hasher type along with a selected function, if hardware supports
1418 * that generic type of hashing it will be used otherwise the supplied
1419 * hasher function will be used and passed data when called.
1420 *
1421 * @return 0 if successful otherwise -1 on error
1422 */
1423DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1424        int ret = -1;
1425        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1426                return -1;
1427        }
1428
1429        // Save the requirements
1430        trace->hasher_type = type;
1431        if (hasher) {
1432                trace->hasher = hasher;
1433                trace->hasher_data = data;
1434        } else {
1435                trace->hasher = NULL;
1436                // TODO consider how to handle freeing this
1437                trace->hasher_data = NULL;
1438        }
1439
1440        // Try push this to hardware - NOTE hardware could do custom if
1441        // there is a more efficient way to apply it, in this case
1442        // it will simply grab the function out of libtrace_t
1443        if (trace->format->pconfig_input)
1444                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1445
1446        if (ret == -1) {
1447                // We have to deal with this ourself
1448                // This most likely means single threaded reading of the trace
1449                if (!hasher) {
1450                        switch (type)
1451                        {
1452                                case HASHER_CUSTOM:
1453                                case HASHER_BALANCE:
1454                                        return 0;
1455                                case HASHER_BIDIRECTIONAL:
1456                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1457                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1458                                        toeplitz_init_config(trace->hasher_data, 1);
1459                                        return 0;
1460                                case HASHER_UNIDIRECTIONAL:
1461                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1462                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1463                                        toeplitz_init_config(trace->hasher_data, 0);
1464                                        return 0;
1465                                case HASHER_HARDWARE:
1466                                        return -1;
1467                        }
1468                        return -1;
1469                }
1470        } else {
1471                // The hardware is dealing with this yay
1472                trace->hasher_type = HASHER_HARDWARE;
1473        }
1474
1475        return 0;
1476}
1477
1478// Waits for all threads to finish
1479DLLEXPORT void trace_join(libtrace_t *libtrace) {
1480        int i;
1481
1482        /* Firstly wait for the perpkt threads to finish, since these are
1483         * user controlled */
1484        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1485                //printf("Waiting to join with perpkt #%d\n", i);
1486                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
1487                //printf("Joined with perpkt #%d\n", i);
1488                // So we must do our best effort to empty the queue - so
1489                // the producer (or any other threads) don't block.
1490                libtrace_packet_t * packet;
1491                // Mark that we are no longer accepting packets
1492                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1493                libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
1494                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1495                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1496                        if (packet) // This could be NULL iff the perpkt finishes early
1497                                trace_destroy_packet(packet);
1498        }
1499
1500        /* Now the hasher */
1501        // XXX signal it to stop
1502        if (trace_has_dedicated_hasher(libtrace)) {
1503                fprintf(stderr, "Waiting to join with the hasher\n");
1504                pthread_join(libtrace->hasher_thread.tid, NULL);
1505                fprintf(stderr, "Joined with with the hasher\n");
1506                libtrace->hasher_thread.state = THREAD_FINISHED;
1507        }
1508
1509        // Now that everything is finished nothing can be touching our
1510        // buffers so clean them up
1511        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1512                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
1513                // if they lost timeslice before-during a write
1514                libtrace_packet_t * packet;
1515                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1516                        trace_destroy_packet(packet);
1517                if (libtrace->hasher) {
1518                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1519                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1520                }
1521                // Cannot destroy vector yet, this happens with trace_destroy
1522        }
1523        libtrace->state = STATE_FINSHED;
1524       
1525        // Wait for the tick (keepalive) thread if its been started
1526        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE && libtrace->keepalive_thread.state == THREAD_RUNNING) {
1527                libtrace_message_t msg = {0};
1528                msg.code = MESSAGE_DO_STOP;
1529                fprintf(stderr, "Waiting to join with the keepalive\n");
1530                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1531                pthread_join(libtrace->keepalive_thread.tid, NULL);
1532                fprintf(stderr, "Joined with with the keepalive\n");
1533        }
1534        // Lets mark this as done for now
1535        libtrace->state = STATE_JOINED;
1536}
1537
1538DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1539{
1540        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1541        assert(t);
1542        return libtrace_message_queue_count(&t->messages);
1543}
1544
1545DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1546{
1547        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1548        assert(t);
1549        return libtrace_message_queue_get(&t->messages, message);
1550}
1551
1552DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1553{
1554        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1555        assert(t);
1556        return libtrace_message_queue_try_get(&t->messages, message);
1557}
1558
1559/**
1560 * Return backlog indicator
1561 */
1562DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
1563{
1564        libtrace_message_t message = {0};
1565        message.code = MESSAGE_POST_REDUCE;
1566        message.sender = get_thread_descriptor(libtrace);
1567        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
1568}
1569
1570/**
1571 * Return backlog indicator
1572 */
1573DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
1574{
1575        //printf("Sending message code=%d to reducer\n", message->code);
1576        message->sender = get_thread_descriptor(libtrace);
1577        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
1578}
1579
1580/**
1581 *
1582 */
1583DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1584{
1585        //printf("Sending message code=%d to reducer\n", message->code);
1586        message->sender = get_thread_descriptor(libtrace);
1587        return libtrace_message_queue_put(&t->messages, message);
1588}
1589
1590DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1591{
1592        int i;
1593        message->sender = get_thread_descriptor(libtrace);
1594        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1595                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1596        }
1597        //printf("Sending message code=%d to reducer\n", message->code);
1598        return 0;
1599}
1600
1601DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1602        result->key = key;
1603}
1604DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1605        return result->key;
1606}
1607DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
1608        result->value = value;
1609}
1610DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
1611        return result->value;
1612}
1613DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
1614        result->key = key;
1615        result->value = value;
1616}
1617DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
1618        free(*result);
1619        result = NULL;
1620        // TODO automatically back with a free list!!
1621}
1622
1623DLLEXPORT void * trace_get_global(libtrace_t *trace)
1624{
1625        return trace->global_blob;
1626}
1627
1628DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
1629{
1630        if (trace->global_blob && trace->global_blob != data) {
1631                void * ret = trace->global_blob;
1632                trace->global_blob = data;
1633                return ret;
1634        } else {
1635                trace->global_blob = data;
1636                return NULL;
1637        }
1638}
1639
1640DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
1641{
1642        return t->user_data;
1643}
1644
1645DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
1646{
1647        if(t->user_data && t->user_data != data) {
1648                void *ret = t->user_data;
1649                t->user_data = data;
1650                return ret;
1651        } else {
1652                t->user_data = data;
1653                return NULL;
1654        }
1655}
1656
1657/**
1658 * Publish to the reduce queue, return
1659 */
1660DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
1661        libtrace_result_t res;
1662        res.is_packet = 0;
1663        // Who am I???
1664        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1665        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1666        // Now put it into my table
1667        UNUSED static __thread int count = 0;
1668
1669
1670        libtrace_result_set_key_value(&res, key, value);
1671        /*
1672        if (count == 1)
1673                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1674        count = (count+1) %1000;
1675        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1676        */
1677        /*if (count == 1)
1678                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1679        count = (count+1)%1000;*/
1680        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1681                if (libtrace_deque_get_size(&t->deque) >= 800) {
1682                        trace_post_reduce(libtrace);
1683                }
1684                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1685                //      sched_yield();
1686                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1687        } else {
1688                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1689                //      sched_yield();
1690
1691                if (libtrace_vector_get_size(&t->vector) >= 800) {
1692                        trace_post_reduce(libtrace);
1693                }
1694                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1695        }
1696}
1697
1698DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1699        libtrace_result_t res;
1700        // Who am I???
1701        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1702        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1703        // Now put it into my table
1704        UNUSED static __thread int count = 0;
1705
1706        res.is_packet = 1;
1707        libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
1708        /*
1709        if (count == 1)
1710                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1711        count = (count+1) %1000;
1712        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1713        */
1714        /*if (count == 1)
1715                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1716        count = (count+1)%1000;*/
1717        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1718                if (libtrace_deque_get_size(&t->deque) >= 800) {
1719                        trace_post_reduce(libtrace);
1720                }
1721                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1722                //      sched_yield();
1723                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1724        } else {
1725                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1726                //      sched_yield();
1727
1728                if (libtrace_vector_get_size(&t->vector) >= 800) {
1729                        trace_post_reduce(libtrace);
1730                }
1731                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1732        }
1733}
1734
1735
1736static int compareres(const void* p1, const void* p2)
1737{
1738        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
1739                return -1;
1740        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
1741                return 0;
1742        else
1743                return 1;
1744}
1745
1746DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
1747        int i;
1748        int flags = libtrace->reducer_flags; // Hint these aren't a changing
1749
1750        libtrace_vector_empty(results);
1751
1752        /* Here we assume queues are in order ascending order and they want
1753         * the smallest result first. If they are not in order the results
1754         * may not be in order.
1755         */
1756        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1757                int live_count = 0;
1758                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
1759                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
1760                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
1761                int min_queue = -1;
1762
1763                /* Loop through check all are alive (have data) and find the smallest */
1764                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1765                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
1766                        if (libtrace_deque_get_size(v) != 0) {
1767                                libtrace_result_t r;
1768                                libtrace_deque_peek_front(v, (void *) &r);
1769                                live_count++;
1770                                live[i] = 1;
1771                                key[i] = libtrace_result_get_key(&r);
1772                                if (i==0 || min_key > key[i]) {
1773                                        min_key = key[i];
1774                                        min_queue = i;
1775                                }
1776                        } else {
1777                                live[i] = 0;
1778                        }
1779                }
1780
1781                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
1782                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
1783                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
1784                                libtrace->state == STATE_JOINED))) {
1785                        /* Get the minimum queue and then do stuff */
1786                        libtrace_result_t r;
1787
1788                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
1789                        libtrace_vector_push_back(results, &r);
1790
1791                        // We expect the key we read +1 now
1792                        libtrace->expected_key = key[min_queue] + 1;
1793
1794                        // Now update the one we just removed
1795                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
1796                        {
1797                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
1798                                key[min_queue] = libtrace_result_get_key(&r);
1799                                if (key[min_queue] <= min_key) {
1800                                        // We are still the smallest, might be out of order though :(
1801                                        min_key = key[min_queue];
1802                                } else {
1803                                        min_key = key[min_queue]; // Update our minimum
1804                                        // Check all find the smallest again - all are alive
1805                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1806                                                if (live[i] && min_key > key[i]) {
1807                                                        min_key = key[i];
1808                                                        min_queue = i;
1809                                                }
1810                                        }
1811                                }
1812                        } else {
1813                                live[min_queue] = 0;
1814                                live_count--;
1815                                min_key = UINT64_MAX; // Update our minimum
1816                                // Check all find the smallest again - all are alive
1817                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1818                                        // Still not 100% TODO (what if order is wrong or not increasing)
1819                                        if (live[i] && min_key >= key[i]) {
1820                                                min_key = key[i];
1821                                                min_queue = i;
1822                                        }
1823                                }
1824                        }
1825                }
1826        } else { // Queues are not in order - return all results in the queue
1827                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1828                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
1829                }
1830                if (flags & REDUCE_SORT) {
1831                        qsort(results->elements, results->size, results->element_size, &compareres);
1832                }
1833        }
1834        return libtrace_vector_get_size(results);
1835}
1836
1837DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
1838        return packet->order;
1839}
1840
1841DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
1842        return packet->hash;
1843}
1844
1845DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
1846        packet->order = order;
1847}
1848
1849DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
1850        packet->hash = hash;
1851}
1852
1853DLLEXPORT int trace_finished(libtrace_t * libtrace) {
1854        int i;
1855        int b = 0;
1856        // TODO I don't like using this so much
1857        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1858        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1859                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
1860                        b++;
1861        }
1862        //assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1863        return !b;
1864}
1865
1866DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
1867{
1868        UNUSED int ret = -1;
1869        switch (option) {
1870                case TRACE_OPTION_TICK_INTERVAL:
1871                        libtrace->tick_interval = *((int *) value);
1872                        return 1;
1873                case TRACE_OPTION_SET_HASHER:
1874                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
1875                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
1876                        libtrace->perpkt_buffer_size = *((int *) value);
1877                        return 1;
1878                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
1879                        libtrace->packet_freelist_size = *((int *) value);
1880                        return 1;
1881                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1882                        libtrace->perpkt_thread_count = *((int *) value);
1883                        return 1;
1884                case TRACE_DROP_OUT_OF_ORDER:
1885                        if (*((int *) value))
1886                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
1887                        else
1888                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
1889                        return 1;
1890                case TRACE_OPTION_SEQUENTIAL:
1891                        if (*((int *) value))
1892                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
1893                        else
1894                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
1895                        return 1;
1896                case TRACE_OPTION_ORDERED:
1897                        if (*((int *) value))
1898                                libtrace->reducer_flags |= REDUCE_ORDERED;
1899                        else
1900                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
1901                        return 1;
1902                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
1903                        if (*((int *) value))
1904                                libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
1905                        else
1906                                libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
1907                        return 1;
1908                case TRACE_OPTION_TRACETIME:
1909                        if(*((int *) value))
1910                                libtrace->tracetime = 1;
1911                        else
1912                                libtrace->tracetime = 0;
1913                        return 0;
1914        }
1915        return 0;
1916}
1917
1918DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1919        libtrace_packet_t* result;
1920        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1921                result = trace_create_packet();
1922        assert(result);
1923        swap_packets(result, packet); // Move the current packet into our copy
1924        return result;
1925}
1926
1927DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1928        // Try write back the packet
1929        assert(packet);
1930        // Always release any resources this might be holding such as a slot in a ringbuffer
1931        trace_fin_packet(packet);
1932        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1933                /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
1934                //assert(1 == 90);
1935                trace_destroy_packet(packet);
1936        }
1937}
1938
1939DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
1940        if (libtrace->format)
1941                return &libtrace->format->info;
1942        else
1943                return NULL;
1944}
Note: See TracBrowser for help on using the repository browser.