source: lib/trace_parallel.c @ 9857d1c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 9857d1c was 9857d1c, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Fix some memory leaks

  • Property mode set to 100644
File size: 66.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97
98#include <pthread.h>
99#include <signal.h>
100#include <unistd.h>
101
102extern int libtrace_parallel;
103
104struct multithreading_stats {
105        uint64_t full_queue_hits;
106        uint64_t wait_for_fill_complete_hits;
107} contention_stats[1024];
108
109inline int trace_has_dedicated_hasher(libtrace_t * libtrace);
110
111/**
112 * @return True if the format supports parallel threads.
113 */
114static inline bool trace_supports_parallel(libtrace_t *trace)
115{
116        assert(trace);
117        assert(trace->format);
118        if (trace->format->pstart_input)
119                return true;
120        else
121                return false;
122        //return trace->format->pstart_input;
123}
124
125DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
126        int i;
127        struct multithreading_stats totals = {0};
128        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
129                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
130                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
131                totals.full_queue_hits += contention_stats[i].full_queue_hits;
132                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
133                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
134        }
135        fprintf(stderr, "\nTotals for perpkt threads\n");
136        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
137        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
138
139        return;
140}
141
142void libtrace_zero_thread(libtrace_thread_t * t) {
143        t->trace = NULL;
144        t->ret = NULL;
145        t->type = THREAD_EMPTY;
146        libtrace_zero_ringbuffer(&t->rbuffer);
147        libtrace_zero_vector(&t->vector);
148        libtrace_zero_deque(&t->deque);
149        t->recorded_first = false;
150        t->perpkt_num = -1;
151}
152
153// Ints are aligned int is atomic so safe to read and write at same time
154// However write must be locked, read doesn't (We never try read before written to table)
155libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
156        int i = 0;
157        pthread_t tid = pthread_self();
158
159        for (;i<libtrace->perpkt_thread_count ;++i) {
160                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
161                        return &libtrace->perpkt_threads[i];
162        }
163        return NULL;
164}
165
166int get_thread_table_num(libtrace_t *libtrace) {
167        int i = 0;
168        pthread_t tid = pthread_self();
169        for (;i<libtrace->perpkt_thread_count; ++i) {
170                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
171                        return i;
172        }
173        return -1;
174}
175
176static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
177        libtrace_thread_t *ret;
178        if (!(ret = get_thread_table(libtrace))) {
179                pthread_t tid = pthread_self();
180                // Check if we are reducer or something else
181                if (pthread_equal(tid, libtrace->reducer_thread.tid))
182                        ret = &libtrace->reducer_thread;
183                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
184                        ret = &libtrace->hasher_thread;
185                else
186                        ret = NULL;
187        }
188        return ret;
189}
190
191/** Used below in trace_make_results_packets_safe*/
192static void do_copy_result_packet(void *data)
193{
194        libtrace_result_t *res = (libtrace_result_t *)data;
195        if (res->is_packet) {
196                // Duplicate the packet in standard malloc'd memory and free the
197                // original
198                libtrace_packet_t *oldpkt, *dup;
199                oldpkt = (libtrace_packet_t *) res->value;
200                dup = trace_copy_packet(oldpkt);
201                res->value = (void *)dup;
202                trace_destroy_packet(oldpkt);
203                fprintf(stderr, "Made a packet safe!!\n");
204        }
205}
206
207/**
208 * Make a safe replacement copy of any result packets that are owned
209 * by the format in the result queue. Used when pausing traces.
210 */ 
211static void trace_make_results_packets_safe(libtrace_t *trace) {
212        libtrace_thread_t *t = get_thread_descriptor(trace);
213        if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
214                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
215        else 
216                libtrace_vector_apply_function(&t->vector, &do_copy_result_packet);
217}
218
219/**
220 * Holds threads in a paused state, until released by broadcasting
221 * the condition mutex.
222 */
223static void trace_thread_pause(libtrace_t *trace) {
224        printf("Pausing thread #%d\n", get_thread_table_num(trace));
225        trace_make_results_packets_safe(trace);
226        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
227        trace->perpkts_pausing++;
228        pthread_cond_broadcast(&trace->perpkt_cond);
229        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
230                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
231        }
232        trace->perpkts_pausing--;
233        pthread_cond_broadcast(&trace->perpkt_cond);
234        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
235        printf("Releasing thread #%d\n", get_thread_table_num(trace));
236}
237
238/**
239 * The is the entry point for our packet processing threads.
240 */
241static void* perpkt_threads_entry(void *data) {
242        libtrace_t *trace = (libtrace_t *)data;
243        libtrace_thread_t * t;
244        libtrace_message_t message = {0};
245        libtrace_packet_t *packet = NULL;
246
247        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
248        t = get_thread_table(trace);
249        assert(t);
250        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
251        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
252
253        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
254        // Send a message to say we've started
255
256        message.code = MESSAGE_STARTED;
257        message.sender = t;
258
259        // Let the per_packet function know we have started
260        (*trace->per_pkt)(trace, NULL, &message, t);
261
262
263        for (;;) {
264                int psize;
265
266                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
267                        switch (message.code) {
268                                case MESSAGE_DO_PAUSE: // This is internal
269                                        // Send message to say we are pausing, TODO consider sender
270                                        message.code = MESSAGE_PAUSING;
271                                        (*trace->per_pkt)(trace, NULL, &message, t);
272                                        // If a hasher thread is running empty input queues so we don't loose data
273                                        if (trace_has_dedicated_hasher(trace)) {
274                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
275                                                // The hasher has stopped by this point, so the queue shouldn't be filling
276                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
277                                                        psize = trace_pread_packet(trace, &packet);
278                                                        if (psize > 0) {
279                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
280                                                        } else {
281                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
282                                                        }
283                                                }
284                                        }
285                                        // Send a paused message as a final chance to memory copy any packets
286                                        message.code = MESSAGE_PAUSED;
287                                        (*trace->per_pkt)(trace, NULL, &message, t);
288                                        // Now we do the actual pause, this returns when we are done
289                                        trace_thread_pause(trace);
290                                        // Check for new messages as soon as we return
291                                        continue;
292                                case MESSAGE_DO_STOP: // This is internal
293                                        goto stop;
294                        }
295                        (*trace->per_pkt)(trace, NULL, &message, t);
296                        continue;
297                }
298
299                if (trace->perpkt_thread_count == 1) {
300                        if (!packet) {
301                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
302                                        packet = trace_create_packet();
303                        }
304                        assert(packet);
305                        if ((psize = trace_read_packet(trace, packet)) <1) {
306                                break;
307                        }
308                } else {
309                        psize = trace_pread_packet(trace, &packet);
310                }
311
312                if (psize > 0) {
313                        packet = (*trace->per_pkt)(trace, packet, NULL, t);
314                        continue;
315                }
316
317                if (psize == -2)
318                        continue; // We have a message
319
320                if (psize < 1) { // consider sending a message
321                        break;
322                }
323
324        }
325
326
327stop:
328        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
329        // Let the per_packet function know we have stopped
330        message.code = MESSAGE_STOPPED;
331        message.sender = NULL;
332        message.additional.uint64 = 0;
333        (*trace->per_pkt)(trace, NULL, &message, t);
334
335        // Free our last packet
336        if (packet)
337                trace_destroy_packet(packet);
338
339        // And we're at the end free the memories
340        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
341        t->state = THREAD_FINISHED;
342        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
343
344        // Notify only after we've defiantly set the state to finished
345        message.code = MESSAGE_PERPKT_ENDED;
346        message.additional.uint64 = 0;
347        trace_send_message_to_reducer(trace, &message);
348
349        pthread_exit(NULL);
350};
351
352/** True if trace has dedicated hasher thread otherwise false */
353inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
354{
355        return libtrace->hasher_thread.type == THREAD_HASHER;
356}
357
358/**
359 * The start point for our single threaded hasher thread, this will read
360 * and hash a packet from a data source and queue it against the correct
361 * core to process it.
362 */
363static void* hasher_start(void *data) {
364        libtrace_t *trace = (libtrace_t *)data;
365        libtrace_thread_t * t;
366        int i;
367        libtrace_packet_t * packet;
368        libtrace_message_t message = {0};
369
370        assert(trace_has_dedicated_hasher(trace));
371        /* Wait until all threads are started and objects are initialised (ring buffers) */
372        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
373        t = &trace->hasher_thread;
374        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
375        printf("Hasher Thread started\n");
376        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
377        int pkt_skipped = 0;
378        /* Read all packets in then hash and queue against the correct thread */
379        while (1) {
380                int thread;
381                if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
382                        packet = trace_create_packet();
383                assert(packet);
384
385                if (libtrace_halt) // Signal to die has been sent - TODO
386                        break;
387
388                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
389                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
390                        switch(message.code) {
391                                case MESSAGE_DO_PAUSE:
392                                        fprintf(stderr, "Pausing hasher thread\n");
393                                        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
394                                        trace->perpkts_pausing++;
395                                        pthread_cond_broadcast(&trace->perpkt_cond);
396                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
397                                                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
398                                        }
399                                        // trace->perpkts_pausing--; // Don't do this the pausing thread will do this for us
400                                        pthread_cond_broadcast(&trace->perpkt_cond);
401                                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
402                                        fprintf(stdout, "Mapper resuming\n");
403                                        break;
404                                case MESSAGE_DO_STOP:
405                                        // Stop called after pause
406                                        assert(trace->started == false);
407                                        assert(trace->state == STATE_FINSHED);
408                                default:
409                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
410                        }
411                        pkt_skipped = 1;
412                        continue;
413                }
414
415                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
416                        break; /* We are EOF or error'd either way we stop  */
417                }
418
419                /* We are guaranteed to have a hash function i.e. != NULL */
420                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
421                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
422                /* Blocking write to the correct queue - I'm the only writer */
423                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
424                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
425                        pkt_skipped = 0;
426                } else {
427                        pkt_skipped = 1; // Reuse that packet no one read it
428                }
429        }
430
431        /* Broadcast our last failed read to all threads */
432        for (i = 0; i < trace->perpkt_thread_count; i++) {
433                libtrace_packet_t * bcast;
434                printf("Broadcasting error/EOF now the trace is over\n");
435                if (i == trace->perpkt_thread_count - 1) {
436                        bcast = packet;
437                } else {
438                        bcast = trace_create_packet();
439                        bcast->error = packet->error;
440                }
441                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
442                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
443                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
444                        // Unlock early otherwise we could deadlock
445                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
446                } else {
447                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
448                }
449        }
450        // We dont need to free packet
451
452        // And we're at the end free the memories
453        t->state = THREAD_FINISHED;
454
455        // Notify only after we've defiantly set the state to finished
456        message.code = MESSAGE_PERPKT_ENDED;
457        message.additional.uint64 = 0;
458        trace_send_message_to_reducer(trace, &message);
459
460        // TODO remove from TTABLE t sometime
461        pthread_exit(NULL);
462};
463
464/**
465 * Moves src into dest(Complete copy) and copies the memory buffer and
466 * its flags from dest into src ready for reuse without needing extra mallocs.
467 */
468static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
469        // Save the passed in buffer status
470        assert(dest->trace == NULL); // Must be a empty packet
471        void * temp_buf = dest->buffer;
472        buf_control_t temp_buf_control = dest->buf_control;
473        // Completely copy StoredPacket into packet
474        memcpy(dest, src, sizeof(libtrace_packet_t));
475        // Set the buffer settings on the returned packet
476        src->buffer = temp_buf;
477        src->buf_control = temp_buf_control;
478        src->trace = NULL;
479}
480
481/* Our simplest case when a thread becomes ready it can obtain a exclusive
482 * lock to read a packet from the underlying trace.
483 */
484inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
485{
486        // We need this to fill the 'first' packet table
487        libtrace_thread_t *t = get_thread_table(libtrace);
488        if (!*packet) {
489                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
490                        *packet = trace_create_packet();
491        }
492        assert(*packet);
493        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
494        /* Read a packet */
495        (*packet)->error = trace_read_packet(libtrace, *packet);
496        // Doing this inside the lock ensures the first packet is always
497        // recorded first
498        if ((*packet)->error > 0)
499                store_first_packet(libtrace, *packet, t);
500
501        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
502        return (*packet)->error;
503}
504
505/**
506 * For the case that we have a dedicated hasher thread
507 * 1. We read a packet from our buffer
508 * 2. Move that into the packet provided (packet)
509 */
510inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
511{
512        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
513        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
514
515        if (*packet) // Recycle the old get the new
516                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
517                        trace_destroy_packet(*packet);
518        *packet = libtrace_ringbuffer_read(&t->rbuffer);
519
520        if (*packet) {
521                return (*packet)->error;
522        } else {
523                // This is how we do a notify, we send a message before hand to note that the trace is over etc.
524                // And this will notify the perpkt thread to read that message, this is easiest
525                // since cases like pause can also be dealt with this way without actually
526                // having to be the end of the stream.
527                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
528                return -2;
529        }
530}
531
532/**
533 * Tries to read from our queue and returns 1 if a packet was retrieved
534 */
535static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
536{
537        libtrace_packet_t* retrived_packet;
538
539        /* Lets see if we have one waiting */
540        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
541                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
542                assert(retrived_packet);
543
544                if (*packet) // Recycle the old get the new
545                        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
546                                trace_destroy_packet(*packet);
547                *packet = retrived_packet;
548                *ret = (*packet)->error;
549                return 1;
550        }
551        return 0;
552}
553
554/**
555 * Allows us to ensure all threads are finished writing to our threads ring_buffer
556 * before returning EOF/error.
557 */
558inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
559{
560        /* We are waiting for the condition that another thread ends to check
561         * our queue for new data, once all threads end we can go to finished */
562        bool complete = false;
563        int ret;
564
565        do {
566                // Wait for a thread to end
567                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
568
569                // Check before
570                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
571                        complete = true;
572                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
573                        continue;
574                }
575
576                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
577
578                // Check after
579                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
580                        complete = true;
581                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
582                        continue;
583                }
584
585                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
586
587                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
588                if(try_waiting_queue(libtrace, t, packet, &ret))
589                        return ret;
590        } while (!complete);
591
592        // We can only end up here once all threads complete
593        try_waiting_queue(libtrace, t, packet, &ret);
594
595        return ret;
596        // TODO rethink this logic fix bug here
597}
598
599/**
600 * Expects the libtrace_lock to not be held
601 */
602inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
603{
604        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
605        t->state = THREAD_FINISHING;
606        libtrace->perpkts_finishing++;
607        pthread_cond_broadcast(&libtrace->perpkt_cond);
608        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
609        return trace_handle_finishing_perpkt(libtrace, packet, t);
610}
611
612/**
613 * This case is much like the dedicated hasher, except that we will become
614 * hasher if we don't have a a packet waiting.
615 *
616 * Note: This is only every used if we have are doing hashing.
617 *
618 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
619 * queue sizes in total are larger than the ring size.
620 *
621 * 1. We read a packet from our buffer
622 * 2. Move that into the packet provided (packet)
623 */
624inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
625{
626        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
627        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
628        int thread, ret/*, psize*/;
629
630        while (1) {
631                if(try_waiting_queue(libtrace, t, packet, &ret))
632                        return ret;
633                // Can still block here if another thread is writing to a full queue
634                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
635
636                // Its impossible for our own queue to overfill, because no one can write
637                // when we are in the lock
638                if(try_waiting_queue(libtrace, t, packet, &ret)) {
639                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
640                        return ret;
641                }
642
643                // Another thread cannot write a packet because a queue has filled up. Is it ours?
644                if (libtrace->perpkt_queue_full) {
645                        contention_stats[this_thread].wait_for_fill_complete_hits++;
646                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
647                        continue;
648                }
649
650                if (!*packet) {
651                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
652                                *packet = trace_create_packet();
653                }
654                assert(*packet);
655
656                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
657                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
658                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
659                        if (libtrace_halt)
660                                return 0;
661                        else
662                                return (*packet)->error;
663                }
664
665                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
666                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
667                if (thread == this_thread) {
668                        // If it's this thread we must be in order because we checked the buffer once we got the lock
669                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
670                        return (*packet)->error;
671                }
672
673                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
674                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
675                                libtrace->perpkt_queue_full = true;
676                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
677                                contention_stats[this_thread].full_queue_hits++;
678                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
679                        }
680                        *packet = NULL;
681                        libtrace->perpkt_queue_full = false;
682                } else {
683                        /* We can get here if the user closes the thread before natural completion/or error */
684                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
685                }
686                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
687        }
688}
689
690/**
691 * This case is much like the dedicated hasher, except that we will become
692 * hasher if we don't have a a packet waiting.
693 *
694 * TODO: You can loose the tail of a trace if the final thread
695 * fills its own queue and therefore breaks early and doesn't empty the sliding window.
696 *
697 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
698 * queue sizes in total are larger than the ring size.
699 *
700 * 1. We read a packet from our buffer
701 * 2. Move that into the packet provided (packet)
702 */
703inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
704{
705        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
706        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
707        int ret, i, thread/*, psize*/;
708
709        if (t->state == THREAD_FINISHING)
710                return trace_handle_finishing_perpkt(libtrace, packet, t);
711
712        while (1) {
713                // Check if we have packets ready
714                if(try_waiting_queue(libtrace, t, packet, &ret))
715                        return ret;
716
717                // We limit the number of packets we get to the size of the sliding window
718                // such that it is impossible for any given thread to fail to store a packet
719                assert(sem_wait(&libtrace->sem) == 0);
720                /*~~~~Single threaded read of a packet~~~~*/
721                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
722
723                /* Re-check our queue things we might have data waiting */
724                if(try_waiting_queue(libtrace, t, packet, &ret)) {
725                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
726                        assert(sem_post(&libtrace->sem) == 0);
727                        return ret;
728                }
729
730                // TODO put on *proper* condition variable
731                if (libtrace->perpkt_queue_full) {
732                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
733                        assert(sem_post(&libtrace->sem) == 0);
734                        contention_stats[this_thread].wait_for_fill_complete_hits++;
735                        continue;
736                }
737
738                if (!*packet) {
739                        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
740                                *packet = trace_create_packet();
741                }
742                assert(*packet);
743
744                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
745                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
746                        assert(sem_post(&libtrace->sem) == 0);
747                        // Finish this thread ensuring that any data written later by another thread is retrieved also
748                        if (libtrace_halt)
749                                return 0;
750                        else
751                                return trace_finish_perpkt(libtrace, packet, t);
752                }
753                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
754
755                /* ~~~~Multiple threads can run the hasher~~~~ */
756                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
757
758                /* Yes this is correct opposite read lock for a write operation */
759                assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
760                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
761                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
762                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
763                *packet = NULL;
764
765                // Always try read any data from the sliding window
766                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
767                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
768                        if (libtrace->perpkt_queue_full) {
769                                // I might be the holdup in which case if I can read my queue I should do that and return
770                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
771                                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
772                                        return ret;
773                                }
774                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
775                                continue;
776                        }
777                        // Read greedily as many as we can
778                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
779                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
780                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
781                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
782                                                if (this_thread == thread)
783                                                {
784                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
785                                                        // before EOF/error we might not have emptied the sliding window
786                                                        printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");
787                                                        // Its our queue we must have a packet to read out
788                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
789                                                                // We must be able to write this now 100% without fail
790                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
791                                                                assert(sem_post(&libtrace->sem) == 0);
792                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
793                                                                return ret;
794                                                        } else {
795                                                                assert(!"Our queue is full but I cannot read from it??");
796                                                        }
797                                                }
798                                                // Not us we have to give the other threads a chance to write there packets then
799                                                libtrace->perpkt_queue_full = true;
800                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
801                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
802                                                        assert(sem_post(&libtrace->sem) == 0);
803
804                                                contention_stats[this_thread].full_queue_hits++;
805                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
806                                                // Grab these back
807                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
808                                                        assert(sem_wait(&libtrace->sem) == 0);
809                                                libtrace->perpkt_queue_full = false;
810                                        }
811                                        assert(sem_post(&libtrace->sem) == 0);
812                                        *packet = NULL;
813                                } else {
814                                        // Cannot write to a queue if no ones waiting (I think this is unreachable)
815                                        // in the general case (unless the user ends early without proper clean up).
816                                        assert (!"unreachable code??");
817                                }
818                        }
819                        assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
820                }
821                // Now we go back to checking our queue anyways
822        }
823}
824
825
826/**
827 * For the first packet of each queue we keep a copy and note the system
828 * time it was received at.
829 *
830 * This is used for finding the first packet when playing back a trace
831 * in trace time. And can be used by real time applications to print
832 * results out every XXX seconds.
833 */
834void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
835{
836        if (!t->recorded_first) {
837                struct timeval tv;
838                libtrace_packet_t * dup;
839                // For what it's worth we can call these outside of the lock
840                gettimeofday(&tv, NULL);
841                dup = trace_copy_packet(packet);
842                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
843                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
844                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
845                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
846                // Now update the first
847                libtrace->first_packets.count++;
848                if (libtrace->first_packets.count == 1) {
849                        // We the first entry hence also the first known packet
850                        libtrace->first_packets.first = t->perpkt_num;
851                } else {
852                        // Check if we are newer than the previous 'first' packet
853                        size_t first = libtrace->first_packets.first;
854                        if (trace_get_seconds(dup) <
855                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
856                                libtrace->first_packets.first = t->perpkt_num;
857                }
858                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
859                libtrace_message_t mesg = {0};
860                mesg.code = MESSAGE_FIRST_PACKET;
861                trace_send_message_to_reducer(libtrace, &mesg);
862                t->recorded_first = true;
863        }
864}
865
866/**
867 * Returns 1 if it's certain that the first packet is truly the first packet
868 * rather than a best guess based upon threads that have published so far.
869 * Otherwise 0 is returned.
870 * It's recommended that this result is stored rather than calling this
871 * function again.
872 */
873DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
874{
875        int ret = 0;
876        assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
877        if (libtrace->first_packets.count) {
878                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
879                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
880                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
881                        ret = 1;
882                } else {
883                        struct timeval curr_tv;
884                        // If a second has passed since the first entry we will assume this is the very first packet
885                        gettimeofday(&curr_tv, NULL);
886                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
887                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
888                                        ret = 1;
889                                }
890                        }
891                }
892        } else {
893                *packet = NULL;
894                *tv = NULL;
895        }
896        assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
897        return ret;
898}
899
900
901DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
902{
903        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
904}
905
906inline static struct timeval usec_to_tv(uint64_t usec)
907{
908        struct timeval tv;
909        tv.tv_sec = usec / 1000000;
910        tv.tv_usec = usec % 1000000;
911        return tv;
912}
913
914/** Similar to delay_tracetime but send messages to all threads periodically */
915static void* keepalive_entry(void *data) {
916        struct timeval prev, next;
917        libtrace_message_t message = {0};
918        libtrace_t *trace = (libtrace_t *)data;
919        uint64_t next_release;
920        fprintf(stderr, "keepalive thread is starting\n");
921
922        gettimeofday(&prev, NULL);
923        message.code = MESSAGE_TICK;
924        while (trace->state != STATE_FINSHED) {
925                fd_set rfds;
926                next_release = tv_to_usec(&prev) + (trace->tick_interval * 1000);
927                gettimeofday(&next, NULL);
928                if (next_release > tv_to_usec(&next)) {
929                        next = usec_to_tv(next_release - tv_to_usec(&next));
930                        // Wait for timeout or a message
931                        FD_ZERO(&rfds);
932                FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
933                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
934                                libtrace_message_t msg;
935                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
936                                assert(msg.code == MESSAGE_DO_STOP);
937                                goto done;
938                        }
939                }
940                prev = usec_to_tv(next_release);
941                if (trace->state == STATE_RUNNING) {
942                        message.additional.uint64 = tv_to_usec(&prev);
943                        trace_send_message_to_perpkts(trace, &message);
944                }
945        }
946done:
947        fprintf(stderr, "keepalive thread is finishing\n");
948        trace->keepalive_thread.state = THREAD_FINISHED;
949        return NULL;
950}
951
952/**
953 * Delays a packets playback so the playback will be in trace time
954 */
955static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
956        struct timeval curr_tv, pkt_tv;
957        uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet
958        uint64_t curr_usec;
959        /* Tracetime we might delay releasing this packet */
960        if (!t->tracetime_offset_usec) {
961                libtrace_packet_t * first_pkt;
962                struct timeval *sys_tv;
963                int64_t initial_offset;
964                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
965                assert(first_pkt);
966                pkt_tv = trace_get_timeval(first_pkt);
967                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
968                if (stable)
969                        // 0->1 because 0 is used to mean unset
970                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
971                next_release = initial_offset;
972        }
973        /* next_release == offset */
974        pkt_tv = trace_get_timeval(packet);
975        next_release += tv_to_usec(&pkt_tv);
976        gettimeofday(&curr_tv, NULL);
977        curr_usec = tv_to_usec(&curr_tv);
978        if (next_release > curr_usec) {
979                // We need to wait
980                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
981                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
982                select(0, NULL, NULL, NULL, &delay_tv);
983        }
984}
985
986/* Read one packet from the trace into a buffer. Note that this function will
987 * block until a packet is read (or EOF is reached).
988 *
989 * @param libtrace      the libtrace opaque pointer
990 * @param packet        the packet opaque pointer
991 * @returns 0 on EOF, negative value on error
992 *
993 * Note this is identical to read_packet but calls pread_packet instead of
994 * read packet in the format.
995 *
996 */
997static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
998
999        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
1000        if (trace_is_err(libtrace))
1001                return -1;
1002        if (!libtrace->started) {
1003                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n");
1004                return -1;
1005        }
1006        if (!(packet->buf_control==TRACE_CTRL_PACKET || packet->buf_control==TRACE_CTRL_EXTERNAL)) {
1007                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n");
1008                return -1;
1009        }
1010        assert(packet);
1011
1012        if (libtrace->format->read_packet) {
1013                do {
1014                        size_t ret;
1015                        /* Finalise the packet, freeing any resources the format module
1016                         * may have allocated it and zeroing all data associated with it.
1017                         */
1018                        trace_fin_packet(packet);
1019                        /* Store the trace we are reading from into the packet opaque
1020                         * structure */
1021                        packet->trace = libtrace;
1022                        ret=libtrace->format->pread_packet(libtrace,packet);
1023                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
1024                                return ret;
1025                        }
1026                        if (libtrace->filter) {
1027                                /* If the filter doesn't match, read another
1028                                 * packet
1029                                 */
1030                                if (!trace_apply_filter(libtrace->filter,packet)){
1031                                        ++libtrace->filtered_packets;
1032                                        continue;
1033                                }
1034                        }
1035                        if (libtrace->snaplen>0) {
1036                                /* Snap the packet */
1037                                trace_set_capture_length(packet,
1038                                                libtrace->snaplen);
1039                        }
1040                        trace_packet_set_order(packet, libtrace->accepted_packets);
1041                        ++libtrace->accepted_packets;
1042                        return ret;
1043                } while(1);
1044        }
1045        trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");
1046        return ~0U;
1047}
1048
1049/**
1050 * Read a packet from the parallel trace
1051 */
1052DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
1053{
1054        int ret;
1055        libtrace_thread_t *t = get_thread_table(libtrace);
1056
1057        // Cleanup the packet passed back
1058        if (*packet)
1059                trace_fin_packet(*packet);
1060
1061        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1062                if (!*packet)
1063                        *packet = trace_create_packet();
1064                ret = trace_pread_packet_wrapper(libtrace, *packet);
1065        } else if (trace_has_dedicated_hasher(libtrace)) {
1066                ret = trace_pread_packet_hasher_thread(libtrace, packet);
1067        } else if (!trace_has_dedicated_hasher(libtrace)) {
1068                /* We don't care about which core a packet goes to */
1069                ret = trace_pread_packet_first_in_first_served(libtrace, packet);
1070        } /* else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
1071                ret = trace_pread_packet_sliding_window(libtrace, packet);
1072        } else {
1073                ret = trace_pread_packet_hash_locked(libtrace, packet);
1074        }*/
1075
1076        // Formats can also optionally do this internally to ensure the first
1077        // packet is always reported correctly
1078        if (ret > 0) {
1079                store_first_packet(libtrace, *packet, t);
1080                if (libtrace->tracetime)
1081                        delay_tracetime(libtrace, *packet, t);
1082        }
1083
1084        return ret;
1085}
1086
1087/* Starts perpkt threads
1088 * @return threads_started
1089 */
1090static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
1091        int i;
1092
1093        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1094                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1095                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
1096        }
1097        return libtrace->perpkt_thread_count;
1098}
1099
1100/* Start an input trace in a parallel fashion.
1101 *
1102 * @param libtrace the input trace to start
1103 * @param global_blob some global data you can share with the new perpkt threads
1104 * @returns 0 on success
1105 */
1106DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
1107{
1108        int i;
1109        sigset_t sig_before, sig_block_all;
1110
1111        assert(libtrace);
1112        if (trace_is_err(libtrace))
1113                return -1;
1114        if (libtrace->state == STATE_PAUSED) {
1115                assert (libtrace->perpkts_pausing != 0);
1116                fprintf(stderr, "Restarting trace\n");
1117                UNUSED int err;
1118                if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1119                        printf("This format has direct support for p's\n");
1120                        err = libtrace->format->pstart_input(libtrace);
1121                } else {
1122                        if (libtrace->format->start_input) {
1123                                err = libtrace->format->start_input(libtrace);
1124                        }
1125                }
1126                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1127                libtrace->started = true;
1128                libtrace->state = STATE_RUNNING;
1129                assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1130                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1131                return 0;
1132        }
1133
1134        assert(libtrace->state == STATE_NEW);
1135        libtrace_parallel = 1;
1136
1137        // Store the user defined things against the trace
1138        libtrace->global_blob = global_blob;
1139        libtrace->per_pkt = per_pkt;
1140        libtrace->reducer = reducer;
1141        libtrace->perpkts_finishing = 0;
1142
1143        assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1144        assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1145        assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1146        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1147
1148        // Set default buffer sizes
1149        if (libtrace->perpkt_buffer_size <= 0)
1150                libtrace->perpkt_buffer_size = 1000;
1151
1152        if (libtrace->perpkt_thread_count <= 0) {
1153                // TODO add BSD support
1154                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1155                if (libtrace->perpkt_thread_count <= 0)
1156                        // Lets just use one
1157                        libtrace->perpkt_thread_count = 1;
1158        }
1159
1160        if(libtrace->packet_freelist_size <= 0)
1161                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
1162
1163        if(libtrace->packet_freelist_size <
1164                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
1165                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1166
1167        libtrace->started=true; // Before we start the threads otherwise we could have issues
1168        libtrace->state = STATE_RUNNING;
1169        /* Disable signals - Pthread signal handling */
1170
1171        sigemptyset(&sig_block_all);
1172
1173        assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1174
1175        // If we are using a hasher start it
1176        if (libtrace->hasher || libtrace->hasher_thread.type == THREAD_HASHER) {
1177                libtrace_thread_t *t = &libtrace->hasher_thread;
1178                t->trace = libtrace;
1179                t->ret = NULL;
1180                t->type = THREAD_HASHER;
1181                t->state = THREAD_RUNNING;
1182                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1183                assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1184        } else {
1185                libtrace->hasher_thread.type = THREAD_EMPTY;
1186        }
1187        libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
1188        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1189        assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1190        // This will be applied to every new thread that starts, i.e. they will block all signals
1191        // Lets start a fixed number of reading threads
1192
1193        // For now we never have a dedicated thread for the reducer
1194        // i.e. This main thread is used as the reducer
1195        libtrace->reducer_thread.tid = pthread_self();
1196        libtrace->reducer_thread.type = THREAD_REDUCER;
1197        libtrace->reducer_thread.state = THREAD_RUNNING;
1198        libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
1199
1200        /* Ready some storages */
1201        libtrace->first_packets.first = 0;
1202        libtrace->first_packets.count = 0;
1203        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1204        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
1205
1206
1207        /* Start all of our perpkt threads */
1208        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
1209        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1210                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
1211                t->trace = libtrace;
1212                t->ret = NULL;
1213                t->type = THREAD_PERPKT;
1214                t->state = THREAD_RUNNING;
1215                t->user_data = NULL;
1216                // t->tid DONE on create
1217                t->perpkt_num = i;
1218                if (libtrace->hasher)
1219                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
1220                // Depending on the mode vector or deque might be chosen
1221                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
1222                libtrace_deque_init(&t->deque, sizeof(libtrace_result_t));
1223                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1224                t->tmp_key = 0;
1225                t->tmp_data = NULL;
1226                t->recorded_first = false;
1227                assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1228                t->tracetime_offset_usec = 0;;
1229        }
1230
1231        int threads_started = 0;
1232        /* Setup the trace and start our threads */
1233        if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1234                printf("This format has direct support for p's\n");
1235                threads_started = libtrace->format->pstart_input(libtrace);
1236        } else {
1237                if (libtrace->format->start_input) {
1238                        threads_started=libtrace->format->start_input(libtrace);
1239                }
1240        }
1241        if (threads_started == 0)
1242                threads_started = trace_start_perpkt_threads(libtrace);
1243
1244        if (libtrace->tick_interval > 0) {
1245                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
1246                libtrace->keepalive_thread.state = THREAD_RUNNING;
1247                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1248                assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
1249        }
1250
1251        // Revert back - Allow signals again
1252        assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1253        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1254
1255        if (threads_started < 0)
1256                // Error
1257                return threads_started;
1258
1259        // TODO fix these leaks etc
1260        if (libtrace->perpkt_thread_count != threads_started)
1261                printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
1262
1263
1264        return 0;
1265}
1266
1267/**
1268 * Pauses a trace, this should only be called by the main thread
1269 * 1. Set started = false
1270 * 2. All perpkt threads are paused waiting on a condition var
1271 * 3. Then call ppause on the underlying format if found
1272 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
1273 *
1274 * Once done you should be able to modify the trace setup and call pstart again
1275 * TODO handle changing thread numbers
1276 */
1277DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1278{
1279        libtrace_thread_t *t;
1280        int i;
1281        assert(libtrace);
1282        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1283                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1284                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1285                return -1;
1286        }
1287
1288        t = get_thread_table(libtrace);
1289
1290        // Set pausing
1291        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1292        libtrace->state = STATE_PAUSING;
1293        pthread_cond_broadcast(&libtrace->perpkt_cond);
1294        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1295
1296        // Special case handle the hasher thread case
1297        if (trace_has_dedicated_hasher(libtrace)) {
1298                fprintf(stderr, "Hasher thread running we deal with this special!\n");
1299                libtrace_message_t message = {0};
1300                message.code = MESSAGE_DO_PAUSE;
1301                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1302                // Wait for it to pause
1303                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1304                while (1 != libtrace->perpkts_pausing) {
1305                        assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1306                }
1307                libtrace->perpkts_pausing--; // Do this on the hasher's behalf
1308                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1309        }
1310
1311        fprintf(stderr, "Sending messages \n");
1312        // Stop threads, skip this one if its a perpkt
1313        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1314                if (&libtrace->perpkt_threads[i] != t) {
1315                        libtrace_message_t message = {0};
1316                        message.code = MESSAGE_DO_PAUSE;
1317                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1318                        if(trace_has_dedicated_hasher(libtrace)) {
1319                                // The hasher has stopped and other threads have messages waiting therefore
1320                                // If the queues are empty the other threads would have no data
1321                                // So send some NULL packets to simply ask the threads to check there message queues
1322                                // We are the only writer since hasher has paused
1323                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
1324                        }
1325                } else {
1326                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1327                }
1328        }
1329
1330        // Formats must support native message handling if a message is ready
1331        // Approach per Perry's suggestion is a non-blocking read
1332        // followed by a blocking read. XXX STRIP THIS OUT
1333
1334        if (t) {
1335                // A perpkt is doing the pausing interesting fake a extra thread paused
1336                // We rely on the user to not return before starting the trace again
1337                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1338                libtrace->perpkts_pausing++;
1339                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1340        }
1341
1342        fprintf(stderr, "Asking threads to pause\n");
1343
1344        // Wait for all threads to pause
1345        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1346        while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
1347                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1348        }
1349        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1350
1351        fprintf(stderr, "Threads have paused\n");
1352
1353        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
1354                libtrace->started = false;
1355                if (libtrace->format->ppause_input)
1356                        libtrace->format->ppause_input(libtrace);
1357                // TODO What happens if we don't have pause input??
1358        } else {
1359                int err;
1360                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1361                err = trace_pause(libtrace);
1362                // We should handle this a bit better
1363                if (err)
1364                        return err;
1365        }
1366
1367        // Only set as paused after the pause has been called on the trace
1368        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1369        libtrace->state = STATE_PAUSED;
1370        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1371        return 0;
1372}
1373
1374/**
1375 * Stop trace finish prematurely as though it meet an EOF
1376 * This should only be called by the main thread
1377 * 1. Calls ppause
1378 * 2. Sends a message asking for threads to finish
1379 *
1380 */
1381DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1382{
1383        int i;
1384        libtrace_message_t message = {0};
1385        assert(libtrace);
1386        if (!libtrace->started) {
1387                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()");
1388                return -1;
1389        }
1390
1391        // Ensure all threads have paused and the underlying trace format has
1392        // been closed and all packets associated are cleaned up
1393        trace_ppause(libtrace);
1394
1395        // Now send a message asking the threads to stop
1396        // This will be retrieved before trying to read another packet
1397       
1398        message.code = MESSAGE_DO_STOP;
1399        trace_send_message_to_perpkts(libtrace, &message);
1400        if (trace_has_dedicated_hasher(libtrace))
1401                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1402       
1403        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1404                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1405        }
1406
1407        // Now release the threads and let them stop
1408        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1409        libtrace->state = STATE_FINSHED;
1410        assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0);
1411        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1412        return 0;
1413}
1414
1415/**
1416 * Set the hasher type along with a selected function, if hardware supports
1417 * that generic type of hashing it will be used otherwise the supplied
1418 * hasher function will be used and passed data when called.
1419 *
1420 * @return 0 if successful otherwise -1 on error
1421 */
1422DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1423        int ret = -1;
1424        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1425                return -1;
1426        }
1427
1428        // Save the requirements
1429        trace->hasher_type = type;
1430        if (hasher) {
1431                trace->hasher = hasher;
1432                trace->hasher_data = data;
1433        } else {
1434                trace->hasher = NULL;
1435                // TODO consider how to handle freeing this
1436                trace->hasher_data = NULL;
1437        }
1438
1439        // Try push this to hardware - NOTE hardware could do custom if
1440        // there is a more efficient way to apply it, in this case
1441        // it will simply grab the function out of libtrace_t
1442        if (trace->format->pconfig_input)
1443                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1444
1445        if (ret == -1) {
1446                // We have to deal with this ourself
1447                // This most likely means single threaded reading of the trace
1448                if (!hasher) {
1449                        switch (type)
1450                        {
1451                                case HASHER_CUSTOM:
1452                                case HASHER_BALANCE:
1453                                        return 0;
1454                                case HASHER_BIDIRECTIONAL:
1455                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1456                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1457                                        toeplitz_init_config(trace->hasher_data, 1);
1458                                        return 0;
1459                                case HASHER_UNIDIRECTIONAL:
1460                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1461                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1462                                        toeplitz_init_config(trace->hasher_data, 0);
1463                                        return 0;
1464                                case HASHER_HARDWARE:
1465                                        return -1;
1466                        }
1467                        return -1;
1468                }
1469        } else {
1470                // The hardware is dealing with this yay
1471                trace->hasher_type = HASHER_HARDWARE;
1472        }
1473
1474        return 0;
1475}
1476
1477// Waits for all threads to finish
1478DLLEXPORT void trace_join(libtrace_t *libtrace) {
1479        int i;
1480
1481        /* Firstly wait for the perpkt threads to finish, since these are
1482         * user controlled */
1483        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1484                //printf("Waiting to join with perpkt #%d\n", i);
1485                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
1486                //printf("Joined with perpkt #%d\n", i);
1487                // So we must do our best effort to empty the queue - so
1488                // the producer (or any other threads) don't block.
1489                libtrace_packet_t * packet;
1490                // Mark that we are no longer accepting packets
1491                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1492                libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
1493                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1494                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1495                        if (packet) // This could be NULL iff the perpkt finishes early
1496                                trace_destroy_packet(packet);
1497        }
1498
1499        /* Now the hasher */
1500        // XXX signal it to stop
1501        if (trace_has_dedicated_hasher(libtrace)) {
1502                fprintf(stderr, "Waiting to join with the hasher\n");
1503                pthread_join(libtrace->hasher_thread.tid, NULL);
1504                fprintf(stderr, "Joined with with the hasher\n");
1505                libtrace->hasher_thread.state = THREAD_FINISHED;
1506        }
1507
1508        // Now that everything is finished nothing can be touching our
1509        // buffers so clean them up
1510        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1511                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
1512                // if they lost timeslice before-during a write
1513                libtrace_packet_t * packet;
1514                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1515                        trace_destroy_packet(packet);
1516                if (libtrace->hasher) {
1517                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1518                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1519                }
1520                // Cannot destroy vector yet, this happens with trace_destroy
1521        }
1522        libtrace->state = STATE_FINSHED;
1523       
1524        // Wait for the tick (keepalive) thread if its been started
1525        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE && libtrace->keepalive_thread.state == THREAD_RUNNING) {
1526                libtrace_message_t msg = {0};
1527                msg.code = MESSAGE_DO_STOP;
1528                fprintf(stderr, "Waiting to join with the keepalive\n");
1529                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
1530                pthread_join(libtrace->keepalive_thread.tid, NULL);
1531                fprintf(stderr, "Joined with with the keepalive\n");
1532        }
1533        // Lets mark this as done for now
1534        libtrace->state = STATE_JOINED;
1535}
1536
1537DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
1538{
1539        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1540        assert(t);
1541        return libtrace_message_queue_count(&t->messages);
1542}
1543
1544DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1545{
1546        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1547        assert(t);
1548        return libtrace_message_queue_get(&t->messages, message);
1549}
1550
1551DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
1552{
1553        libtrace_thread_t * t = get_thread_descriptor(libtrace);
1554        assert(t);
1555        return libtrace_message_queue_try_get(&t->messages, message);
1556}
1557
1558/**
1559 * Return backlog indicator
1560 */
1561DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
1562{
1563        libtrace_message_t message = {0};
1564        message.code = MESSAGE_POST_REDUCE;
1565        message.sender = get_thread_descriptor(libtrace);
1566        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
1567}
1568
1569/**
1570 * Return backlog indicator
1571 */
1572DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
1573{
1574        //printf("Sending message code=%d to reducer\n", message->code);
1575        message->sender = get_thread_descriptor(libtrace);
1576        return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
1577}
1578
1579/**
1580 *
1581 */
1582DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
1583{
1584        //printf("Sending message code=%d to reducer\n", message->code);
1585        message->sender = get_thread_descriptor(libtrace);
1586        return libtrace_message_queue_put(&t->messages, message);
1587}
1588
1589DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
1590{
1591        int i;
1592        message->sender = get_thread_descriptor(libtrace);
1593        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1594                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
1595        }
1596        //printf("Sending message code=%d to reducer\n", message->code);
1597        return 0;
1598}
1599
1600DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
1601        result->key = key;
1602}
1603DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
1604        return result->key;
1605}
1606DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value) {
1607        result->value = value;
1608}
1609DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result) {
1610        return result->value;
1611}
1612DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value) {
1613        result->key = key;
1614        result->value = value;
1615}
1616DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
1617        free(*result);
1618        result = NULL;
1619        // TODO automatically back with a free list!!
1620}
1621
1622DLLEXPORT void * trace_get_global(libtrace_t *trace)
1623{
1624        return trace->global_blob;
1625}
1626
1627DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
1628{
1629        if (trace->global_blob && trace->global_blob != data) {
1630                void * ret = trace->global_blob;
1631                trace->global_blob = data;
1632                return ret;
1633        } else {
1634                trace->global_blob = data;
1635                return NULL;
1636        }
1637}
1638
1639DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
1640{
1641        return t->user_data;
1642}
1643
1644DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
1645{
1646        if(t->user_data && t->user_data != data) {
1647                void *ret = t->user_data;
1648                t->user_data = data;
1649                return ret;
1650        } else {
1651                t->user_data = data;
1652                return NULL;
1653        }
1654}
1655
1656/**
1657 * Publish to the reduce queue, return
1658 */
1659DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
1660        libtrace_result_t res;
1661        res.is_packet = 0;
1662        // Who am I???
1663        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1664        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1665        // Now put it into my table
1666        UNUSED static __thread int count = 0;
1667
1668
1669        libtrace_result_set_key_value(&res, key, value);
1670        /*
1671        if (count == 1)
1672                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1673        count = (count+1) %1000;
1674        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1675        */
1676        /*if (count == 1)
1677                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1678        count = (count+1)%1000;*/
1679        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1680                if (libtrace_deque_get_size(&t->deque) >= 800) {
1681                        trace_post_reduce(libtrace);
1682                }
1683                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1684                //      sched_yield();
1685                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1686        } else {
1687                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1688                //      sched_yield();
1689
1690                if (libtrace_vector_get_size(&t->vector) >= 800) {
1691                        trace_post_reduce(libtrace);
1692                }
1693                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1694        }
1695}
1696
1697DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1698        libtrace_result_t res;
1699        // Who am I???
1700        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
1701        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
1702        // Now put it into my table
1703        UNUSED static __thread int count = 0;
1704
1705        res.is_packet = 1;
1706        libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
1707        /*
1708        if (count == 1)
1709                printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
1710        count = (count+1) %1000;
1711        libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1712        */
1713        /*if (count == 1)
1714                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
1715        count = (count+1)%1000;*/
1716        if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1717                if (libtrace_deque_get_size(&t->deque) >= 800) {
1718                        trace_post_reduce(libtrace);
1719                }
1720                //while (libtrace_deque_get_size(&t->deque) >= 1000)
1721                //      sched_yield();
1722                libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
1723        } else {
1724                //while (libtrace_vector_get_size(&t->vector) >= 1000)
1725                //      sched_yield();
1726
1727                if (libtrace_vector_get_size(&t->vector) >= 800) {
1728                        trace_post_reduce(libtrace);
1729                }
1730                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
1731        }
1732}
1733
1734
1735static int compareres(const void* p1, const void* p2)
1736{
1737        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
1738                return -1;
1739        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
1740                return 0;
1741        else
1742                return 1;
1743}
1744
1745DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
1746        int i;
1747        int flags = libtrace->reducer_flags; // Hint these aren't a changing
1748
1749        libtrace_vector_empty(results);
1750
1751        /* Here we assume queues are in order ascending order and they want
1752         * the smallest result first. If they are not in order the results
1753         * may not be in order.
1754         */
1755        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
1756                int live_count = 0;
1757                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
1758                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
1759                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
1760                int min_queue = -1;
1761
1762                /* Loop through check all are alive (have data) and find the smallest */
1763                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1764                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
1765                        if (libtrace_deque_get_size(v) != 0) {
1766                                libtrace_result_t r;
1767                                libtrace_deque_peek_front(v, (void *) &r);
1768                                live_count++;
1769                                live[i] = 1;
1770                                key[i] = libtrace_result_get_key(&r);
1771                                if (i==0 || min_key > key[i]) {
1772                                        min_key = key[i];
1773                                        min_queue = i;
1774                                }
1775                        } else {
1776                                live[i] = 0;
1777                        }
1778                }
1779
1780                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
1781                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
1782                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
1783                                libtrace->state == STATE_JOINED))) {
1784                        /* Get the minimum queue and then do stuff */
1785                        libtrace_result_t r;
1786
1787                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
1788                        libtrace_vector_push_back(results, &r);
1789
1790                        // We expect the key we read +1 now
1791                        libtrace->expected_key = key[min_queue] + 1;
1792
1793                        // Now update the one we just removed
1794                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
1795                        {
1796                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
1797                                key[min_queue] = libtrace_result_get_key(&r);
1798                                if (key[min_queue] <= min_key) {
1799                                        // We are still the smallest, might be out of order though :(
1800                                        min_key = key[min_queue];
1801                                } else {
1802                                        min_key = key[min_queue]; // Update our minimum
1803                                        // Check all find the smallest again - all are alive
1804                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1805                                                if (live[i] && min_key > key[i]) {
1806                                                        min_key = key[i];
1807                                                        min_queue = i;
1808                                                }
1809                                        }
1810                                }
1811                        } else {
1812                                live[min_queue] = 0;
1813                                live_count--;
1814                                min_key = UINT64_MAX; // Update our minimum
1815                                // Check all find the smallest again - all are alive
1816                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1817                                        // Still not 100% TODO (what if order is wrong or not increasing)
1818                                        if (live[i] && min_key >= key[i]) {
1819                                                min_key = key[i];
1820                                                min_queue = i;
1821                                        }
1822                                }
1823                        }
1824                }
1825        } else { // Queues are not in order - return all results in the queue
1826                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1827                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
1828                }
1829                if (flags & REDUCE_SORT) {
1830                        qsort(results->elements, results->size, results->element_size, &compareres);
1831                }
1832        }
1833        return libtrace_vector_get_size(results);
1834}
1835
1836DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
1837        return packet->order;
1838}
1839
1840DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
1841        return packet->hash;
1842}
1843
1844DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
1845        packet->order = order;
1846}
1847
1848DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
1849        packet->hash = hash;
1850}
1851
1852DLLEXPORT int trace_finished(libtrace_t * libtrace) {
1853        int i;
1854        int b = 0;
1855        // TODO I don't like using this so much
1856        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1857        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1858                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
1859                        b++;
1860        }
1861        //assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1862        return !b;
1863}
1864
1865DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
1866{
1867        UNUSED int ret = -1;
1868        switch (option) {
1869                case TRACE_OPTION_TICK_INTERVAL:
1870                        libtrace->tick_interval = *((int *) value);
1871                        return 1;
1872                case TRACE_OPTION_SET_HASHER:
1873                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
1874                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
1875                        libtrace->perpkt_buffer_size = *((int *) value);
1876                        return 1;
1877                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
1878                        libtrace->packet_freelist_size = *((int *) value);
1879                        return 1;
1880                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
1881                        libtrace->perpkt_thread_count = *((int *) value);
1882                        return 1;
1883                case TRACE_DROP_OUT_OF_ORDER:
1884                        if (*((int *) value))
1885                                libtrace->reducer_flags |= REDUCE_DROP_OOO;
1886                        else
1887                                libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
1888                        return 1;
1889                case TRACE_OPTION_SEQUENTIAL:
1890                        if (*((int *) value))
1891                                libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
1892                        else
1893                                libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
1894                        return 1;
1895                case TRACE_OPTION_ORDERED:
1896                        if (*((int *) value))
1897                                libtrace->reducer_flags |= REDUCE_ORDERED;
1898                        else
1899                                libtrace->reducer_flags &= ~REDUCE_ORDERED;
1900                        return 1;
1901                case TRACE_OPTION_USE_DEDICATED_HASHER:
1902                        if (*((int *) value))
1903                                libtrace->hasher_thread.type = THREAD_HASHER;
1904                        else
1905                                libtrace->hasher_thread.type = THREAD_EMPTY;
1906                        return 1;
1907                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
1908                        if (*((int *) value))
1909                                libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
1910                        else
1911                                libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
1912                        return 1;
1913                case TRACE_OPTION_TRACETIME:
1914                        if(*((int *) value))
1915                                libtrace->tracetime = 1;
1916                        else
1917                                libtrace->tracetime = 0;
1918                        return 0;
1919        }
1920        return 0;
1921}
1922
1923DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1924        libtrace_packet_t* result;
1925        if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1926                result = trace_create_packet();
1927        assert(result);
1928        swap_packets(result, packet); // Move the current packet into our copy
1929        return result;
1930}
1931
1932DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1933        // Try write back the packet
1934        assert(packet);
1935        // Always release any resources this might be holding such as a slot in a ringbuffer
1936        trace_fin_packet(packet);
1937        if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1938                /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
1939                //assert(1 == 90);
1940                trace_destroy_packet(packet);
1941        }
1942}
1943
1944DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
1945        if (libtrace->format)
1946                return &libtrace->format->info;
1947        else
1948                return NULL;
1949}
Note: See TracBrowser for help on using the repository browser.