source: lib/trace_parallel.c @ fc85f33

develop
Last change on this file since fc85f33 was fc85f33, checked in by Shane Alcock <salcock@…>, 21 months ago

Merge branch 'master' of git://github.com/markzz/libtrace into markzz-master

  • Property mode set to 100644
File size: 90.5 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28#include "common.h"
29#include "config.h"
30#include <assert.h>
31#include <errno.h>
32#include <fcntl.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <string.h>
36#include <sys/stat.h>
37#include <sys/types.h>
38#ifndef WIN32
39#include <sys/socket.h>
40#endif
41#include <stdarg.h>
42#include <sys/param.h>
43
44#ifdef HAVE_LIMITS_H
45#  include <limits.h>
46#endif
47
48#ifdef HAVE_SYS_LIMITS_H
49#  include <sys/limits.h>
50#endif
51
52#ifdef HAVE_NET_IF_ARP_H
53#  include <net/if_arp.h>
54#endif
55
56#ifdef HAVE_NET_IF_H
57#  include <net/if.h>
58#endif
59
60#ifdef HAVE_NETINET_IN_H
61#  include <netinet/in.h>
62#endif
63
64#ifdef HAVE_NET_ETHERNET_H
65#  include <net/ethernet.h>
66#endif
67
68#ifdef HAVE_NETINET_IF_ETHER_H
69#  include <netinet/if_ether.h>
70#endif
71
72#include <time.h>
73#ifdef WIN32
74#include <sys/timeb.h>
75#endif
76
77#include "libtrace.h"
78#include "libtrace_parallel.h"
79
80#ifdef HAVE_NET_BPF_H
81#  include <net/bpf.h>
82#else
83#  ifdef HAVE_PCAP_BPF_H
84#    include <pcap-bpf.h>
85#  endif
86#endif
87
88
89#include "libtrace_int.h"
90#include "format_helper.h"
91#include "rt_protocol.h"
92#include "hash_toeplitz.h"
93
94#include <pthread.h>
95#include <signal.h>
96#include <unistd.h>
97#include <ctype.h>
98
99static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
100extern int libtrace_parallel;
101
102struct mem_stats {
103        struct memfail {
104           uint64_t cache_hit;
105           uint64_t ring_hit;
106           uint64_t miss;
107           uint64_t recycled;
108        } readbulk, read, write, writebulk;
109};
110
111
112#ifdef ENABLE_MEM_STATS
113// Grrr gcc wants this spelt out
114__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
115
116
117static void print_memory_stats() {
118        uint64_t total;
119#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
120        char t_name[50];
121        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
122
123        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
124#else
125        fprintf(stderr, "Thread ID#%d\n", (int) pthread_self());
126#endif
127
128        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
129        if (total) {
130                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
131                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
132                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
133                                total, (double) mem_hits.read.miss / (double) total * 100.0);
134        }
135
136        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
137        if (total) {
138                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
139                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
140
141
142                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
143                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
144        }
145
146        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
147        if (total) {
148                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
149                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
150
151                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
152                                total, (double) mem_hits.write.miss / (double) total * 100.0);
153        }
154
155        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
156        if (total) {
157                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
158                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
159
160                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
161                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
162        }
163}
164#else
165static void print_memory_stats() {}
166#endif
167
168static const libtrace_generic_t gen_zero = {0};
169
170/* This should optimise away the switch to nothing in the explict cases */
171inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
172                const enum libtrace_messages type,
173                libtrace_generic_t data, libtrace_thread_t *sender) {
174
175        fn_cb_dataless fn = NULL;
176        enum libtrace_messages switchtype;
177        libtrace_callback_set_t *cbs = NULL;
178
179        if (thread == &trace->reporter_thread) {
180                cbs = trace->reporter_cbs;
181        } else {
182                cbs = trace->perpkt_cbs;
183        }
184
185        if (cbs == NULL)
186                return;
187
188        if (type >= MESSAGE_USER)
189                switchtype = MESSAGE_USER;
190        else
191                switchtype = (enum libtrace_messages) type;
192
193        switch (switchtype) {
194        case MESSAGE_STARTING:
195                if (cbs->message_starting)
196                        thread->user_data = (*cbs->message_starting)(trace,
197                                        thread, trace->global_blob);
198                return;
199        case MESSAGE_FIRST_PACKET:
200                if (cbs->message_first_packet)
201                                (*cbs->message_first_packet)(trace, thread,
202                                trace->global_blob, thread->user_data,
203                                sender);
204                return;
205        case MESSAGE_TICK_COUNT:
206                if (cbs->message_tick_count)
207                        (*cbs->message_tick_count)(trace, thread,
208                                        trace->global_blob, thread->user_data,
209                                        data.uint64);
210                return;
211        case MESSAGE_TICK_INTERVAL:
212                if (cbs->message_tick_interval)
213                        (*cbs->message_tick_interval)(trace, thread,
214                                        trace->global_blob, thread->user_data,
215                                        data.uint64);
216                return;
217        case MESSAGE_STOPPING:
218                fn = cbs->message_stopping;
219                break;
220        case MESSAGE_RESUMING:
221                fn = cbs->message_resuming;
222                break;
223        case MESSAGE_PAUSING:
224                fn = cbs->message_pausing;
225                break;
226        case MESSAGE_USER:
227                if (cbs->message_user)
228                        (*cbs->message_user)(trace, thread, trace->global_blob,
229                                        thread->user_data, type, data, sender);
230                return;
231        case MESSAGE_RESULT:
232                if (cbs->message_result)
233                        (*cbs->message_result)(trace, thread,
234                                        trace->global_blob, thread->user_data,
235                                        data.res);
236                return;
237
238        /* These should be unused */
239        case MESSAGE_DO_PAUSE:
240        case MESSAGE_DO_STOP:
241        case MESSAGE_POST_REPORTER:
242        case MESSAGE_PACKET:
243                return;
244        case MESSAGE_META_PACKET:
245                return;
246        }
247        if (fn)
248                (*fn)(trace, thread, trace->global_blob, thread->user_data);
249}
250
251DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
252        free(cbset);
253}
254
255DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
256        libtrace_callback_set_t *cbset;
257
258        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
259        memset(cbset, 0, sizeof(libtrace_callback_set_t));
260        return cbset;
261}
262
263/*
264 * This can be used once the hasher thread has been started and internally after
265 * verify_configuration.
266 */
267DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
268{
269        return libtrace->hasher_thread.type == THREAD_HASHER;
270}
271
272DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
273{
274        if (!(libtrace->state != STATE_NEW)) {
275                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "Cannot check reporter for the current state in trace_has_reporter()");
276                return false;
277        }
278        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
279}
280
281/**
282 * When running the number of perpkt threads in use.
283 * TODO what if the trace is not running yet, or has finished??
284 *
285 * @brief libtrace_perpkt_thread_nb
286 * @param t The trace
287 * @return
288 */
289DLLEXPORT int trace_get_perpkt_threads(libtrace_t * t) {
290        return t->perpkt_thread_count;
291}
292
293DLLEXPORT int trace_get_perpkt_thread_id(libtrace_thread_t *thread) {
294        return thread->perpkt_num;
295}
296
297/**
298 * Changes the overall traces state and signals the condition.
299 *
300 * @param trace A pointer to the trace
301 * @param new_state The new state of the trace
302 * @param need_lock Set to true if libtrace_lock is not held, otherwise
303 *        false in the case the lock is currently held by this thread.
304 */
305static inline void libtrace_change_state(libtrace_t *trace,
306        const enum trace_state new_state, const bool need_lock)
307{
308        UNUSED enum trace_state prev_state;
309        if (need_lock)
310                pthread_mutex_lock(&trace->libtrace_lock);
311        prev_state = trace->state;
312        trace->state = new_state;
313
314        if (trace->config.debug_state)
315                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
316                        trace->uridata, get_trace_state_name(prev_state),
317                        get_trace_state_name(trace->state));
318
319        pthread_cond_broadcast(&trace->perpkt_cond);
320        if (need_lock)
321                pthread_mutex_unlock(&trace->libtrace_lock);
322}
323
324/**
325 * Changes a thread's state and broadcasts the condition variable. This
326 * should always be done when the lock is held.
327 *
328 * Additionally for perpkt threads the state counts are updated.
329 *
330 * @param trace A pointer to the trace
331 * @param t A pointer to the thread to modify
332 * @param new_state The new state of the thread
333 * @param need_lock Set to true if libtrace_lock is not held, otherwise
334 *        false in the case the lock is currently held by this thread.
335 */
336static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
337        const enum thread_states new_state, const bool need_lock)
338{
339        enum thread_states prev_state;
340        if (need_lock)
341                pthread_mutex_lock(&trace->libtrace_lock);
342        prev_state = t->state;
343        t->state = new_state;
344        if (t->type == THREAD_PERPKT) {
345                --trace->perpkt_thread_states[prev_state];
346                ++trace->perpkt_thread_states[new_state];
347        }
348
349        if (trace->config.debug_state)
350                fprintf(stderr, "Thread %d state changed from %d to %d\n",
351                        (int) t->tid, prev_state, t->state);
352
353        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count) {
354                /* Make sure we save our final stats in case someone wants
355                 * them at the end of their program.
356                 */
357
358                trace_get_statistics(trace, NULL);
359                libtrace_change_state(trace, STATE_FINISHED, false);
360        }
361
362        pthread_cond_broadcast(&trace->perpkt_cond);
363        if (need_lock)
364                pthread_mutex_unlock(&trace->libtrace_lock);
365}
366
367/**
368 * This is valid once a trace is initialised
369 *
370 * @return True if the format supports parallel threads.
371 */
372static inline bool trace_supports_parallel(libtrace_t *trace)
373{
374        if (!trace) {
375                fprintf(stderr, "NULL trace passed into trace_supports_parallel()\n");
376                return false;
377        }
378        if (!trace->format) {
379                trace_set_err(trace, TRACE_ERR_BAD_FORMAT,
380                        "NULL capture format associated with trace in trace_supports_parallel()");
381                return false;
382        }
383        if (trace->format->pstart_input)
384                return true;
385        else
386                return false;
387}
388
389void libtrace_zero_thread(libtrace_thread_t * t) {
390        t->accepted_packets = 0;
391        t->filtered_packets = 0;
392        t->recorded_first = false;
393        t->tracetime_offset_usec = 0;
394        t->user_data = 0;
395        t->format_data = 0;
396        libtrace_zero_ringbuffer(&t->rbuffer);
397        t->trace = NULL;
398        t->ret = NULL;
399        t->type = THREAD_EMPTY;
400        t->perpkt_num = -1;
401}
402
403// Ints are aligned int is atomic so safe to read and write at same time
404// However write must be locked, read doesn't (We never try read before written to table)
405libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
406        int i = 0;
407        pthread_t tid = pthread_self();
408
409        for (;i<libtrace->perpkt_thread_count ;++i) {
410                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
411                        return &libtrace->perpkt_threads[i];
412        }
413        return NULL;
414}
415
416static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
417        libtrace_thread_t *ret;
418        if (!(ret = get_thread_table(libtrace))) {
419                pthread_t tid = pthread_self();
420                // Check if we are reporter or something else
421                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
422                                pthread_equal(tid, libtrace->reporter_thread.tid))
423                        ret = &libtrace->reporter_thread;
424                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
425                         pthread_equal(tid, libtrace->hasher_thread.tid))
426                        ret = &libtrace->hasher_thread;
427                else
428                        ret = NULL;
429        }
430        return ret;
431}
432
433DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
434        // Duplicate the packet in standard malloc'd memory and free the
435        // original, This is a 1:1 exchange so the ocache count remains unchanged.
436        if (pkt->buf_control != TRACE_CTRL_PACKET) {
437                libtrace_packet_t *dup;
438                dup = trace_copy_packet(pkt);
439                /* Release the external buffer */
440                trace_fin_packet(pkt);
441                /* Copy the duplicated packet over the existing */
442                memcpy(pkt, dup, sizeof(libtrace_packet_t));
443                /* Free the packet structure */
444                free(dup);
445        }
446}
447
448/**
449 * Makes a libtrace_result_t safe, used when pausing a trace.
450 * This will call libtrace_make_packet_safe if the result is
451 * a packet.
452 */
453DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
454        if (res->type == RESULT_PACKET) {
455                libtrace_make_packet_safe(res->value.pkt);
456        }
457}
458
459/**
460 * Holds threads in a paused state, until released by broadcasting
461 * the condition mutex.
462 */
463static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
464        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
465        thread_change_state(trace, t, THREAD_PAUSED, false);
466        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
467                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
468        }
469        thread_change_state(trace, t, THREAD_RUNNING, false);
470        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
471}
472
473/**
474 * Sends a packet to the user, expects either a valid packet or a TICK packet.
475 *
476 * @param trace The trace
477 * @param t The current thread
478 * @param packet A pointer to the packet storage, which may be set to null upon
479 *               return, or a packet to be finished.
480 * @param tracetime If true packets are delayed to match with tracetime
481 * @return 0 is successful, otherwise if playing back in tracetime
482 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
483 *
484 * @note READ_MESSAGE will only be returned if tracetime is true.
485 */
486static inline int dispatch_packet(libtrace_t *trace,
487                                  libtrace_thread_t *t,
488                                  libtrace_packet_t **packet,
489                                  bool tracetime) {
490
491        if ((*packet)->error > 0) {
492                if (tracetime) {
493                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
494                                return READ_MESSAGE;
495                }
496                if (!IS_LIBTRACE_META_PACKET((*packet))) {
497                        t->accepted_packets++;
498                }
499
500                /* If packet is meta call the meta callback */
501                if (IS_LIBTRACE_META_PACKET((*packet))) {
502                        /* Pass to meta callback if defined else pass to packet callback */
503                        if (trace->perpkt_cbs->message_meta_packet) {
504                                *packet = (*trace->perpkt_cbs->message_meta_packet)(trace, t,
505                                        trace->global_blob, t->user_data, *packet);
506                        } else if (trace->perpkt_cbs->message_packet) {
507                                *packet = (*trace->perpkt_cbs->message_packet)(trace, t,
508                                        trace->global_blob, t->user_data, *packet);
509                        }
510                } else {
511                        if (trace->perpkt_cbs->message_packet) {
512                                *packet = (*trace->perpkt_cbs->message_packet)(trace, t,
513                                        trace->global_blob, t->user_data, *packet);
514                        }
515                }
516                trace_fin_packet(*packet);
517        } else {
518                if ((*packet)->error != READ_TICK) {
519                        trace_set_err(trace, TRACE_ERR_BAD_STATE,
520                                "dispatch_packet() called with invalid 'packet'");
521                        return -1;
522                }
523                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
524                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
525        }
526        return 0;
527}
528
529/**
530 * Sends a batch of packets to the user, expects either a valid packet or a
531 * TICK packet.
532 *
533 * @param trace The trace
534 * @param t The current thread
535 * @param packets [in,out] An array of packets, these may be null upon return
536 * @param nb_packets The total number of packets in the list
537 * @param empty [in,out] A pointer to an integer storing the first empty slot,
538 * upon return this is updated
539 * @param offset [in,out] The offset into the array, upon return this is updated
540 * @param tracetime If true packets are delayed to match with tracetime
541 * @return 0 is successful, otherwise if playing back in tracetime
542 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
543 *
544 * @note READ_MESSAGE will only be returned if tracetime is true.
545 */
546static inline int dispatch_packets(libtrace_t *trace,
547                                  libtrace_thread_t *t,
548                                  libtrace_packet_t *packets[],
549                                  int nb_packets, int *empty, int *offset,
550                                  bool tracetime) {
551        for (;*offset < nb_packets; ++*offset) {
552                int ret;
553                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
554                if (ret == 0) {
555                        /* Move full slots to front as we go */
556                        if (packets[*offset]) {
557                                if (*empty != *offset) {
558                                        packets[*empty] = packets[*offset];
559                                        packets[*offset] = NULL;
560                                }
561                                ++*empty;
562                        }
563                } else {
564                        /* Break early */
565                        if (ret != READ_MESSAGE) {
566                                trace_set_err(trace, TRACE_ERR_UNKNOWN_OPTION,
567                                        "dispatch_packets() called with at least one invalid packet");
568                                return -1;
569                        }
570                        return READ_MESSAGE;
571                }
572        }
573
574        return 0;
575}
576
577/**
578 * Pauses a per packet thread, messages will not be processed when the thread
579 * is paused.
580 *
581 * This process involves reading packets if a hasher thread is used. As such
582 * this function can fail to pause due to errors when reading in which case
583 * the thread should be stopped instead.
584 *
585 *
586 * @brief trace_perpkt_thread_pause
587 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
588 */
589static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
590                                     libtrace_packet_t *packets[],
591                                     int nb_packets, int *empty, int *offset) {
592        libtrace_packet_t * packet = NULL;
593
594        /* Let the user thread know we are going to pause */
595        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
596
597        /* Send through any remaining packets (or messages) without delay */
598
599        /* First send those packets already read, as fast as possible
600         * This should never fail or check for messages etc. */
601        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
602                                    offset, false), == 0);
603
604        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
605        /* If a hasher thread is running, empty input queues so we don't lose data */
606        if (trace_has_dedicated_hasher(trace)) {
607                // The hasher has stopped by this point, so the queue shouldn't be filling
608                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
609                        int ret = trace->pread(trace, t, &packet, 1);
610                        if (ret == 1) {
611                                if (packet->error > 0) {
612                                        store_first_packet(trace, packet, t);
613                                }
614                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
615                                if (packet == NULL)
616                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
617                        } else if (ret != READ_MESSAGE) {
618                                /* Ignore messages we pick these up next loop */
619                                if (!(ret == READ_EOF || ret == READ_ERROR)) {
620                                        trace_set_err(trace, TRACE_ERR_PAUSE_PTHREAD,
621                                                "Error pausing processing thread in trace_perpkt_thread_pause()");
622                                        return -1;
623                                }
624                                /* Verify no packets are remaining */
625                                /* TODO refactor this sanity check out!! */
626                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
627                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
628                                        // No packets after this should have any data in them
629                                        if (packet->error > 0) {
630                                                trace_set_err(trace, TRACE_ERR_BAD_PACKET, "Bogus data in "
631                                                        "libtrace ring buffer after pausing perpkt thread");
632                                                return -1;
633                                        }
634                                }
635                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
636                                return -1;
637                        }
638                }
639        }
640        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
641
642        /* Now we do the actual pause, this returns when we resumed */
643        trace_thread_pause(trace, t);
644        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
645        return 1;
646}
647
648/**
649 * The is the entry point for our packet processing threads.
650 */
651static void* perpkt_threads_entry(void *data) {
652        libtrace_t *trace = (libtrace_t *)data;
653        libtrace_thread_t *t;
654        libtrace_message_t message = {0, {.uint64=0}, NULL};
655        libtrace_packet_t *packets[trace->config.burst_size];
656        size_t i;
657        //int ret;
658        /* The current reading position into the packets */
659        int offset = 0;
660        /* The number of packets last read */
661        int nb_packets = 0;
662        /* The offset to the first NULL packet upto offset */
663        int empty = 0;
664        int j;
665
666        /* Wait until trace_pstart has been completed */
667        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
668        t = get_thread_table(trace);
669        if (!t) {
670                trace_set_err(trace, TRACE_ERR_THREAD, "Unable to get thread table in perpkt_threads_entry()");
671                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
672                pthread_exit(NULL);
673        }
674        if (trace->state == STATE_ERROR) {
675                thread_change_state(trace, t, THREAD_FINISHED, false);
676                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
677                pthread_exit(NULL);
678        }
679        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
680
681        if (trace->format->pregister_thread) {
682                if (trace->format->pregister_thread(trace, t, 
683                                trace_is_parallel(trace)) < 0) {
684                        thread_change_state(trace, t, THREAD_FINISHED, false);
685                        pthread_exit(NULL);
686                }
687        }
688
689        /* Fill our buffer with empty packets */
690        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
691        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
692                              trace->config.burst_size,
693                              trace->config.burst_size);
694
695        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
696
697        /* Let the per_packet function know we have started */
698        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
699        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
700
701        for (;;) {
702
703                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
704                        int ret;
705                        switch (message.code) {
706                                case MESSAGE_DO_PAUSE: // This is internal
707                                        ret = trace_perpkt_thread_pause(trace, t,
708                                              packets, nb_packets, &empty, &offset);
709                                        if (ret == READ_EOF) {
710                                                goto eof;
711                                        } else if (ret == READ_ERROR) {
712                                                goto error;
713                                        }
714                                        if (ret != 1) {
715                                                fprintf(stderr, "Unknown error pausing thread in perpkt_threads_entry()\n");
716                                                pthread_exit(NULL);
717                                        }
718                                        continue;
719                                case MESSAGE_DO_STOP: // This is internal
720                                        goto eof;
721                        }
722                        send_message(trace, t, message.code, message.data, 
723                                        message.sender);
724                        /* Continue and the empty messages out before packets */
725                        continue;
726                }
727
728
729                /* Do we need to read a new set of packets MOST LIKELY we do */
730                if (offset == nb_packets) {
731                        /* Refill the packet buffer */
732                        if (empty != nb_packets) {
733                                // Refill the empty packets
734                                libtrace_ocache_alloc(&trace->packet_freelist,
735                                                      (void **) &packets[empty],
736                                                      nb_packets - empty,
737                                                      nb_packets - empty);
738                        }
739                        if (!trace->pread) {
740                                if (!packets[0]) {
741                                        fprintf(stderr, "Unable to read into NULL packet structure\n");
742                                        pthread_exit(NULL);
743                                }
744                                nb_packets = trace_read_packet(trace, packets[0]);
745                                packets[0]->error = nb_packets;
746                                if (nb_packets > 0)
747                                        nb_packets = 1;
748                        } else {
749                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
750                        }
751                        offset = 0;
752                        empty = 0;
753                }
754
755                /* Handle error/message cases */
756                if (nb_packets > 0) {
757                        /* Store the first non-meta packet */
758                        for (j = 0; j < nb_packets; j++) {
759                                if (t->recorded_first)
760                                        break;
761                                if (packets[j]->error > 0) {
762                                        store_first_packet(trace, packets[j], t);
763                                }
764                        }
765                        dispatch_packets(trace, t, packets, nb_packets, &empty,
766                                         &offset, trace->tracetime);
767                } else {
768                        switch (nb_packets) {
769                        case READ_EOF:
770                                goto eof;
771                        case READ_ERROR:
772                                goto error;
773                        case READ_MESSAGE:
774                                nb_packets = 0;
775                                continue;
776                        default:
777                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
778                                goto error;
779                        }
780                }
781
782        }
783
784error:
785        message.code = MESSAGE_DO_STOP;
786        message.sender = t;
787        message.data.uint64 = 0;
788        trace_message_perpkts(trace, &message);
789eof:
790        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
791
792        // Let the per_packet function know we have stopped
793        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
794        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
795
796        // Free any remaining packets
797        for (i = 0; i < trace->config.burst_size; i++) {
798                if (packets[i]) {
799                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
800                        packets[i] = NULL;
801                }
802        }
803
804        thread_change_state(trace, t, THREAD_FINISHED, true);
805
806        /* Make sure the reporter sees we have finished */
807        if (trace_has_reporter(trace))
808                trace_post_reporter(trace);
809
810        // Release all ocache memory before unregistering with the format
811        // because this might(it does in DPDK) unlink the formats mempool
812        // causing destroy/finish packet to fail.
813        libtrace_ocache_unregister_thread(&trace->packet_freelist);
814        if (trace->format->punregister_thread) {
815                trace->format->punregister_thread(trace, t);
816        }
817        print_memory_stats();
818
819        pthread_exit(NULL);
820}
821
822/**
823 * The start point for our single threaded hasher thread, this will read
824 * and hash a packet from a data source and queue it against the correct
825 * core to process it.
826 */
827static void* hasher_entry(void *data) {
828        libtrace_t *trace = (libtrace_t *)data;
829        libtrace_thread_t * t;
830        int i;
831        libtrace_packet_t * packet;
832        libtrace_message_t message = {0, {.uint64=0}, NULL};
833        int pkt_skipped = 0;
834
835        if (!trace_has_dedicated_hasher(trace)) {
836                fprintf(stderr, "Trace does not have hasher associated with it in hasher_entry()\n");
837                pthread_exit(NULL);
838        }
839        /* Wait until all threads are started and objects are initialised (ring buffers) */
840        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
841        t = &trace->hasher_thread;
842        if (!(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid))) {
843                fprintf(stderr, "Incorrect thread type or non matching thread IDs in hasher_entry()\n");
844                pthread_exit(NULL);
845        }
846
847        if (trace->state == STATE_ERROR) {
848                thread_change_state(trace, t, THREAD_FINISHED, false);
849                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
850                pthread_exit(NULL);
851        }
852        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
853
854        /* We are reading but it is not the parallel API */
855        if (trace->format->pregister_thread) {
856                trace->format->pregister_thread(trace, t, true);
857        }
858
859        /* Read all packets in then hash and queue against the correct thread */
860        while (1) {
861                int thread;
862                if (!pkt_skipped)
863                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
864                if (!packet) {
865                        fprintf(stderr, "Hasher thread was unable to get a fresh packet from the "
866                                "object cache\n");
867                        pthread_exit(NULL);
868                }
869
870                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
871                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
872                        switch(message.code) {
873                                case MESSAGE_DO_PAUSE:
874                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
875                                        thread_change_state(trace, t, THREAD_PAUSED, false);
876                                        pthread_cond_broadcast(&trace->perpkt_cond);
877                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
878                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
879                                        }
880                                        thread_change_state(trace, t, THREAD_RUNNING, false);
881                                        pthread_cond_broadcast(&trace->perpkt_cond);
882                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
883                                        break;
884                                case MESSAGE_DO_STOP:
885                                        /* Either FINISHED or FINISHING */
886                                        if (!(trace->started == false)) {
887                                                fprintf(stderr, "STOP message received by hasher, but "
888                                                        "trace is still active\n");
889                                                pthread_exit(NULL);
890                                        }
891                                        /* Mark the current packet as EOF */
892                                        packet->error = 0;
893                                        goto hasher_eof;
894                                default:
895                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
896                        }
897                        pkt_skipped = 1;
898                        continue;
899                }
900
901                if ((packet->error = trace_read_packet(trace, packet)) <1) {
902                        if (packet->error == READ_MESSAGE) {
903                                pkt_skipped = 1;
904                                continue;
905                        } else {
906                                break; /* We are EOF or error'd either way we stop  */
907                        }
908                }
909
910                /* We are guaranteed to have a hash function i.e. != NULL */
911                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
912                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
913                /* Blocking write to the correct queue - I'm the only writer */
914                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
915                        uint64_t order = trace_packet_get_order(packet);
916                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
917                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
918                                // Write ticks to everyone else
919                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
920                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
921                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
922                                for (i = 0; i < trace->perpkt_thread_count; i++) {
923                                        pkts[i]->error = READ_TICK;
924                                        trace_packet_set_order(pkts[i], order);
925                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
926                                }
927                        }
928                        pkt_skipped = 0;
929                }
930        }
931hasher_eof:
932        /* Broadcast our last failed read to all threads */
933        for (i = 0; i < trace->perpkt_thread_count; i++) {
934                libtrace_packet_t * bcast;
935                if (i == trace->perpkt_thread_count - 1) {
936                        bcast = packet;
937                } else {
938                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
939                        bcast->error = packet->error;
940                }
941                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
942                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
943                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
944                } else {
945                        libtrace_ocache_free(&trace->packet_freelist, (void **) &bcast, 1, 1);
946                }
947                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
948        }
949
950        // We don't need to free the packet
951        thread_change_state(trace, t, THREAD_FINISHED, true);
952
953        libtrace_ocache_unregister_thread(&trace->packet_freelist);
954        if (trace->format->punregister_thread) {
955                trace->format->punregister_thread(trace, t);
956        }
957        print_memory_stats();
958
959        // TODO remove from TTABLE t sometime
960        pthread_exit(NULL);
961}
962
963/* Our simplest case when a thread becomes ready it can obtain an exclusive
964 * lock to read packets from the underlying trace.
965 */
966static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
967                                                    libtrace_thread_t *t,
968                                                    libtrace_packet_t *packets[],
969                                                    size_t nb_packets) {
970        size_t i = 0;
971        //bool tick_hit = false;
972
973        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
974        /* Read nb_packets */
975        for (i = 0; i < nb_packets; ++i) {
976                if (libtrace_message_queue_count(&t->messages) > 0) {
977                        if ( i==0 ) {
978                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
979                                return READ_MESSAGE;
980                        } else {
981                                break;
982                        }
983                }
984                packets[i]->error = trace_read_packet(libtrace, packets[i]);
985
986                if (packets[i]->error <= 0) {
987                        /* We'll catch this next time if we have already got packets */
988                        if ( i==0 ) {
989                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
990                                return packets[i]->error;
991                        } else {
992                                break;
993                        }
994                }
995                /*
996                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
997                        tick_hit = true;
998                }*/
999
1000                // Doing this inside the lock ensures the first packet is
1001                // always recorded first
1002                if (!t->recorded_first && packets[0]->error > 0) {
1003                        store_first_packet(libtrace, packets[0], t);
1004                }
1005        }
1006        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
1007        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
1008        if (tick_hit) {
1009                libtrace_message_t tick;
1010                tick.additional.uint64 = trace_packet_get_order(packets[i]);
1011                tick.code = MESSAGE_TICK;
1012                trace_send_message_to_perpkts(libtrace, &tick);
1013        } */
1014        return i;
1015}
1016
1017/**
1018 * For the case that we have a dedicated hasher thread
1019 * 1. We read a packet from our buffer
1020 * 2. Move that into the packet provided (packet)
1021 */
1022inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
1023                                                   libtrace_thread_t *t,
1024                                                   libtrace_packet_t *packets[],
1025                                                   size_t nb_packets) {
1026        size_t i;
1027
1028        /* We store the last error message here */
1029        if (t->format_data) {
1030                return ((libtrace_packet_t *)t->format_data)->error;
1031        }
1032
1033        // Always grab at least one
1034        if (packets[0]) // Recycle the old get the new
1035                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
1036        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
1037
1038        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
1039                return packets[0]->error;
1040        }
1041
1042        for (i = 1; i < nb_packets; i++) {
1043                if (packets[i]) // Recycle the old get the new
1044                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
1045                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
1046                        packets[i] = NULL;
1047                        break;
1048                }
1049
1050                /* We will return an error or EOF the next time around */
1051                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
1052                        /* The message case will be checked automatically -
1053                           However other cases like EOF and error will only be
1054                           sent once*/
1055                        if (packets[i]->error != READ_MESSAGE) {
1056                                t->format_data = packets[i];
1057                        }
1058                        break;
1059                }
1060        }
1061
1062        return i;
1063}
1064
1065/**
1066 * For the first packet of each queue we keep a copy and note the system
1067 * time it was received at.
1068 *
1069 * This is used for finding the first packet when playing back a trace
1070 * in trace time. And can be used by real time applications to print
1071 * results out every XXX seconds.
1072 */
1073void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1074{
1075
1076        libtrace_message_t mesg = {0, {.uint64=0}, NULL};
1077        struct timeval tv;
1078        libtrace_packet_t * dup;
1079
1080        if (t->recorded_first) {
1081                return;
1082        }
1083
1084        if (IS_LIBTRACE_META_PACKET(packet)) {
1085                return;
1086        }
1087
1088        /* We mark system time against a copy of the packet */
1089        gettimeofday(&tv, NULL);
1090        dup = trace_copy_packet(packet);
1091
1092        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1093        libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1094        memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1095        libtrace->first_packets.count++;
1096
1097        /* Now update the first */
1098        if (libtrace->first_packets.count == 1) {
1099                /* We the first entry hence also the first known packet */
1100                libtrace->first_packets.first = t->perpkt_num;
1101        } else {
1102                /* Check if we are newer than the previous 'first' packet */
1103                size_t first = libtrace->first_packets.first;
1104                struct timeval cur_ts = trace_get_timeval(dup);
1105                struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
1106                if (timercmp(&cur_ts, &first_ts, <))
1107                        libtrace->first_packets.first = t->perpkt_num;
1108        }
1109        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1110
1111        memset(&mesg, 0, sizeof(libtrace_message_t));
1112        mesg.code = MESSAGE_FIRST_PACKET;
1113        trace_message_reporter(libtrace, &mesg);
1114        trace_message_perpkts(libtrace, &mesg);
1115        t->recorded_first = true;
1116}
1117
1118DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
1119                                     libtrace_thread_t *t,
1120                                     const libtrace_packet_t **packet,
1121                                     const struct timeval **tv)
1122{
1123        void * tmp;
1124        int ret = 0;
1125
1126        if (t) {
1127                if (t->type != THREAD_PERPKT || t->trace != libtrace)
1128                        return -1;
1129        }
1130
1131        /* Throw away these which we don't use */
1132        if (!packet)
1133                packet = (const libtrace_packet_t **) &tmp;
1134        if (!tv)
1135                tv = (const struct timeval **) &tmp;
1136
1137        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1138        if (t) {
1139                /* Get the requested thread */
1140                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
1141                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
1142        } else if (libtrace->first_packets.count) {
1143                /* Get the first packet across all threads */
1144                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1145                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1146                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1147                        ret = 1;
1148                } else {
1149                        struct timeval curr_tv;
1150                        // If a second has passed since the first entry we will assume this is the very first packet
1151                        gettimeofday(&curr_tv, NULL);
1152                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1153                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1154                                        ret = 1;
1155                                }
1156                        }
1157                }
1158        } else {
1159                *packet = NULL;
1160                *tv = NULL;
1161        }
1162        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1163        return ret;
1164}
1165
1166
1167DLLEXPORT uint64_t tv_to_usec(const struct timeval *tv)
1168{
1169        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1170}
1171
1172inline static struct timeval usec_to_tv(uint64_t usec)
1173{
1174        struct timeval tv;
1175        tv.tv_sec = usec / 1000000;
1176        tv.tv_usec = usec % 1000000;
1177        return tv;
1178}
1179
1180/** Similar to delay_tracetime but send messages to all threads periodically */
1181static void* reporter_entry(void *data) {
1182        libtrace_message_t message = {0, {.uint64=0}, NULL};
1183        libtrace_t *trace = (libtrace_t *)data;
1184        libtrace_thread_t *t = &trace->reporter_thread;
1185
1186        /* Wait until all threads are started */
1187        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1188        if (trace->state == STATE_ERROR) {
1189                thread_change_state(trace, t, THREAD_FINISHED, false);
1190                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1191                pthread_exit(NULL);
1192        }
1193        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1194
1195        if (trace->format->pregister_thread) {
1196                trace->format->pregister_thread(trace, t, false);
1197        }
1198
1199        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
1200        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
1201
1202        while (!trace_has_finished(trace)) {
1203                if (trace->config.reporter_polling) {
1204                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1205                                message.code = MESSAGE_POST_REPORTER;
1206                } else {
1207                        libtrace_message_queue_get(&t->messages, &message);
1208                }
1209                switch (message.code) {
1210                        // Check for results
1211                        case MESSAGE_POST_REPORTER:
1212                                trace->combiner.read(trace, &trace->combiner);
1213                                break;
1214                        case MESSAGE_DO_PAUSE:
1215                                if(trace->combiner.pause) {
1216                                        trace->combiner.pause(trace, &trace->combiner);
1217                                }
1218                                send_message(trace, t, MESSAGE_PAUSING,
1219                                                (libtrace_generic_t) {0}, t);
1220                                trace_thread_pause(trace, t);
1221                                send_message(trace, t, MESSAGE_RESUMING,
1222                                                (libtrace_generic_t) {0}, t);
1223                                break;
1224                default:
1225                        send_message(trace, t, message.code, message.data,
1226                                        message.sender);
1227                }
1228        }
1229
1230        // Flush out whats left now all our threads have finished
1231        trace->combiner.read_final(trace, &trace->combiner);
1232
1233        // GOODBYE
1234        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
1235        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
1236
1237        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1238        print_memory_stats();
1239        pthread_exit(NULL);
1240}
1241
1242/** Similar to delay_tracetime but send messages to all threads periodically */
1243static void* keepalive_entry(void *data) {
1244        struct timeval prev, next;
1245        libtrace_message_t message = {0, {.uint64=0}, NULL};
1246        libtrace_t *trace = (libtrace_t *)data;
1247        uint64_t next_release;
1248        libtrace_thread_t *t = &trace->keepalive_thread;
1249
1250        /* Wait until all threads are started */
1251        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1252        if (trace->state == STATE_ERROR) {
1253                thread_change_state(trace, t, THREAD_FINISHED, false);
1254                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1255                pthread_exit(NULL);
1256        }
1257        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1258
1259        gettimeofday(&prev, NULL);
1260        memset(&message, 0, sizeof(libtrace_message_t));
1261        message.code = MESSAGE_TICK_INTERVAL;
1262
1263        while (trace->state != STATE_FINISHED) {
1264                fd_set rfds;
1265                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1266                gettimeofday(&next, NULL);
1267                if (next_release > tv_to_usec(&next)) {
1268                        next = usec_to_tv(next_release - tv_to_usec(&next));
1269                        // Wait for timeout or a message
1270                        FD_ZERO(&rfds);
1271                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1272                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1273                                libtrace_message_t msg;
1274                                libtrace_message_queue_get(&t->messages, &msg);
1275                                if (msg.code != MESSAGE_DO_STOP) {
1276                                        fprintf(stderr, "Unexpected message code in keepalive_entry()\n");
1277                                        pthread_exit(NULL);
1278                                }
1279                                goto done;
1280                        }
1281                }
1282                prev = usec_to_tv(next_release);
1283                if (trace->state == STATE_RUNNING) {
1284                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1285                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1286                        trace_message_perpkts(trace, &message);
1287                }
1288        }
1289done:
1290
1291        thread_change_state(trace, t, THREAD_FINISHED, true);
1292        pthread_exit(NULL);
1293}
1294
1295/**
1296 * Delays a packets playback so the playback will be in trace time.
1297 * This may break early if a message becomes available.
1298 *
1299 * Requires the first packet for this thread to be received.
1300 * @param libtrace  The trace
1301 * @param packet    The packet to delay
1302 * @param t         The current thread
1303 * @return Either READ_MESSAGE(-2) or 0 is successful
1304 */
1305static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1306        struct timeval curr_tv, pkt_tv;
1307        uint64_t next_release = t->tracetime_offset_usec;
1308        uint64_t curr_usec;
1309
1310        if (!t->tracetime_offset_usec) {
1311                const libtrace_packet_t *first_pkt;
1312                const struct timeval *sys_tv;
1313                int64_t initial_offset;
1314                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1315                if (!first_pkt)
1316                        return 0;
1317                pkt_tv = trace_get_timeval(first_pkt);
1318                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1319                /* In the unlikely case offset is 0, change it to 1 */
1320                if (stable)
1321                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1322                next_release = initial_offset;
1323        }
1324        /* next_release == offset */
1325        pkt_tv = trace_get_timeval(packet);
1326        next_release += tv_to_usec(&pkt_tv);
1327        gettimeofday(&curr_tv, NULL);
1328        curr_usec = tv_to_usec(&curr_tv);
1329        if (next_release > curr_usec) {
1330                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1331                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1332                fd_set rfds;
1333                FD_ZERO(&rfds);
1334                FD_SET(mesg_fd, &rfds);
1335                // We need to wait
1336                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1337                if (ret == 0) {
1338                        return 0;
1339                } else if (ret > 0) {
1340                        return READ_MESSAGE;
1341                } else {
1342                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unexpected return from select in delay_tracetime()");
1343                        return -1;
1344                }
1345        }
1346        return 0;
1347}
1348
1349/* Discards packets that don't match the filter.
1350 * Discarded packets are emptied and then moved to the end of the packet list.
1351 *
1352 * @param trace       The trace format, containing the filter
1353 * @param packets     An array of packets
1354 * @param nb_packets  The number of valid items in packets
1355 *
1356 * @return The number of packets that passed the filter, which are moved to
1357 *          the start of the packets array
1358 */
1359static inline size_t filter_packets(libtrace_t *trace,
1360                                    libtrace_packet_t **packets,
1361                                    size_t nb_packets) {
1362        size_t offset = 0;
1363        size_t i;
1364
1365        for (i = 0; i < nb_packets; ++i) {
1366                // The filter needs the trace attached to receive the link type
1367                packets[i]->trace = trace;
1368                if (trace_apply_filter(trace->filter, packets[i])) {
1369                        libtrace_packet_t *tmp;
1370                        tmp = packets[offset];
1371                        packets[offset++] = packets[i];
1372                        packets[i] = tmp;
1373                } else {
1374                        trace_fin_packet(packets[i]);
1375                }
1376        }
1377
1378        return offset;
1379}
1380
1381/* Read a batch of packets from the trace into a buffer.
1382 * Note that this function will block until a packet is read (or EOF is reached)
1383 *
1384 * @param libtrace    The trace
1385 * @param t           The thread
1386 * @param packets     An array of packets
1387 * @param nb_packets  The number of empty packets in packets
1388 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1389 */
1390static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1391                                      libtrace_thread_t *t,
1392                                      libtrace_packet_t *packets[],
1393                                      size_t nb_packets) {
1394        int i;
1395        if (!libtrace) {
1396                fprintf(stderr, "NULL trace passed into trace_read_packet()\n");
1397                return TRACE_ERR_NULL_TRACE;
1398        }
1399        if (nb_packets <= 0) {
1400                trace_set_err(libtrace, TRACE_ERR_NULL,
1401                        "nb_packets must be greater than zero in trace_pread_packet_wrapper()");
1402                return -1;
1403        }
1404        if (trace_is_err(libtrace))
1405                return -1;
1406        if (!libtrace->started) {
1407                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1408                              "You must call libtrace_start() before trace_read_packet()\n");
1409                return -1;
1410        }
1411
1412        if (libtrace->format->pread_packets) {
1413                int ret;
1414                for (i = 0; i < (int) nb_packets; ++i) {
1415                        if (!i[packets]) {
1416                                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "NULL packets in "
1417                                        "trace_pread_packet_wrapper()");
1418                                return -1;
1419                        }
1420                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1421                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1422                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1423                                              "Packet passed to trace_read_packet() is invalid\n");
1424                                return -1;
1425                        }
1426                        packets[i]->which_trace_start = libtrace->startcount;
1427                }
1428                do {
1429                        ret=libtrace->format->pread_packets(libtrace, t,
1430                                                            packets,
1431                                                            nb_packets);
1432                        /* Error, EOF or message? */
1433                        if (ret <= 0) {
1434                                return ret;
1435                        }
1436
1437                        if (libtrace->filter) {
1438                                int remaining;
1439                                remaining = filter_packets(libtrace,
1440                                                           packets, ret);
1441                                t->filtered_packets += ret - remaining;
1442                                ret = remaining;
1443                        }
1444                        for (i = 0; i < ret; ++i) {
1445                                /* We do not mark the packet against the trace,
1446                                 * before hand or after. After breaks DAG meta
1447                                 * packets and before is inefficient */
1448                                //packets[i]->trace = libtrace;
1449                                /* TODO IN FORMAT?? Like traditional libtrace */
1450                                if (libtrace->snaplen>0)
1451                                        trace_set_capture_length(packets[i],
1452                                                        libtrace->snaplen);
1453                        }
1454                } while(ret == 0);
1455                return ret;
1456        }
1457        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1458                      "This format does not support reading packets\n");
1459        return ~0U;
1460}
1461
1462/* Restarts a parallel trace, this is called from trace_pstart.
1463 * The libtrace lock is held upon calling this function.
1464 * Typically with a parallel trace the threads are not
1465 * killed rather.
1466 */
1467static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1468                          libtrace_callback_set_t *per_packet_cbs, 
1469                          libtrace_callback_set_t *reporter_cbs) {
1470        int i, err = 0;
1471        if (libtrace->state != STATE_PAUSED) {
1472                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1473                        "trace(%s) is not currently paused",
1474                              libtrace->uridata);
1475                return -1;
1476        }
1477
1478        if (!libtrace_parallel) {
1479                trace_set_err(libtrace, TRACE_ERR_THREAD, "Trace_prestart() has been called on a "
1480                        "non-parallel libtrace input?");
1481                return -1;
1482        }
1483        if (libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1484                trace_set_err(libtrace, TRACE_ERR_THREAD, "Cannot restart a parallel libtrace input "
1485                        "while it is still running");
1486                return -1;
1487        }
1488
1489        /* Reset first packets */
1490        pthread_spin_lock(&libtrace->first_packets.lock);
1491        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1492                if (libtrace->first_packets.packets[i].packet) {
1493                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
1494                        libtrace->first_packets.packets[i].packet = NULL;
1495                        libtrace->first_packets.packets[i].tv.tv_sec = 0;
1496                        libtrace->first_packets.packets[i].tv.tv_usec = 0;
1497                        libtrace->first_packets.count--;
1498                        libtrace->perpkt_threads[i].recorded_first = false;
1499                }
1500        }
1501        if (libtrace->first_packets.count != 0) {
1502                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected a first packets count of 0 in trace_pstart()");
1503                return -1;
1504        }
1505        libtrace->first_packets.first = 0;
1506        pthread_spin_unlock(&libtrace->first_packets.lock);
1507
1508        /* Reset delay */
1509        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1510                libtrace->perpkt_threads[i].tracetime_offset_usec = 0;
1511        }
1512
1513        /* Reset statistics */
1514        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
1515                libtrace->perpkt_threads[i].accepted_packets = 0;
1516                libtrace->perpkt_threads[i].filtered_packets = 0;
1517        }
1518        libtrace->accepted_packets = 0;
1519        libtrace->filtered_packets = 0;
1520
1521        /* Update functions if requested */
1522        if(global_blob)
1523                libtrace->global_blob = global_blob;
1524
1525        if (per_packet_cbs) {
1526                if (libtrace->perpkt_cbs)
1527                        trace_destroy_callback_set(libtrace->perpkt_cbs);
1528                libtrace->perpkt_cbs = trace_create_callback_set();
1529                memcpy(libtrace->perpkt_cbs, per_packet_cbs, 
1530                                sizeof(libtrace_callback_set_t));
1531        }
1532
1533        if (reporter_cbs) {
1534                if (libtrace->reporter_cbs)
1535                        trace_destroy_callback_set(libtrace->reporter_cbs);
1536
1537                libtrace->reporter_cbs = trace_create_callback_set();
1538                memcpy(libtrace->reporter_cbs, reporter_cbs,
1539                                sizeof(libtrace_callback_set_t));
1540        }
1541
1542        if (trace_is_parallel(libtrace)) {
1543                err = libtrace->format->pstart_input(libtrace);
1544        } else {
1545                if (libtrace->format->start_input) {
1546                        err = libtrace->format->start_input(libtrace);
1547                }
1548        }
1549
1550        if (err == 0) {
1551                libtrace->started = true;
1552                libtrace->startcount ++;
1553                libtrace_change_state(libtrace, STATE_RUNNING, false);
1554        }
1555        return err;
1556}
1557
1558/**
1559 * @return the number of CPU cores on the machine. -1 if unknown.
1560 */
1561SIMPLE_FUNCTION static int get_nb_cores() {
1562        int numCPU;
1563#ifdef _SC_NPROCESSORS_ONLN
1564        /* Most systems do this now */
1565        numCPU = sysconf(_SC_NPROCESSORS_ONLN);
1566
1567#else
1568        int mib[] = {CTL_HW, HW_AVAILCPU};
1569        size_t len = sizeof(numCPU);
1570
1571        /* get the number of CPUs from the system */
1572        sysctl(mib, 2, &numCPU, &len, NULL, 0);
1573#endif
1574        return numCPU <= 0 ? 1 : numCPU;
1575}
1576
1577/**
1578 * Verifies the configuration and sets default values for any values not
1579 * specified by the user.
1580 */
1581static void verify_configuration(libtrace_t *libtrace) {
1582
1583        if (libtrace->config.hasher_queue_size <= 0)
1584                libtrace->config.hasher_queue_size = 1000;
1585
1586        if (libtrace->config.perpkt_threads <= 0) {
1587                libtrace->perpkt_thread_count = get_nb_cores();
1588                if (libtrace->perpkt_thread_count <= 0)
1589                        // Lets just use one
1590                        libtrace->perpkt_thread_count = 1;
1591        } else {
1592                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1593        }
1594
1595        if (libtrace->config.reporter_thold <= 0)
1596                libtrace->config.reporter_thold = 100;
1597        if (libtrace->config.burst_size <= 0)
1598                libtrace->config.burst_size = 32;
1599        if (libtrace->config.thread_cache_size <= 0)
1600                libtrace->config.thread_cache_size = 64;
1601        if (libtrace->config.cache_size <= 0)
1602                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1603
1604        if (libtrace->config.cache_size <
1605                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1606                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1607
1608        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1609                libtrace->combiner = combiner_unordered;
1610
1611        /* Figure out if we are using a dedicated hasher thread? */
1612        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1613                libtrace->hasher_thread.type = THREAD_HASHER;
1614        }
1615}
1616
1617/**
1618 * Starts a libtrace_thread, including allocating memory for messaging.
1619 * Threads are expected to wait until the libtrace look is released.
1620 * Hence why we don't init structures until later.
1621 *
1622 * @param trace The trace the thread is associated with
1623 * @param t The thread that is filled when the thread is started
1624 * @param type The type of thread
1625 * @param start_routine The entry location of the thread
1626 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1627 * @param name For debugging purposes set the threads name (Optional)
1628 *
1629 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1630 *         In this situation the thread structure is zeroed.
1631 */
1632static int trace_start_thread(libtrace_t *trace,
1633                       libtrace_thread_t *t,
1634                       enum thread_types type,
1635                       void *(*start_routine) (void *),
1636                       int perpkt_num,
1637                       const char *name) {
1638#ifdef __linux__
1639        cpu_set_t cpus;
1640        int i;
1641#endif
1642        int ret;
1643        if (t->type != THREAD_EMPTY) {
1644                trace_set_err(trace, TRACE_ERR_THREAD,
1645                        "Expected thread type of THREAD_EMPTY in trace_start_thread()");
1646                return -1;
1647        }
1648        t->trace = trace;
1649        t->ret = NULL;
1650        t->user_data = NULL;
1651        t->type = type;
1652        t->state = THREAD_RUNNING;
1653
1654        if (!name) {
1655                trace_set_err(trace, TRACE_ERR_THREAD, "NULL thread name in trace_start_thread()");
1656                return -1;
1657        }
1658
1659#ifdef __linux__
1660        CPU_ZERO(&cpus);
1661        for (i = 0; i < get_nb_cores(); i++)
1662                CPU_SET(i, &cpus);
1663
1664        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1665        if( ret == 0 ) {
1666                ret = pthread_setaffinity_np(t->tid, sizeof(cpus), &cpus);
1667        }
1668
1669#else
1670        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1671#endif
1672        if (ret != 0) {
1673                libtrace_zero_thread(t);
1674                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1675                return -1;
1676        }
1677        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1678        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1679                libtrace_ringbuffer_init(&t->rbuffer,
1680                                         trace->config.hasher_queue_size,
1681                                         trace->config.hasher_polling?
1682                                                 LIBTRACE_RINGBUFFER_POLLING:
1683                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1684        }
1685#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1686        if(name)
1687                pthread_setname_np(t->tid, name);
1688#endif
1689        t->perpkt_num = perpkt_num;
1690        return 0;
1691}
1692
1693/** Parses the environment variable LIBTRACE_CONF into the supplied
1694 * configuration structure.
1695 *
1696 * @param[in,out] libtrace The trace from which we determine the URI and set
1697 * the configuration.
1698 *
1699 * We search for 3 environment variables and apply them to the config in the
1700 * following order. Such that the first has the lowest priority.
1701 *
1702 * 1. LIBTRACE_CONF, The global environment configuration
1703 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1704 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1705 *
1706 * E.g.
1707 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1708 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1709 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1710 *
1711 * @note All environment variables names MUST only contian
1712 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1713 * outside of this range should be captilised if possible or replaced with an
1714 * underscore.
1715 */
1716static void parse_env_config (libtrace_t *libtrace) {
1717        char env_name[1024] = "LIBTRACE_CONF_";
1718        size_t len = strlen(env_name);
1719        size_t mark = 0;
1720        size_t i;
1721        char * env;
1722
1723        /* Make our compound string */
1724        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1725        len += strlen(libtrace->format->name);
1726        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1727        len += 1;
1728        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1729
1730        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1731        for (i = 0; env_name[i] != 0; ++i) {
1732                env_name[i] = toupper(env_name[i]);
1733                if(env_name[i] == ':') {
1734                        mark = i;
1735                }
1736                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1737                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1738                        env_name[i] = '_';
1739                }
1740        }
1741
1742        /* First apply global env settings LIBTRACE_CONF */
1743        env = getenv("LIBTRACE_CONF");
1744        if (env)
1745        {
1746                printf("Got env %s", env);
1747                trace_set_configuration(libtrace, env);
1748        }
1749
1750        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1751        if (mark != 0) {
1752                env_name[mark] = 0;
1753                env = getenv(env_name);
1754                if (env) {
1755                        trace_set_configuration(libtrace, env);
1756                }
1757                env_name[mark] = '_';
1758        }
1759
1760        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1761        env = getenv(env_name);
1762        if (env) {
1763                trace_set_configuration(libtrace, env);
1764        }
1765}
1766
1767DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
1768        if (libtrace->state == STATE_NEW)
1769                return trace_supports_parallel(libtrace);
1770        return libtrace->pread == trace_pread_packet_wrapper;
1771}
1772
1773DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1774                           libtrace_callback_set_t *per_packet_cbs,
1775                           libtrace_callback_set_t *reporter_cbs) {
1776        int i;
1777        int ret = -1;
1778        char name[24];
1779        sigset_t sig_before, sig_block_all;
1780        if (!libtrace) {
1781                fprintf(stderr, "NULL trace passed to trace_pstart()\n");
1782                return -1;
1783        }
1784
1785        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1786        if (trace_is_err(libtrace)) {
1787                goto cleanup_none;
1788        }
1789
1790        if (libtrace->state == STATE_PAUSED) {
1791                ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 
1792                                reporter_cbs);
1793                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1794                return ret;
1795        }
1796
1797        if (libtrace->state != STATE_NEW) {
1798                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1799                              "should be called on a NEW or PAUSED trace but "
1800                              "instead was called from %s",
1801                              get_trace_state_name(libtrace->state));
1802                goto cleanup_none;
1803        }
1804
1805        /* Store the user defined things against the trace */
1806        libtrace->global_blob = global_blob;
1807
1808        /* Save a copy of the callbacks in case the user tries to change them
1809         * on us later */
1810        if (!per_packet_cbs) {
1811                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1812                                "requires a non-NULL set of per packet "
1813                                "callbacks.");
1814                goto cleanup_none;
1815        }
1816
1817        if (per_packet_cbs->message_packet == NULL) {
1818                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
1819                                "packet callbacks must include a handler "
1820                                "for a packet. Please set this using "
1821                                "trace_set_packet_cb().");
1822                goto cleanup_none;
1823        }
1824
1825        libtrace->perpkt_cbs = trace_create_callback_set();
1826        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
1827       
1828        if (reporter_cbs) {
1829                libtrace->reporter_cbs = trace_create_callback_set();
1830                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
1831        }
1832
1833       
1834
1835
1836        /* And zero other fields */
1837        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1838                libtrace->perpkt_thread_states[i] = 0;
1839        }
1840        libtrace->first_packets.first = 0;
1841        libtrace->first_packets.count = 0;
1842        libtrace->first_packets.packets = NULL;
1843        libtrace->perpkt_threads = NULL;
1844        /* Set a global which says we are using a parallel trace. This is
1845         * for backwards compatibility due to changes when destroying packets */
1846        libtrace_parallel = 1;
1847
1848        /* Parses configuration passed through environment variables */
1849        parse_env_config(libtrace);
1850        verify_configuration(libtrace);
1851
1852        ret = -1;
1853        /* Try start the format - we prefer parallel over single threaded, as
1854         * these formats should support messages better */
1855
1856        if (trace_supports_parallel(libtrace) &&
1857            !trace_has_dedicated_hasher(libtrace)) {
1858                ret = libtrace->format->pstart_input(libtrace);
1859                libtrace->pread = trace_pread_packet_wrapper;
1860        }
1861        if (ret != 0) {
1862                if (libtrace->format->start_input) {
1863                        ret = libtrace->format->start_input(libtrace);
1864                }
1865                if (libtrace->perpkt_thread_count > 1) {
1866                        libtrace->pread = trace_pread_packet_first_in_first_served;
1867                        /* Don't wait for a burst of packets if the format is
1868                         * live as this could block ring based formats and
1869                         * introduces delay. */
1870                        if (libtrace->format->info.live) {
1871                                libtrace->config.burst_size = 1;
1872                        }
1873                }
1874                else {
1875                        /* Use standard read_packet */
1876                        libtrace->pread = NULL;
1877                }
1878        }
1879
1880        if (ret < 0) {
1881                goto cleanup_none;
1882        }
1883
1884        /* --- Start all the threads we need --- */
1885        /* Disable signals because it is inherited by the threads we start */
1886        sigemptyset(&sig_block_all);
1887        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1888
1889        /* If we need a hasher thread start it
1890         * Special Case: If single threaded we don't need a hasher
1891         */
1892        if (trace_has_dedicated_hasher(libtrace)) {
1893                libtrace->hasher_thread.type = THREAD_EMPTY;
1894                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1895                                   THREAD_HASHER, hasher_entry, -1,
1896                                   "hasher-thread");
1897                if (ret != 0)
1898                        goto cleanup_started;
1899                libtrace->pread = trace_pread_packet_hasher_thread;
1900        } else {
1901                libtrace->hasher_thread.type = THREAD_EMPTY;
1902        }
1903
1904        /* Start up our perpkt threads */
1905        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1906                                          libtrace->perpkt_thread_count);
1907        if (!libtrace->perpkt_threads) {
1908                trace_set_err(libtrace, errno, "trace_pstart "
1909                              "failed to allocate memory.");
1910                goto cleanup_threads;
1911        }
1912        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1913                snprintf(name, sizeof(name), "perpkt-%d", i);
1914                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1915                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1916                                   THREAD_PERPKT, perpkt_threads_entry, i,
1917                                   name);
1918                if (ret != 0)
1919                        goto cleanup_threads;
1920        }
1921
1922        /* Start the reporter thread */
1923        if (reporter_cbs) {
1924                if (libtrace->combiner.initialise)
1925                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1926                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1927                                   THREAD_REPORTER, reporter_entry, -1,
1928                                   "reporter_thread");
1929                if (ret != 0)
1930                        goto cleanup_threads;
1931        }
1932
1933        /* Start the keepalive thread */
1934        if (libtrace->config.tick_interval > 0) {
1935                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1936                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1937                                   "keepalive_thread");
1938                if (ret != 0)
1939                        goto cleanup_threads;
1940        }
1941
1942        /* Init other data structures */
1943        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1944        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1945        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1946                                                 sizeof(*libtrace->first_packets.packets));
1947        if (libtrace->first_packets.packets == NULL) {
1948                trace_set_err(libtrace, errno, "trace_pstart "
1949                              "failed to allocate memory.");
1950                goto cleanup_threads;
1951        }
1952
1953        if (libtrace_ocache_init(&libtrace->packet_freelist,
1954                             (void* (*)()) trace_create_packet,
1955                             (void (*)(void *))trace_destroy_packet,
1956                             libtrace->config.thread_cache_size,
1957                             libtrace->config.cache_size * 4,
1958                             libtrace->config.fixed_count) != 0) {
1959                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1960                              "failed to allocate ocache.");
1961                goto cleanup_threads;
1962        }
1963
1964        /* Threads don't start */
1965        libtrace->started = true;
1966        libtrace->startcount ++;
1967        libtrace_change_state(libtrace, STATE_RUNNING, false);
1968
1969        ret = 0;
1970        goto success;
1971cleanup_threads:
1972        if (libtrace->first_packets.packets) {
1973                free(libtrace->first_packets.packets);
1974                libtrace->first_packets.packets = NULL;
1975        }
1976        libtrace_change_state(libtrace, STATE_ERROR, false);
1977        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1978        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1979                pthread_join(libtrace->hasher_thread.tid, NULL);
1980                libtrace_zero_thread(&libtrace->hasher_thread);
1981        }
1982
1983        if (libtrace->perpkt_threads) {
1984                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1985                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1986                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1987                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1988                        } else break;
1989                }
1990                free(libtrace->perpkt_threads);
1991                libtrace->perpkt_threads = NULL;
1992        }
1993
1994        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1995                pthread_join(libtrace->reporter_thread.tid, NULL);
1996                libtrace_zero_thread(&libtrace->reporter_thread);
1997        }
1998
1999        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2000                pthread_join(libtrace->keepalive_thread.tid, NULL);
2001                libtrace_zero_thread(&libtrace->keepalive_thread);
2002        }
2003        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2004        libtrace_change_state(libtrace, STATE_NEW, false);
2005        if (libtrace->perpkt_thread_states[THREAD_RUNNING] != 0) {
2006                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected 0 running threads in trace_pstart()");
2007                return -1;
2008        }
2009        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
2010cleanup_started:
2011        if (libtrace->pread == trace_pread_packet_wrapper) {
2012                if (libtrace->format->ppause_input)
2013                        libtrace->format->ppause_input(libtrace);
2014        } else {
2015                if (libtrace->format->pause_input)
2016                        libtrace->format->pause_input(libtrace);
2017        }
2018        ret = -1;
2019success:
2020        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
2021cleanup_none:
2022        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2023        return ret;
2024}
2025
2026DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
2027                fn_cb_starting handler) {
2028        cbset->message_starting = handler;
2029        return 0;
2030}
2031
2032DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
2033                fn_cb_dataless handler) {
2034        cbset->message_pausing = handler;
2035        return 0;
2036}
2037
2038DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
2039                fn_cb_dataless handler) {
2040        cbset->message_resuming = handler;
2041        return 0;
2042}
2043
2044DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
2045                fn_cb_dataless handler) {
2046        cbset->message_stopping = handler;
2047        return 0;
2048}
2049
2050DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
2051                fn_cb_packet handler) {
2052        cbset->message_packet = handler;
2053        return 0;
2054}
2055
2056DLLEXPORT int trace_set_meta_packet_cb(libtrace_callback_set_t *cbset,
2057                fn_cb_meta_packet handler) {
2058        cbset->message_meta_packet = handler;
2059        return 0;
2060}
2061
2062DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
2063                fn_cb_first_packet handler) {
2064        cbset->message_first_packet = handler;
2065        return 0;
2066}
2067
2068DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
2069                fn_cb_tick handler) {
2070        cbset->message_tick_count = handler;
2071        return 0;
2072}
2073
2074DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
2075                fn_cb_tick handler) {
2076        cbset->message_tick_interval = handler;
2077        return 0;
2078}
2079
2080DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
2081                fn_cb_result handler) {
2082        cbset->message_result = handler;
2083        return 0;
2084}
2085
2086DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
2087                fn_cb_usermessage handler) {
2088        cbset->message_user = handler;
2089        return 0;
2090}
2091
2092/*
2093 * Pauses a trace, this should only be called by the main thread
2094 * 1. Set started = false
2095 * 2. All perpkt threads are paused waiting on a condition var
2096 * 3. Then call ppause on the underlying format if found
2097 * 4. The traces state is paused
2098 *
2099 * Once done you should be able to modify the trace setup and call pstart again
2100 * TODO add support to change the number of threads.
2101 */
2102DLLEXPORT int trace_ppause(libtrace_t *libtrace)
2103{
2104        libtrace_thread_t *t;
2105        int i;
2106        if (!libtrace) {
2107                fprintf(stderr, "NULL trace passed into trace_ppause()\n");
2108                return TRACE_ERR_NULL_TRACE;
2109        }
2110
2111        t = get_thread_table(libtrace);
2112        // Check state from within the lock if we are going to change it
2113        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2114
2115        /* If we are already paused, just treat this as a NOOP */
2116        if (libtrace->state == STATE_PAUSED) {
2117                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2118                return 0;
2119        }
2120        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
2121                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2122                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
2123                return -1;
2124        }
2125
2126        libtrace_change_state(libtrace, STATE_PAUSING, false);
2127        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2128
2129        // Special case handle the hasher thread case
2130        if (trace_has_dedicated_hasher(libtrace)) {
2131                if (libtrace->config.debug_state)
2132                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
2133                libtrace_message_t message = {0, {.uint64=0}, NULL};
2134                message.code = MESSAGE_DO_PAUSE;
2135                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2136                // Wait for it to pause
2137                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2138                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
2139                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2140                }
2141                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2142                if (libtrace->config.debug_state)
2143                        fprintf(stderr, " DONE\n");
2144        }
2145
2146        if (libtrace->config.debug_state)
2147                fprintf(stderr, "Asking perpkt threads to pause ...");
2148        // Stop threads, skip this one if it's a perpkt
2149        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2150                if (&libtrace->perpkt_threads[i] != t) {
2151                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2152                        message.code = MESSAGE_DO_PAUSE;
2153                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
2154                        if(trace_has_dedicated_hasher(libtrace)) {
2155                                // The hasher has stopped and other threads have messages waiting therefore
2156                                // If the queues are empty the other threads would have no data
2157                                // So send some message packets to simply ask the threads to check
2158                                // We are the only writer since hasher has paused
2159                                libtrace_packet_t *pkt;
2160                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
2161                                pkt->error = READ_MESSAGE;
2162                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
2163                        }
2164                } else {
2165                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
2166                }
2167        }
2168
2169        if (t) {
2170                // A perpkt is doing the pausing, interesting, fake an extra thread paused
2171                // We rely on the user to *not* return before starting the trace again
2172                thread_change_state(libtrace, t, THREAD_PAUSED, true);
2173        }
2174
2175        // Wait for all threads to pause
2176        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2177        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
2178                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2179        }
2180        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2181
2182        if (libtrace->config.debug_state)
2183                fprintf(stderr, " DONE\n");
2184
2185        // Deal with the reporter
2186        if (trace_has_reporter(libtrace)) {
2187                if (libtrace->config.debug_state)
2188                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
2189                if (pthread_equal(pthread_self(), libtrace->reporter_thread.tid)) {
2190                        libtrace->combiner.pause(libtrace, &libtrace->combiner);
2191                        thread_change_state(libtrace, &libtrace->reporter_thread, THREAD_PAUSED, true);
2192               
2193                } else {
2194                        libtrace_message_t message = {0, {.uint64=0}, NULL};
2195                        message.code = MESSAGE_DO_PAUSE;
2196                        trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
2197                        // Wait for it to pause
2198                        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
2199                        while (libtrace->reporter_thread.state == THREAD_RUNNING) {
2200                                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
2201                        }
2202                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
2203                }
2204                if (libtrace->config.debug_state)
2205                        fprintf(stderr, " DONE\n");
2206        }
2207
2208        /* Cache values before we pause */
2209        if (libtrace->stats == NULL)
2210                libtrace->stats = trace_create_statistics();
2211        // Save the statistics against the trace
2212        trace_get_statistics(libtrace, NULL);
2213        if (trace_is_parallel(libtrace)) {
2214                libtrace->started = false;
2215                if (libtrace->format->ppause_input)
2216                        libtrace->format->ppause_input(libtrace);
2217                // TODO What happens if we don't have pause input??
2218        } else {
2219                int err;
2220                err = trace_pause(libtrace);
2221                // We should handle this a bit better
2222                if (err)
2223                        return err;
2224        }
2225
2226        // Only set as paused after the pause has been called on the trace
2227        libtrace_change_state(libtrace, STATE_PAUSED, true);
2228        return 0;
2229}
2230
2231/**
2232 * Stop trace finish prematurely as though it meet an EOF
2233 * This should only be called by the main thread
2234 * 1. Calls ppause
2235 * 2. Sends a message asking for threads to finish
2236 * 3. Releases threads which will pause
2237 */
2238DLLEXPORT int trace_pstop(libtrace_t *libtrace)
2239{
2240        int i, err;
2241        libtrace_message_t message = {0, {.uint64=0}, NULL};
2242        if (!libtrace) {
2243                fprintf(stderr, "NULL trace passed into trace_pstop()\n");
2244                return TRACE_ERR_NULL_TRACE;
2245        }
2246
2247        // Ensure all threads have paused and the underlying trace format has
2248        // been closed and all packets associated are cleaned up
2249        // Pause will do any state checks for us
2250        err = trace_ppause(libtrace);
2251        if (err)
2252                return err;
2253
2254        // Now send a message asking the threads to stop
2255        // This will be retrieved before trying to read another packet
2256        message.code = MESSAGE_DO_STOP;
2257        trace_message_perpkts(libtrace, &message);
2258        if (trace_has_dedicated_hasher(libtrace))
2259                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
2260
2261        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2262                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
2263        }
2264
2265        /* Now release the threads and let them stop - when the threads finish
2266         * the state will be set to finished */
2267        libtrace_change_state(libtrace, STATE_FINISHING, true);
2268        return 0;
2269}
2270
2271DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
2272        int ret = -1;
2273        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
2274                return -1;
2275        }
2276
2277        // Save the requirements
2278        trace->hasher_type = type;
2279        if (hasher) {
2280                if (trace->hasher_owner == HASH_OWNED_LIBTRACE) {
2281                        if (trace->hasher_data) {
2282                                free(trace->hasher_data);
2283                        }
2284                }
2285                trace->hasher = hasher;
2286                trace->hasher_data = data;
2287                trace->hasher_owner = HASH_OWNED_EXTERNAL;
2288        } else {
2289                trace->hasher = NULL;
2290                trace->hasher_data = NULL;
2291                trace->hasher_owner = HASH_OWNED_LIBTRACE;
2292        }
2293
2294        // Try push this to hardware - NOTE hardware could do custom if
2295        // there is a more efficient way to apply it, in this case
2296        // it will simply grab the function out of libtrace_t
2297        if (trace_supports_parallel(trace) && trace->format->config_input)
2298                ret = trace->format->config_input(trace, TRACE_OPTION_HASHER, &type);
2299
2300        if (ret == -1) {
2301                libtrace_err_t err UNUSED;
2302
2303                /* We have to deal with this ourself */
2304                /* If we succeed, clear any error state otherwise our caller
2305                 * might assume an error occurred, even though we've resolved
2306                 * the issue ourselves.
2307                 */
2308                if (!hasher) {
2309                        switch (type)
2310                        {
2311                                case HASHER_CUSTOM:
2312                                case HASHER_BALANCE:
2313                                        err = trace_get_err(trace);
2314                                        return 0;
2315                                case HASHER_BIDIRECTIONAL:
2316                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2317                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2318                                        toeplitz_init_config(trace->hasher_data, 1);
2319                                        err = trace_get_err(trace);
2320                                        return 0;
2321                                case HASHER_UNIDIRECTIONAL:
2322                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2323                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2324                                        toeplitz_init_config(trace->hasher_data, 0);
2325                                        err = trace_get_err(trace);
2326                                        return 0;
2327                        }
2328                        return -1;
2329                }
2330        } else {
2331                /* If the hasher is hardware we zero out the hasher and hasher
2332                 * data fields - only if we need a hasher do we do this */
2333                trace->hasher = NULL;
2334                trace->hasher_data = NULL;
2335        }
2336
2337        return 0;
2338}
2339
2340// Waits for all threads to finish
2341DLLEXPORT void trace_join(libtrace_t *libtrace) {
2342        int i;
2343
2344        /* Firstly wait for the perpkt threads to finish, since these are
2345         * user controlled */
2346        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2347                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2348                // So we must do our best effort to empty the queue - so
2349                // the producer (or any other threads) don't block.
2350                libtrace_packet_t * packet;
2351                if (libtrace->perpkt_threads[i].state != THREAD_FINISHED) {
2352                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2353                                "Expected processing thread state to be THREAD_FINISHED in trace_join()");
2354                        return;
2355                }
2356                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2357                        if (packet) // This could be NULL iff the perpkt finishes early
2358                                trace_destroy_packet(packet);
2359        }
2360
2361        /* Now the hasher */
2362        if (trace_has_dedicated_hasher(libtrace)) {
2363                pthread_join(libtrace->hasher_thread.tid, NULL);
2364                if (libtrace->hasher_thread.state != THREAD_FINISHED) {
2365                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2366                                "Expected hasher thread state to be THREAD_FINISHED in trace_join()");
2367                        return;
2368                }
2369        }
2370
2371        // Now that everything is finished nothing can be touching our
2372        // buffers so clean them up
2373        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2374                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2375                // if they lost timeslice before-during a write
2376                libtrace_packet_t * packet;
2377                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2378                        trace_destroy_packet(packet);
2379                if (trace_has_dedicated_hasher(libtrace)) {
2380                        if (!libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer)) {
2381                                trace_set_err(libtrace, TRACE_ERR_THREAD,
2382                                        "Expected processing threads ring buffers to be empty in trace_join()");
2383                                return;
2384                        }
2385                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2386                }
2387                // Cannot destroy vector yet, this happens with trace_destroy
2388        }
2389
2390        if (trace_has_reporter(libtrace)) {
2391                pthread_join(libtrace->reporter_thread.tid, NULL);
2392                if (libtrace->reporter_thread.state != THREAD_FINISHED) {
2393                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
2394                                "Expected reporting thread state to be THREAD_FINISHED in trace_join()");
2395                        return;
2396                }
2397        }
2398
2399        // Wait for the tick (keepalive) thread if it has been started
2400        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2401                libtrace_message_t msg = {0, {.uint64=0}, NULL};
2402                msg.code = MESSAGE_DO_STOP;
2403                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
2404                pthread_join(libtrace->keepalive_thread.tid, NULL);
2405        }
2406
2407        libtrace_change_state(libtrace, STATE_JOINED, true);
2408        print_memory_stats();
2409}
2410
2411DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
2412                                                libtrace_thread_t *t)
2413{
2414        int ret;
2415        if (t == NULL)
2416                t = get_thread_descriptor(libtrace);
2417        if (t == NULL)
2418                return -1;
2419        ret = libtrace_message_queue_count(&t->messages);
2420        return ret < 0 ? 0 : ret;
2421}
2422
2423DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
2424                                          libtrace_thread_t *t,
2425                                          libtrace_message_t * message)
2426{
2427        int ret;
2428        if (t == NULL)
2429                t = get_thread_descriptor(libtrace);
2430        if (t == NULL)
2431                return -1;
2432        ret = libtrace_message_queue_get(&t->messages, message);
2433        return ret < 0 ? 0 : ret;
2434}
2435
2436DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2437                                              libtrace_thread_t *t,
2438                                              libtrace_message_t * message)
2439{
2440        if (t == NULL)
2441                t = get_thread_descriptor(libtrace);
2442        if (t == NULL)
2443                return -1;
2444        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2445                return 0;
2446        else
2447                return -1;
2448}
2449
2450DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2451{
2452        int ret;
2453        if (!message->sender)
2454                message->sender = get_thread_descriptor(libtrace);
2455
2456        ret = libtrace_message_queue_put(&t->messages, message);
2457        return ret < 0 ? 0 : ret;
2458}
2459
2460DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2461{
2462        if (!trace_has_reporter(libtrace) ||
2463            !(libtrace->reporter_thread.state == THREAD_RUNNING
2464              || libtrace->reporter_thread.state == THREAD_PAUSED))
2465                return -1;
2466
2467        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2468}
2469
2470DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2471{
2472        libtrace_message_t message;
2473        memset(&message, 0, sizeof(libtrace_message_t));
2474        message.code = MESSAGE_POST_REPORTER;
2475        message.data.uint64 = 0;
2476        message.sender = NULL;
2477        return trace_message_reporter(libtrace, (void *) &message);
2478}
2479
2480DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2481{
2482        int i;
2483        int missed = 0;
2484        if (message->sender == NULL)
2485                message->sender = get_thread_descriptor(libtrace);
2486        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2487                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2488                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2489                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2490                } else {
2491                        missed += 1;
2492                }
2493        }
2494        return -missed;
2495}
2496
2497/**
2498 * Publishes a result to the reduce queue
2499 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2500 */
2501DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2502        libtrace_result_t res;
2503        res.type = type;
2504        res.key = key;
2505        res.value = value;
2506        if (!libtrace->combiner.publish) {
2507                fprintf(stderr, "Combiner has no publish method -- can not publish results!\n");
2508                return;
2509        }
2510        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2511        return;
2512}
2513
2514DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2515        if (combiner) {
2516                trace->combiner = *combiner;
2517                trace->combiner.configuration = config;
2518        } else {
2519                // No combiner, so don't try use it
2520                memset(&trace->combiner, 0, sizeof(trace->combiner));
2521        }
2522}
2523
2524DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2525        return packet->order;
2526}
2527
2528DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2529        return packet->hash;
2530}
2531
2532DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2533        packet->order = order;
2534}
2535
2536DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2537        packet->hash = hash;
2538}
2539
2540DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2541        return libtrace->state == STATE_FINISHED || libtrace->state == STATE_JOINED;
2542}
2543
2544/**
2545 * @return True if the trace is not running such that it can be configured
2546 */
2547static inline bool trace_is_configurable(libtrace_t *trace) {
2548        return trace->state == STATE_NEW ||
2549                        trace->state == STATE_PAUSED;
2550}
2551
2552DLLEXPORT int trace_set_perpkt_threads(libtrace_t *trace, int nb) {
2553        // Only supported on new traces not paused traces
2554        if (trace->state != STATE_NEW) return -1;
2555
2556        /* TODO consider allowing an offset from the total number of cores i.e.
2557         * -1 reserve 1 core */
2558        if (nb >= 0) {
2559                trace->config.perpkt_threads = nb;
2560                return 0;
2561        } else {
2562                return -1;
2563        }
2564}
2565
2566DLLEXPORT int trace_set_tick_interval(libtrace_t *trace, size_t millisec) {
2567        if (!trace_is_configurable(trace)) return -1;
2568
2569        trace->config.tick_interval = millisec;
2570        return 0;
2571}
2572
2573DLLEXPORT int trace_set_tick_count(libtrace_t *trace, size_t count) {
2574        if (!trace_is_configurable(trace)) return -1;
2575
2576        trace->config.tick_count = count;
2577        return 0;
2578}
2579
2580DLLEXPORT int trace_set_tracetime(libtrace_t *trace, bool tracetime) {
2581        if (!trace_is_configurable(trace)) return -1;
2582
2583        trace->tracetime = tracetime;
2584        return 0;
2585}
2586
2587DLLEXPORT int trace_set_cache_size(libtrace_t *trace, size_t size) {
2588        if (!trace_is_configurable(trace)) return -1;
2589
2590        trace->config.cache_size = size;
2591        return 0;
2592}
2593
2594DLLEXPORT int trace_set_thread_cache_size(libtrace_t *trace, size_t size) {
2595        if (!trace_is_configurable(trace)) return -1;
2596
2597        trace->config.thread_cache_size = size;
2598        return 0;
2599}
2600
2601DLLEXPORT int trace_set_fixed_count(libtrace_t *trace, bool fixed) {
2602        if (!trace_is_configurable(trace)) return -1;
2603
2604        trace->config.fixed_count = fixed;
2605        return 0;
2606}
2607
2608DLLEXPORT int trace_set_burst_size(libtrace_t *trace, size_t size) {
2609        if (!trace_is_configurable(trace)) return -1;
2610
2611        trace->config.burst_size = size;
2612        return 0;
2613}
2614
2615DLLEXPORT int trace_set_hasher_queue_size(libtrace_t *trace, size_t size) {
2616        if (!trace_is_configurable(trace)) return -1;
2617
2618        trace->config.hasher_queue_size = size;
2619        return 0;
2620}
2621
2622DLLEXPORT int trace_set_hasher_polling(libtrace_t *trace, bool polling) {
2623        if (!trace_is_configurable(trace)) return -1;
2624
2625        trace->config.hasher_polling = polling;
2626        return 0;
2627}
2628
2629DLLEXPORT int trace_set_reporter_polling(libtrace_t *trace, bool polling) {
2630        if (!trace_is_configurable(trace)) return -1;
2631
2632        trace->config.reporter_polling = polling;
2633        return 0;
2634}
2635
2636DLLEXPORT int trace_set_reporter_thold(libtrace_t *trace, size_t thold) {
2637        if (!trace_is_configurable(trace)) return -1;
2638
2639        trace->config.reporter_thold = thold;
2640        return 0;
2641}
2642
2643DLLEXPORT int trace_set_debug_state(libtrace_t *trace, bool debug_state) {
2644        if (!trace_is_configurable(trace)) return -1;
2645
2646        trace->config.debug_state = debug_state;
2647        return 0;
2648}
2649
2650static bool config_bool_parse(char *value, size_t nvalue) {
2651        if (strncmp(value, "true", nvalue) == 0)
2652                return true;
2653        else if (strncmp(value, "false", nvalue) == 0)
2654                return false;
2655        else
2656                return strtoll(value, NULL, 10) != 0;
2657}
2658
2659/* Note update documentation on trace_set_configuration */
2660static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2661        if (!key) {
2662                fprintf(stderr, "NULL key passed to config_string()\n");
2663                return;
2664        }
2665        if (!value) {
2666                fprintf(stderr, "NULL value passed to config_string()\n");
2667                return;
2668        }
2669        if (!uc) {
2670                fprintf(stderr, "NULL uc (user_configuration) passed to config_string()\n");
2671                return;
2672        }
2673        if (strncmp(key, "cache_size", nkey) == 0
2674            || strncmp(key, "cs", nkey) == 0) {
2675                uc->cache_size = strtoll(value, NULL, 10);
2676        } else if (strncmp(key, "thread_cache_size", nkey) == 0
2677                   || strncmp(key, "tcs", nkey) == 0) {
2678                uc->thread_cache_size = strtoll(value, NULL, 10);
2679        } else if (strncmp(key, "fixed_count", nkey) == 0
2680                   || strncmp(key, "fc", nkey) == 0) {
2681                uc->fixed_count = config_bool_parse(value, nvalue);
2682        } else if (strncmp(key, "burst_size", nkey) == 0
2683                   || strncmp(key, "bs", nkey) == 0) {
2684                uc->burst_size = strtoll(value, NULL, 10);
2685        } else if (strncmp(key, "tick_interval", nkey) == 0
2686                   || strncmp(key, "ti", nkey) == 0) {
2687                uc->tick_interval = strtoll(value, NULL, 10);
2688        } else if (strncmp(key, "tick_count", nkey) == 0
2689                   || strncmp(key, "tc", nkey) == 0) {
2690                uc->tick_count = strtoll(value, NULL, 10);
2691        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2692                   || strncmp(key, "pt", nkey) == 0) {
2693                uc->perpkt_threads = strtoll(value, NULL, 10);
2694        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2695                   || strncmp(key, "hqs", nkey) == 0) {
2696                uc->hasher_queue_size = strtoll(value, NULL, 10);
2697        } else if (strncmp(key, "hasher_polling", nkey) == 0
2698                   || strncmp(key, "hp", nkey) == 0) {
2699                uc->hasher_polling = config_bool_parse(value, nvalue);
2700        } else if (strncmp(key, "reporter_polling", nkey) == 0
2701                   || strncmp(key, "rp", nkey) == 0) {
2702                uc->reporter_polling = config_bool_parse(value, nvalue);
2703        } else if (strncmp(key, "reporter_thold", nkey) == 0
2704                   || strncmp(key, "rt", nkey) == 0) {
2705                uc->reporter_thold = strtoll(value, NULL, 10);
2706        } else if (strncmp(key, "debug_state", nkey) == 0
2707                   || strncmp(key, "ds", nkey) == 0) {
2708                uc->debug_state = config_bool_parse(value, nvalue);
2709        } else {
2710                fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value);
2711        }
2712}
2713
2714DLLEXPORT int trace_set_configuration(libtrace_t *trace, const char *str) {
2715        char *pch;
2716        char key[100];
2717        char value[100];
2718        char *dup;
2719        if (!trace) {
2720                fprintf(stderr, "NULL trace passed into trace_set_configuration()\n");
2721                return TRACE_ERR_NULL_TRACE;
2722        }
2723        if (!str) {
2724                trace_set_err(trace, TRACE_ERR_CONFIG, "NULL configuration string passed to trace_set_configuration()");
2725                return -1;
2726        }
2727
2728        if (!trace_is_configurable(trace)) return -1;
2729
2730        dup = strdup(str);
2731        pch = strtok (dup," ,.-");
2732        while (pch != NULL)
2733        {
2734                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2735                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
2736                } else {
2737                        fprintf(stderr, "Error: parsing option %s\n", pch);
2738                }
2739                pch = strtok (NULL," ,.-");
2740        }
2741        free(dup);
2742
2743        return 0;
2744}
2745
2746DLLEXPORT int trace_set_configuration_file(libtrace_t *trace, FILE *file) {
2747        char line[1024];
2748        if (!trace_is_configurable(trace)) return -1;
2749
2750        while (fgets(line, sizeof(line), file) != NULL)
2751        {
2752                trace_set_configuration(trace, line);
2753        }
2754
2755        if(ferror(file))
2756                return -1;
2757        else
2758                return 0;
2759}
2760
2761DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2762        if (!packet) {
2763                trace_set_err(libtrace, TRACE_ERR_NULL_PACKET,
2764                        "NULL packet passed to trace_free_packet()");
2765                return;
2766        }
2767        /* Always release any resources this might be holding */
2768        trace_fin_packet(packet);
2769        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2770}
2771
2772DLLEXPORT void trace_increment_packet_refcount(libtrace_packet_t *packet) {
2773        pthread_mutex_lock(&(packet->ref_lock));
2774        if (packet->refcount < 0) {
2775                packet->refcount = 1;
2776        } else {
2777                packet->refcount ++;
2778        }
2779        pthread_mutex_unlock(&(packet->ref_lock));
2780}
2781
2782DLLEXPORT void trace_decrement_packet_refcount(libtrace_packet_t *packet) {
2783        pthread_mutex_lock(&(packet->ref_lock));
2784        packet->refcount --;
2785
2786        if (packet->refcount <= 0) {
2787                trace_free_packet(packet->trace, packet);
2788        }
2789        pthread_mutex_unlock(&(packet->ref_lock));
2790}
2791
2792
2793DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2794        if (libtrace->format)
2795                return &libtrace->format->info;
2796        else
2797                pthread_exit(NULL);
2798}
Note: See TracBrowser for help on using the repository browser.