source: lib/trace_parallel.c @ 1407294

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 1407294 was 1407294, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Remove HASHER_HARDWARE and doc/code tidies

We don't want to expose this option to the user as it was only used internally.
As it happens we can completely remove it if needed.

Remove error handling from start thread and fix gcc warning in start thread.

  • Property mode set to 100644
File size: 73.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97#include "combiners.h"
98
99#include <pthread.h>
100#include <signal.h>
101#include <unistd.h>
102#include <ctype.h>
103
104static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
105extern int libtrace_parallel;
106
107struct mem_stats {
108        struct memfail {
109           uint64_t cache_hit;
110           uint64_t ring_hit;
111           uint64_t miss;
112           uint64_t recycled;
113        } readbulk, read, write, writebulk;
114};
115
116// Grrr gcc wants this spelt out
117__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
118
119static void print_memory_stats() {
120#if 0
121        char t_name[50];
122        uint64_t total;
123        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
124
125        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
126
127        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
128        if (total) {
129                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
130                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
131                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
132                                total, (double) mem_hits.read.miss / (double) total * 100.0);
133        }
134
135        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
136        if (total) {
137                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
138                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
139
140
141                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
142                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
143        }
144
145        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
146        if (total) {
147                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
148                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
149
150                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
151                                total, (double) mem_hits.write.miss / (double) total * 100.0);
152        }
153
154        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
155        if (total) {
156                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
157                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
158
159                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
160                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
161        }
162#endif
163}
164
165/*
166 * This can be used once the hasher thread has been started and internally after
167 * verfiy_configuration.
168 */
169DLLEXPORT bool trace_has_dedicated_hasher(libtrace_t * libtrace)
170{
171        return libtrace->hasher_thread.type == THREAD_HASHER;
172}
173
174DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
175{
176        assert(libtrace->state != STATE_NEW);
177        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
178}
179
180/**
181 * When running the number of perpkt threads in use.
182 * TODO what if the trace is not running yet, or has finished??
183 *
184 * @brief libtrace_perpkt_thread_nb
185 * @param t The trace
186 * @return
187 */
188DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
189        return t->perpkt_thread_count;
190}
191
192/**
193 * Changes the overall traces state and signals the condition.
194 *
195 * @param trace A pointer to the trace
196 * @param new_state The new state of the trace
197 * @param need_lock Set to true if libtrace_lock is not held, otherwise
198 *        false in the case the lock is currently held by this thread.
199 */
200static inline void libtrace_change_state(libtrace_t *trace,
201        const enum trace_state new_state, const bool need_lock)
202{
203        UNUSED enum trace_state prev_state;
204        if (need_lock)
205                pthread_mutex_lock(&trace->libtrace_lock);
206        prev_state = trace->state;
207        trace->state = new_state;
208
209        if (trace->config.debug_state)
210                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
211                        trace->uridata, get_trace_state_name(prev_state),
212                        get_trace_state_name(trace->state));
213
214        pthread_cond_broadcast(&trace->perpkt_cond);
215        if (need_lock)
216                pthread_mutex_unlock(&trace->libtrace_lock);
217}
218
219/**
220 * Changes a thread's state and broadcasts the condition variable. This
221 * should always be done when the lock is held.
222 *
223 * Additionally for perpkt threads the state counts are updated.
224 *
225 * @param trace A pointer to the trace
226 * @param t A pointer to the thread to modify
227 * @param new_state The new state of the thread
228 * @param need_lock Set to true if libtrace_lock is not held, otherwise
229 *        false in the case the lock is currently held by this thread.
230 */
231static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
232        const enum thread_states new_state, const bool need_lock)
233{
234        enum thread_states prev_state;
235        if (need_lock)
236                pthread_mutex_lock(&trace->libtrace_lock);
237        prev_state = t->state;
238        t->state = new_state;
239        if (t->type == THREAD_PERPKT) {
240                --trace->perpkt_thread_states[prev_state];
241                ++trace->perpkt_thread_states[new_state];
242        }
243
244        if (trace->config.debug_state)
245                fprintf(stderr, "Thread %d state changed from %d to %d\n",
246                        (int) t->tid, prev_state, t->state);
247
248        if (trace->perpkt_thread_states[THREAD_FINISHED] == trace->perpkt_thread_count)
249                libtrace_change_state(trace, STATE_FINSHED, false);
250
251        pthread_cond_broadcast(&trace->perpkt_cond);
252        if (need_lock)
253                pthread_mutex_unlock(&trace->libtrace_lock);
254}
255
256/**
257 * This is valid once a trace is initialised
258 *
259 * @return True if the format supports parallel threads.
260 */
261static inline bool trace_supports_parallel(libtrace_t *trace)
262{
263        assert(trace);
264        assert(trace->format);
265        if (trace->format->pstart_input)
266                return true;
267        else
268                return false;
269}
270
271void libtrace_zero_thread(libtrace_thread_t * t) {
272        t->accepted_packets = 0;
273        t->filtered_packets = 0;
274        t->recorded_first = false;
275        t->tracetime_offset_usec = 0;
276        t->user_data = 0;
277        t->format_data = 0;
278        libtrace_zero_ringbuffer(&t->rbuffer);
279        t->trace = NULL;
280        t->ret = NULL;
281        t->type = THREAD_EMPTY;
282        t->perpkt_num = -1;
283}
284
285// Ints are aligned int is atomic so safe to read and write at same time
286// However write must be locked, read doesn't (We never try read before written to table)
287libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
288        int i = 0;
289        pthread_t tid = pthread_self();
290
291        for (;i<libtrace->perpkt_thread_count ;++i) {
292                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
293                        return &libtrace->perpkt_threads[i];
294        }
295        return NULL;
296}
297
298int get_thread_table_num(libtrace_t *libtrace) {
299        int i = 0;
300        pthread_t tid = pthread_self();
301        for (;i<libtrace->perpkt_thread_count; ++i) {
302                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
303                        return i;
304        }
305        return -1;
306}
307
308static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
309        libtrace_thread_t *ret;
310        if (!(ret = get_thread_table(libtrace))) {
311                pthread_t tid = pthread_self();
312                // Check if we are reporter or something else
313                if (pthread_equal(tid, libtrace->reporter_thread.tid))
314                        ret = &libtrace->reporter_thread;
315                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
316                        ret = &libtrace->hasher_thread;
317                else
318                        ret = NULL;
319        }
320        return ret;
321}
322
323DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
324        // Duplicate the packet in standard malloc'd memory and free the
325        // original, This is a 1:1 exchange so is ocache count remains unchanged.
326        if (pkt->buf_control != TRACE_CTRL_PACKET) {
327                libtrace_packet_t *dup;
328                dup = trace_copy_packet(pkt);
329                /* Release the external buffer */
330                trace_fin_packet(pkt);
331                /* Copy the duplicated packet over the existing */
332                memcpy(pkt, dup, sizeof(libtrace_packet_t));
333        }
334}
335
336/**
337 * Makes a libtrace_result_t safe, used when pausing a trace.
338 * This will call libtrace_make_packet_safe if the result is
339 * a packet.
340 */
341DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
342        if (res->type == RESULT_PACKET) {
343                libtrace_make_packet_safe(res->value.pkt);
344        }
345}
346
347/**
348 * Holds threads in a paused state, until released by broadcasting
349 * the condition mutex.
350 */
351static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
352        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
353        thread_change_state(trace, t, THREAD_PAUSED, false);
354        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
355                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
356        }
357        thread_change_state(trace, t, THREAD_RUNNING, false);
358        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
359}
360
361/**
362 * Sends a packet to the user, expects either a valid packet or a TICK packet.
363 *
364 * @param trace The trace
365 * @param t The current thread
366 * @param packet A pointer to the packet storage, which may be set to null upon
367 *               return, or a packet to be finished.
368 * @param tracetime If true packets are delayed to match with tracetime
369 * @return 0 is successful, otherwise if playing back in tracetime
370 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
371 *
372 * @note READ_MESSAGE will only be returned if tracetime is true.
373 */
374static inline int dispatch_packet(libtrace_t *trace,
375                                  libtrace_thread_t *t,
376                                  libtrace_packet_t **packet,
377                                  bool tracetime) {
378
379        if ((*packet)->error > 0) {
380                if (tracetime) {
381                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
382                                return READ_MESSAGE;
383                }
384                t->accepted_packets++;
385                libtrace_generic_t data = {.pkt = *packet};
386                *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t);
387                trace_fin_packet(*packet);
388        } else {
389                assert((*packet)->error == READ_TICK);
390                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
391                (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);
392        }
393        return 0;
394}
395
396/**
397 * Sends a batch of packets to the user, expects either a valid packet or a
398 * TICK packet.
399 *
400 * @param trace The trace
401 * @param t The current thread
402 * @param packets [in,out] An array of packets, these may be null upon return
403 * @param nb_packets The total number of packets in the list
404 * @param empty [in,out] A pointer to an integer storing the first empty slot,
405 * upon return this is updated
406 * @param offset [in,out] The offset into the array, upon return this is updated
407 * @param tracetime If true packets are delayed to match with tracetime
408 * @return 0 is successful, otherwise if playing back in tracetime
409 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
410 *
411 * @note READ_MESSAGE will only be returned if tracetime is true.
412 */
413static inline int dispatch_packets(libtrace_t *trace,
414                                  libtrace_thread_t *t,
415                                  libtrace_packet_t *packets[],
416                                  int nb_packets, int *empty, int *offset,
417                                  bool tracetime) {
418        for (;*offset < nb_packets; ++*offset) {
419                int ret;
420                ret = dispatch_packet(trace, t, &packets[*offset], tracetime);
421                if (ret == 0) {
422                        /* Move full slots to front as we go */
423                        if (packets[*offset]) {
424                                if (*empty != *offset) {
425                                        packets[*empty] = packets[*offset];
426                                        packets[*offset] = NULL;
427                                }
428                                ++*empty;
429                        }
430                } else {
431                        /* Break early */
432                        assert(ret == READ_MESSAGE);
433                        return READ_MESSAGE;
434                }
435        }
436
437        return 0;
438}
439
440/**
441 * Pauses a per packet thread, messages will not be processed when the thread
442 * is paused.
443 *
444 * This process involves reading packets if a hasher thread is used. As such
445 * this function can fail to pause due to errors when reading in which case
446 * the thread should be stopped instead.
447 *
448 *
449 * @brief trace_perpkt_thread_pause
450 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
451 */
452static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
453                                     libtrace_packet_t *packets[],
454                                     int nb_packets, int *empty, int *offset) {
455        libtrace_packet_t * packet = NULL;
456
457        /* Let the user thread know we are going to pause */
458        (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
459
460        /* Send through any remaining packets (or messages) without delay */
461
462        /* First send those packets already read, as fast as possible
463         * This should never fail or check for messages etc. */
464        ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty,
465                                    offset, false), == 0);
466
467        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
468        /* If a hasher thread is running, empty input queues so we don't lose data */
469        if (trace_has_dedicated_hasher(trace)) {
470                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
471                // The hasher has stopped by this point, so the queue shouldn't be filling
472                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
473                        int ret = trace->pread(trace, t, &packet, 1);
474                        if (ret == 1) {
475                                if (packet->error > 0) {
476                                        store_first_packet(trace, packet, t);
477                                }
478                                ASSERT_RET(dispatch_packet(trace, t, &packet, false), == 0);
479                                if (packet == NULL)
480                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
481                        } else if (ret != READ_MESSAGE) {
482                                /* Ignore messages we pick these up next loop */
483                                assert (ret == READ_EOF || ret == READ_ERROR);
484                                /* Verify no packets are remaining */
485                                /* TODO refactor this sanity check out!! */
486                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
487                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
488                                        // No packets after this should have any data in them
489                                        assert(packet->error <= 0);
490                                }
491                                fprintf(stderr, "PREAD_FAILED %d\n", ret);
492                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
493                                return -1;
494                        }
495                }
496        }
497        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
498
499        /* Now we do the actual pause, this returns when we resumed */
500        trace_thread_pause(trace, t);
501        (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
502        return 1;
503}
504
505/**
506 * The is the entry point for our packet processing threads.
507 */
508static void* perpkt_threads_entry(void *data) {
509        libtrace_t *trace = (libtrace_t *)data;
510        libtrace_thread_t *t;
511        libtrace_message_t message = {0};
512        libtrace_packet_t *packets[trace->config.burst_size];
513        size_t i;
514        //int ret;
515        /* The current reading position into the packets */
516        int offset = 0;
517        /* The number of packets last read */
518        int nb_packets = 0;
519        /* The offset to the first NULL packet upto offset */
520        int empty = 0;
521
522        /* Wait until trace_pstart has been completed */
523        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
524        t = get_thread_table(trace);
525        assert(t);
526        if (trace->state == STATE_ERROR) {
527                thread_change_state(trace, t, THREAD_FINISHED, false);
528                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
529                pthread_exit(NULL);
530        }
531        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
532        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
533
534        if (trace->format->pregister_thread) {
535                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
536        }
537
538        /* Fill our buffer with empty packets */
539        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
540        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
541                              trace->config.burst_size,
542                              trace->config.burst_size);
543
544        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
545
546        /* Let the per_packet function know we have started */
547        (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
548        (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
549
550        for (;;) {
551
552                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
553                        int ret;
554                        switch (message.code) {
555                                case MESSAGE_DO_PAUSE: // This is internal
556                                        ret = trace_perpkt_thread_pause(trace, t,
557                                              packets, nb_packets, &empty, &offset);
558                                        if (ret == READ_EOF) {
559                                                goto eof;
560                                        } else if (ret == READ_ERROR) {
561                                                goto error;
562                                        }
563                                        assert(ret == 1);
564                                        continue;
565                                case MESSAGE_DO_STOP: // This is internal
566                                        fprintf(stderr, "DO_STOP stop!!\n");
567                                        goto eof;
568                        }
569                        (*trace->per_pkt)(trace, t, message.code, message.data, message.sender);
570                        /* Continue and the empty messages out before packets */
571                        continue;
572                }
573
574
575                /* Do we need to read a new set of packets MOST LIKELY we do */
576                if (offset == nb_packets) {
577                        /* Refill the packet buffer */
578                        if (empty != nb_packets) {
579                                // Refill the empty packets
580                                libtrace_ocache_alloc(&trace->packet_freelist,
581                                                      (void **) &packets[empty],
582                                                      nb_packets - empty,
583                                                      nb_packets - empty);
584                        }
585                        if (!trace->pread) {
586                                assert(packets[0]);
587                                nb_packets = trace_read_packet(trace, packets[0]);
588                                packets[0]->error = nb_packets;
589                                if (nb_packets > 0)
590                                        nb_packets = 1;
591                        } else {
592                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
593                        }
594                        offset = 0;
595                        empty = 0;
596                }
597
598                /* Handle error/message cases */
599                if (nb_packets > 0) {
600                        /* Store the first packet */
601                        if (packets[0]->error > 0) {
602                                store_first_packet(trace, packets[0], t);
603                        }
604                        dispatch_packets(trace, t, packets, nb_packets, &empty,
605                                         &offset, trace->tracetime);
606                } else {
607                        switch (nb_packets) {
608                        case READ_EOF:
609                                goto eof;
610                        case READ_ERROR:
611                                goto error;
612                        case READ_MESSAGE:
613                                nb_packets = 0;
614                                continue;
615                        default:
616                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
617                                goto error;
618                        }
619                }
620
621        }
622
623error:
624        message.code = MESSAGE_DO_STOP;
625        message.sender = t;
626        message.data.uint64 = 0;
627        trace_message_perpkts(trace, &message);
628eof:
629        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
630
631        // Let the per_packet function know we have stopped
632        (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
633        (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);
634
635        // Free any remaining packets
636        for (i = 0; i < trace->config.burst_size; i++) {
637                if (packets[i]) {
638                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
639                        packets[i] = NULL;
640                }
641        }
642
643        thread_change_state(trace, t, THREAD_FINISHED, true);
644
645        /* Make sure the reporter sees we have finished */
646        if (trace_has_reporter(trace))
647                trace_post_reporter(trace);
648
649        // Release all ocache memory before unregistering with the format
650        // because this might(it does in DPDK) unlink the formats mempool
651        // causing destroy/finish packet to fail.
652        libtrace_ocache_unregister_thread(&trace->packet_freelist);
653        if (trace->format->punregister_thread) {
654                trace->format->punregister_thread(trace, t);
655        }
656        print_memory_stats();
657
658        pthread_exit(NULL);
659}
660
661/**
662 * The start point for our single threaded hasher thread, this will read
663 * and hash a packet from a data source and queue it against the correct
664 * core to process it.
665 */
666static void* hasher_entry(void *data) {
667        libtrace_t *trace = (libtrace_t *)data;
668        libtrace_thread_t * t;
669        int i;
670        libtrace_packet_t * packet;
671        libtrace_message_t message = {0};
672        int pkt_skipped = 0;
673
674        assert(trace_has_dedicated_hasher(trace));
675        /* Wait until all threads are started and objects are initialised (ring buffers) */
676        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
677        t = &trace->hasher_thread;
678        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
679        if (trace->state == STATE_ERROR) {
680                thread_change_state(trace, t, THREAD_FINISHED, false);
681                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
682                pthread_exit(NULL);
683        }
684
685        printf("Hasher Thread started\n");
686        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
687
688        /* We are reading but it is not the parallel API */
689        if (trace->format->pregister_thread) {
690                trace->format->pregister_thread(trace, t, true);
691        }
692
693        /* Read all packets in then hash and queue against the correct thread */
694        while (1) {
695                int thread;
696                if (!pkt_skipped)
697                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
698                assert(packet);
699
700                if (libtrace_halt) {
701                        packet->error = 0;
702                        break;
703                }
704
705                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
706                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
707                        switch(message.code) {
708                                case MESSAGE_DO_PAUSE:
709                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
710                                        thread_change_state(trace, t, THREAD_PAUSED, false);
711                                        pthread_cond_broadcast(&trace->perpkt_cond);
712                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
713                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
714                                        }
715                                        thread_change_state(trace, t, THREAD_RUNNING, false);
716                                        pthread_cond_broadcast(&trace->perpkt_cond);
717                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
718                                        break;
719                                case MESSAGE_DO_STOP:
720                                        assert(trace->started == false);
721                                        assert(trace->state == STATE_FINSHED);
722                                        /* Mark the current packet as EOF */
723                                        packet->error = 0;
724                                        break;
725                                default:
726                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
727                        }
728                        pkt_skipped = 1;
729                        continue;
730                }
731
732                if ((packet->error = trace_read_packet(trace, packet)) <1) {
733                        break; /* We are EOF or error'd either way we stop  */
734                }
735
736                /* We are guaranteed to have a hash function i.e. != NULL */
737                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
738                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
739                /* Blocking write to the correct queue - I'm the only writer */
740                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
741                        uint64_t order = trace_packet_get_order(packet);
742                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
743                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
744                                // Write ticks to everyone else
745                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
746                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
747                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
748                                for (i = 0; i < trace->perpkt_thread_count; i++) {
749                                        pkts[i]->error = READ_TICK;
750                                        trace_packet_set_order(pkts[i], order);
751                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
752                                }
753                        }
754                        pkt_skipped = 0;
755                } else {
756                        assert(!"Dropping a packet!!");
757                        pkt_skipped = 1; // Reuse that packet no one read it
758                }
759        }
760
761        /* Broadcast our last failed read to all threads */
762        for (i = 0; i < trace->perpkt_thread_count; i++) {
763                libtrace_packet_t * bcast;
764                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
765                if (i == trace->perpkt_thread_count - 1) {
766                        bcast = packet;
767                } else {
768                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
769                        bcast->error = packet->error;
770                }
771                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
772                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
773                        // Unlock early otherwise we could deadlock
774                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
775                } else {
776                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
777                }
778                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
779        }
780
781        // We don't need to free the packet
782        thread_change_state(trace, t, THREAD_FINISHED, true);
783
784        libtrace_ocache_unregister_thread(&trace->packet_freelist);
785        if (trace->format->punregister_thread) {
786                trace->format->punregister_thread(trace, t);
787        }
788        print_memory_stats();
789
790        // TODO remove from TTABLE t sometime
791        pthread_exit(NULL);
792}
793
794/* Our simplest case when a thread becomes ready it can obtain an exclusive
795 * lock to read packets from the underlying trace.
796 */
797static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
798                                                    libtrace_thread_t *t,
799                                                    libtrace_packet_t *packets[],
800                                                    size_t nb_packets) {
801        size_t i = 0;
802        //bool tick_hit = false;
803
804        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
805        /* Read nb_packets */
806        for (i = 0; i < nb_packets; ++i) {
807                if (libtrace_halt) {
808                        break;
809                }
810                packets[i]->error = trace_read_packet(libtrace, packets[i]);
811
812                if (packets[i]->error <= 0) {
813                        /* We'll catch this next time if we have already got packets */
814                        if ( i==0 ) {
815                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
816                                return packets[i]->error;
817                        } else {
818                                break;
819                        }
820                }
821                /*
822                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
823                        tick_hit = true;
824                }*/
825        }
826        // Doing this inside the lock ensures the first packet is always
827        // recorded first
828        if (packets[0]->error > 0) {
829                store_first_packet(libtrace, packets[0], t);
830        }
831        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
832        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
833        if (tick_hit) {
834                libtrace_message_t tick;
835                tick.additional.uint64 = trace_packet_get_order(packets[i]);
836                tick.code = MESSAGE_TICK;
837                trace_send_message_to_perpkts(libtrace, &tick);
838        } */
839        return i;
840}
841
842/**
843 * For the case that we have a dedicated hasher thread
844 * 1. We read a packet from our buffer
845 * 2. Move that into the packet provided (packet)
846 */
847inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
848                                                   libtrace_thread_t *t,
849                                                   libtrace_packet_t *packets[],
850                                                   size_t nb_packets) {
851        size_t i;
852
853        /* We store the last error message here */
854        if (t->format_data) {
855                fprintf(stderr, "Hit me, ohh yeah got error %d\n",
856                        ((libtrace_packet_t *)t->format_data)->error);
857                return ((libtrace_packet_t *)t->format_data)->error;
858        }
859
860        // Always grab at least one
861        if (packets[0]) // Recycle the old get the new
862                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
863        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
864
865        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
866                fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);
867                return packets[0]->error;
868        }
869
870        for (i = 1; i < nb_packets; i++) {
871                if (packets[i]) // Recycle the old get the new
872                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
873                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
874                        packets[i] = NULL;
875                        break;
876                }
877
878                /* We will return an error or EOF the next time around */
879                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
880                        /* The message case will be checked automatically -
881                           However other cases like EOF and error will only be
882                           sent once*/
883                        if (packets[i]->error != READ_MESSAGE) {
884                                assert(t->format_data == NULL);
885                                t->format_data = packets[i];
886                                fprintf(stderr, "Hit me, ohh yeah set error %d\n",
887                                        ((libtrace_packet_t *)t->format_data)->error);
888                        }
889                        break;
890                }
891        }
892
893        return i;
894}
895
896/**
897 * For the first packet of each queue we keep a copy and note the system
898 * time it was received at.
899 *
900 * This is used for finding the first packet when playing back a trace
901 * in trace time. And can be used by real time applications to print
902 * results out every XXX seconds.
903 */
904void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
905{
906        if (!t->recorded_first) {
907                libtrace_message_t mesg = {0};
908                struct timeval tv;
909                libtrace_packet_t * dup;
910
911                /* We mark system time against a copy of the packet */
912                gettimeofday(&tv, NULL);
913                dup = trace_copy_packet(packet);
914
915                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
916                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
917                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
918                libtrace->first_packets.count++;
919
920                /* Now update the first */
921                if (libtrace->first_packets.count == 1) {
922                        /* We the first entry hence also the first known packet */
923                        libtrace->first_packets.first = t->perpkt_num;
924                } else {
925                        /* Check if we are newer than the previous 'first' packet */
926                        size_t first = libtrace->first_packets.first;
927                        if (trace_get_seconds(dup) <
928                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
929                                libtrace->first_packets.first = t->perpkt_num;
930                }
931                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
932
933                mesg.code = MESSAGE_FIRST_PACKET;
934                trace_message_reporter(libtrace, &mesg);
935                trace_message_perpkts(libtrace, &mesg);
936                t->recorded_first = true;
937        }
938}
939
940DLLEXPORT int trace_get_first_packet(libtrace_t *libtrace,
941                                     libtrace_thread_t *t,
942                                     libtrace_packet_t **packet,
943                                     struct timeval **tv)
944{
945        void * tmp;
946        int ret = 0;
947
948        if (t) {
949                if (t->type != THREAD_PERPKT || t->trace != libtrace)
950                        return -1;
951        }
952
953        /* Throw away these which we don't use */
954        if (!packet)
955                packet = (libtrace_packet_t **) &tmp;
956        if (!tv)
957                tv = (struct timeval **) &tmp;
958
959        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
960        if (t) {
961                /* Get the requested thread */
962                *packet = libtrace->first_packets.packets[t->perpkt_num].packet;
963                *tv = &libtrace->first_packets.packets[t->perpkt_num].tv;
964        } else if (libtrace->first_packets.count) {
965                /* Get the first packet across all threads */
966                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
967                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
968                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
969                        ret = 1;
970                } else {
971                        struct timeval curr_tv;
972                        // If a second has passed since the first entry we will assume this is the very first packet
973                        gettimeofday(&curr_tv, NULL);
974                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
975                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
976                                        ret = 1;
977                                }
978                        }
979                }
980        } else {
981                *packet = NULL;
982                *tv = NULL;
983        }
984        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
985        return ret;
986}
987
988
989DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
990{
991        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
992}
993
994inline static struct timeval usec_to_tv(uint64_t usec)
995{
996        struct timeval tv;
997        tv.tv_sec = usec / 1000000;
998        tv.tv_usec = usec % 1000000;
999        return tv;
1000}
1001
1002/** Similar to delay_tracetime but send messages to all threads periodically */
1003static void* reporter_entry(void *data) {
1004        libtrace_message_t message = {0};
1005        libtrace_t *trace = (libtrace_t *)data;
1006        libtrace_thread_t *t = &trace->reporter_thread;
1007
1008        fprintf(stderr, "Reporter thread starting\n");
1009
1010        /* Wait until all threads are started */
1011        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1012        if (trace->state == STATE_ERROR) {
1013                thread_change_state(trace, t, THREAD_FINISHED, false);
1014                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1015                pthread_exit(NULL);
1016        }
1017        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1018
1019        if (trace->format->pregister_thread) {
1020                trace->format->pregister_thread(trace, t, false);
1021        }
1022
1023        (*trace->reporter)(trace, MESSAGE_STARTING, (libtrace_generic_t) {0}, t);
1024        (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
1025
1026        while (!trace_has_finished(trace)) {
1027                if (trace->config.reporter_polling) {
1028                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1029                                message.code = MESSAGE_POST_REPORTER;
1030                } else {
1031                        libtrace_message_queue_get(&t->messages, &message);
1032                }
1033                switch (message.code) {
1034                        // Check for results
1035                        case MESSAGE_POST_REPORTER:
1036                                trace->combiner.read(trace, &trace->combiner);
1037                                break;
1038                        case MESSAGE_DO_PAUSE:
1039                                assert(trace->combiner.pause);
1040                                trace->combiner.pause(trace, &trace->combiner);
1041                                (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
1042                                trace_thread_pause(trace, t);
1043                                (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
1044                                break;
1045                default:
1046                        (*trace->reporter)(trace, message.code, message.data, message.sender);
1047                }
1048        }
1049
1050        // Flush out whats left now all our threads have finished
1051        trace->combiner.read_final(trace, &trace->combiner);
1052
1053        // GOODBYE
1054        (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
1055        (*trace->reporter)(trace, MESSAGE_STOPPING, (libtrace_generic_t) {0}, t);
1056
1057        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1058        print_memory_stats();
1059        return NULL;
1060}
1061
1062/** Similar to delay_tracetime but send messages to all threads periodically */
1063static void* keepalive_entry(void *data) {
1064        struct timeval prev, next;
1065        libtrace_message_t message = {0};
1066        libtrace_t *trace = (libtrace_t *)data;
1067        uint64_t next_release;
1068        libtrace_thread_t *t = &trace->keepalive_thread;
1069
1070        fprintf(stderr, "keepalive thread is starting\n");
1071
1072        /* Wait until all threads are started */
1073        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1074        if (trace->state == STATE_ERROR) {
1075                thread_change_state(trace, t, THREAD_FINISHED, false);
1076                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1077                pthread_exit(NULL);
1078        }
1079        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1080
1081        gettimeofday(&prev, NULL);
1082        message.code = MESSAGE_TICK_INTERVAL;
1083
1084        while (trace->state != STATE_FINSHED) {
1085                fd_set rfds;
1086                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1087                gettimeofday(&next, NULL);
1088                if (next_release > tv_to_usec(&next)) {
1089                        next = usec_to_tv(next_release - tv_to_usec(&next));
1090                        // Wait for timeout or a message
1091                        FD_ZERO(&rfds);
1092                        FD_SET(libtrace_message_queue_get_fd(&t->messages), &rfds);
1093                        if (select(libtrace_message_queue_get_fd(&t->messages)+1, &rfds, NULL, NULL, &next) == 1) {
1094                                libtrace_message_t msg;
1095                                libtrace_message_queue_get(&t->messages, &msg);
1096                                assert(msg.code == MESSAGE_DO_STOP);
1097                                goto done;
1098                        }
1099                }
1100                prev = usec_to_tv(next_release);
1101                if (trace->state == STATE_RUNNING) {
1102                        message.data.uint64 = ((((uint64_t)prev.tv_sec) << 32) +
1103                                               (((uint64_t)prev.tv_usec << 32)/1000000));
1104                        trace_message_perpkts(trace, &message);
1105                }
1106        }
1107done:
1108
1109        thread_change_state(trace, t, THREAD_FINISHED, true);
1110        return NULL;
1111}
1112
1113/**
1114 * Delays a packets playback so the playback will be in trace time.
1115 * This may break early if a message becomes available.
1116 *
1117 * Requires the first packet for this thread to be received.
1118 * @param libtrace  The trace
1119 * @param packet    The packet to delay
1120 * @param t         The current thread
1121 * @return Either READ_MESSAGE(-2) or 0 is successful
1122 */
1123static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1124        struct timeval curr_tv, pkt_tv;
1125        uint64_t next_release = t->tracetime_offset_usec;
1126        uint64_t curr_usec;
1127
1128        if (!t->tracetime_offset_usec) {
1129                libtrace_packet_t *first_pkt;
1130                struct timeval *sys_tv;
1131                int64_t initial_offset;
1132                int stable = trace_get_first_packet(libtrace, NULL, &first_pkt, &sys_tv);
1133                assert(first_pkt);
1134                pkt_tv = trace_get_timeval(first_pkt);
1135                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1136                /* In the unlikely case offset is 0, change it to 1 */
1137                if (stable)
1138                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1139                next_release = initial_offset;
1140        }
1141        /* next_release == offset */
1142        pkt_tv = trace_get_timeval(packet);
1143        next_release += tv_to_usec(&pkt_tv);
1144        gettimeofday(&curr_tv, NULL);
1145        curr_usec = tv_to_usec(&curr_tv);
1146        if (next_release > curr_usec) {
1147                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1148                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1149                fd_set rfds;
1150                FD_ZERO(&rfds);
1151                FD_SET(mesg_fd, &rfds);
1152                // We need to wait
1153
1154                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1155                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1156                if (ret == 0) {
1157                        return 0;
1158                } else if (ret > 0) {
1159                        return READ_MESSAGE;
1160                } else {
1161                        fprintf(stderr, "I thnik we broke select\n");
1162                }
1163        }
1164        return 0;
1165}
1166
1167/* Discards packets that don't match the filter.
1168 * Discarded packets are emptied and then moved to the end of the packet list.
1169 *
1170 * @param trace       The trace format, containing the filter
1171 * @param packets     An array of packets
1172 * @param nb_packets  The number of valid items in packets
1173 *
1174 * @return The number of packets that passed the filter, which are moved to
1175 *          the start of the packets array
1176 */
1177static inline size_t filter_packets(libtrace_t *trace,
1178                                    libtrace_packet_t **packets,
1179                                    size_t nb_packets) {
1180        size_t offset = 0;
1181        size_t i;
1182
1183        for (i = 0; i < nb_packets; ++i) {
1184                // The filter needs the trace attached to receive the link type
1185                packets[i]->trace = trace;
1186                if (trace_apply_filter(trace->filter, packets[i])) {
1187                        libtrace_packet_t *tmp;
1188                        tmp = packets[offset];
1189                        packets[offset++] = packets[i];
1190                        packets[i] = tmp;
1191                } else {
1192                        trace_fin_packet(packets[i]);
1193                }
1194        }
1195
1196        return offset;
1197}
1198
1199/* Read a batch of packets from the trace into a buffer.
1200 * Note that this function will block until a packet is read (or EOF is reached)
1201 *
1202 * @param libtrace    The trace
1203 * @param t           The thread
1204 * @param packets     An array of packets
1205 * @param nb_packets  The number of empty packets in packets
1206 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1207 */
1208static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1209                                      libtrace_thread_t *t,
1210                                      libtrace_packet_t *packets[],
1211                                      size_t nb_packets) {
1212        int i;
1213        assert(nb_packets);
1214        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1215        if (trace_is_err(libtrace))
1216                return -1;
1217        if (!libtrace->started) {
1218                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1219                              "You must call libtrace_start() before trace_read_packet()\n");
1220                return -1;
1221        }
1222
1223        if (libtrace->format->pread_packets) {
1224                int ret;
1225                for (i = 0; i < (int) nb_packets; ++i) {
1226                        assert(i[packets]);
1227                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1228                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1229                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1230                                              "Packet passed to trace_read_packet() is invalid\n");
1231                                return -1;
1232                        }
1233                }
1234                do {
1235                        ret=libtrace->format->pread_packets(libtrace, t,
1236                                                            packets,
1237                                                            nb_packets);
1238                        /* Error, EOF or message? */
1239                        if (ret <= 0) {
1240                                return ret;
1241                        }
1242
1243                        if (libtrace->filter) {
1244                                int remaining;
1245                                remaining = filter_packets(libtrace,
1246                                                           packets, ret);
1247                                t->filtered_packets += ret - remaining;
1248                                ret = remaining;
1249                        }
1250                        for (i = 0; i < ret; ++i) {
1251                                /* We do not mark the packet against the trace,
1252                                 * before hand or after. After breaks DAG meta
1253                                 * packets and before is inefficient */
1254                                //packets[i]->trace = libtrace;
1255                                /* TODO IN FORMAT?? Like traditional libtrace */
1256                                if (libtrace->snaplen>0)
1257                                        trace_set_capture_length(packets[i],
1258                                                        libtrace->snaplen);
1259                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1260                        }
1261                } while(ret == 0);
1262                return ret;
1263        }
1264        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1265                      "This format does not support reading packets\n");
1266        return ~0U;
1267}
1268
1269/* Restarts a parallel trace, this is called from trace_pstart.
1270 * The libtrace lock is held upon calling this function.
1271 * Typically with a parallel trace the threads are not
1272 * killed rather.
1273 */
1274static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1275                          fn_per_pkt per_pkt, fn_reporter reporter) {
1276        int err = 0;
1277        if (libtrace->state != STATE_PAUSED) {
1278                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1279                        "trace(%s) is not currently paused",
1280                              libtrace->uridata);
1281                return -1;
1282        }
1283
1284        /* Update functions if requested */
1285        if (per_pkt)
1286                libtrace->per_pkt = per_pkt;
1287        if (reporter)
1288                libtrace->reporter = reporter;
1289        if(global_blob)
1290                libtrace->global_blob = global_blob;
1291
1292        assert(libtrace_parallel);
1293        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1294        assert(libtrace->per_pkt);
1295
1296        if (libtrace->perpkt_thread_count > 1 &&
1297            trace_supports_parallel(libtrace) &&
1298            !trace_has_dedicated_hasher(libtrace)) {
1299                fprintf(stderr, "Restarting trace pstart_input()\n");
1300                err = libtrace->format->pstart_input(libtrace);
1301        } else {
1302                if (libtrace->format->start_input) {
1303                        fprintf(stderr, "Restarting trace start_input()\n");
1304                        err = libtrace->format->start_input(libtrace);
1305                }
1306        }
1307
1308        if (err == 0) {
1309                libtrace->started = true;
1310                libtrace_change_state(libtrace, STATE_RUNNING, false);
1311        }
1312        return err;
1313}
1314
1315/**
1316 * @return the number of CPU cores on the machine. -1 if unknown.
1317 */
1318SIMPLE_FUNCTION static int get_nb_cores() {
1319        // TODO add BSD support
1320        return sysconf(_SC_NPROCESSORS_ONLN);
1321}
1322
1323/**
1324 * Verifies the configuration and sets default values for any values not
1325 * specified by the user.
1326 */
1327static void verify_configuration(libtrace_t *libtrace) {
1328
1329        if (libtrace->config.hasher_queue_size <= 0)
1330                libtrace->config.hasher_queue_size = 1000;
1331
1332        if (libtrace->config.perpkt_threads <= 0) {
1333                libtrace->perpkt_thread_count = get_nb_cores();
1334                if (libtrace->perpkt_thread_count <= 0)
1335                        // Lets just use one
1336                        libtrace->perpkt_thread_count = 1;
1337        } else {
1338                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1339        }
1340
1341        if (libtrace->config.reporter_thold <= 0)
1342                libtrace->config.reporter_thold = 100;
1343        if (libtrace->config.burst_size <= 0)
1344                libtrace->config.burst_size = 10;
1345        if (libtrace->config.packet_thread_cache_size <= 0)
1346                libtrace->config.packet_thread_cache_size = 20;
1347        if (libtrace->config.packet_cache_size <= 0)
1348                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1349
1350        if (libtrace->config.packet_cache_size <
1351                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1352                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1353
1354        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1355                libtrace->combiner = combiner_unordered;
1356
1357        /* Figure out if we are using a dedicated hasher thread? */
1358        if (libtrace->hasher && libtrace->perpkt_thread_count > 1) {
1359                libtrace->hasher_thread.type = THREAD_HASHER;
1360        }
1361}
1362
1363/**
1364 * Starts a libtrace_thread, including allocating memory for messaging.
1365 * Threads are expected to wait until the libtrace look is released.
1366 * Hence why we don't init structures until later.
1367 *
1368 * @param trace The trace the thread is associated with
1369 * @param t The thread that is filled when the thread is started
1370 * @param type The type of thread
1371 * @param start_routine The entry location of the thread
1372 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1373 * @param name For debugging purposes set the threads name (Optional)
1374 *
1375 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1376 *         In this situation the thread structure is zeroed.
1377 */
1378static int trace_start_thread(libtrace_t *trace,
1379                       libtrace_thread_t *t,
1380                       enum thread_types type,
1381                       void *(*start_routine) (void *),
1382                       int perpkt_num,
1383                       const char *name) {
1384        pthread_attr_t attrib;
1385        cpu_set_t cpus;
1386        int ret, i;
1387        assert(t->type == THREAD_EMPTY);
1388        t->trace = trace;
1389        t->ret = NULL;
1390        t->user_data = NULL;
1391        t->type = type;
1392        t->state = THREAD_RUNNING;
1393
1394        CPU_ZERO(&cpus);
1395        for (i = 0; i < get_nb_cores(); i++)
1396                CPU_SET(i, &cpus);
1397        pthread_attr_init(&attrib);
1398        pthread_attr_setaffinity_np(&attrib, sizeof(cpus), &cpus);
1399
1400        ret = pthread_create(&t->tid, &attrib, start_routine, (void *) trace);
1401        pthread_attr_destroy(&attrib);
1402        if (ret != 0) {
1403                libtrace_zero_thread(t);
1404                trace_set_err(trace, ret, "Failed to create a thread of type=%d\n", type);
1405                return -1;
1406        }
1407        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1408        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1409                libtrace_ringbuffer_init(&t->rbuffer,
1410                                         trace->config.hasher_queue_size,
1411                                         trace->config.hasher_polling?
1412                                                 LIBTRACE_RINGBUFFER_POLLING:
1413                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1414        }
1415        if(name)
1416                pthread_setname_np(t->tid, name);
1417        t->perpkt_num = perpkt_num;
1418        return 0;
1419}
1420
1421/** Parses the environment variable LIBTRACE_CONF into the supplied
1422 * configuration structure.
1423 *
1424 * @param libtrace The trace from which we determine the URI
1425 * @param uc A configuration structure to be configured.
1426 *
1427 * We search for 3 environment variables and apply them to the config in the
1428 * following order. Such that the first has the lowest priority.
1429 *
1430 * 1. LIBTRACE_CONF, The global environment configuration
1431 * 2. LIBTRACE_CONF_<FORMAT>, Applied to a given format
1432 * 3. LIBTRACE_CONF_<FORMAT_URI>, Applied the specified trace
1433 *
1434 * E.g.
1435 * - int:eth0 would match LIBTRACE_CONF, LIBTRACE_CONF_INT, LIBTRACE_CONF_INT_ETH0
1436 * - dag:/dev/dag0,0 would match LIBTRACE_CONF, LIBTRACE_CONF_DAG, LIBTRACE_CONF_DAG__DEV_DAG0_0
1437 * - test.erf would match LIBTRACE_CONF, LIBTRACE_CONF_ERF, LIBTRACE_CONF_ERF_TEST_ERF
1438 *
1439 * @note All enironment variables names MUST only contian
1440 * [A-Z], [0-9] and [_] (underscore) and not start with a number. Any characters
1441 * outside of this range should be captilised if possible or replaced with an
1442 * underscore.
1443 */
1444static void parse_env_config (libtrace_t *libtrace, struct user_configuration* uc) {
1445        char env_name[1024] = "LIBTRACE_CONF_";
1446        size_t len = strlen(env_name);
1447        size_t mark = 0;
1448        size_t i;
1449        char * env;
1450
1451        /* Make our compound string */
1452        strncpy(&env_name[len], libtrace->format->name, sizeof(env_name) - len);
1453        len += strlen(libtrace->format->name);
1454        strncpy(&env_name[len], ":", sizeof(env_name) - len);
1455        len += 1;
1456        strncpy(&env_name[len], libtrace->uridata, sizeof(env_name) - len);
1457
1458        /* env names are allowed to be A-Z (CAPS) 0-9 and _ */
1459        for (i = 0; env_name[i] != 0; ++i) {
1460                env_name[i] = toupper(env_name[i]);
1461                if(env_name[i] == ':') {
1462                        mark = i;
1463                }
1464                if (!( (env_name[i] >= 'A' && env_name[i] <= 'Z') ||
1465                       (env_name[i] >= '0' && env_name[i] <= '9') )) {
1466                        env_name[i] = '_';
1467                }
1468        }
1469
1470        /* First apply global env settings LIBTRACE_CONF */
1471        env = getenv("LIBTRACE_CONF");
1472        if (env)
1473        {
1474                printf("Got env %s", env);
1475                parse_user_config(uc, env);
1476        }
1477
1478        /* Then format settings LIBTRACE_CONF_<FORMAT> */
1479        if (mark != 0) {
1480                env_name[mark] = 0;
1481                env = getenv(env_name);
1482                if (env) {
1483                        printf("Got %s=%s", env_name, env);
1484                        parse_user_config(uc, env);
1485                }
1486                env_name[mark] = '_';
1487        }
1488
1489        /* Finally this specific trace LIBTRACE_CONF_<FORMAT_URI> */
1490        env = getenv(env_name);
1491        if (env) {
1492                printf("Got %s=%s", env_name, env);
1493                parse_user_config(uc, env);
1494        }
1495}
1496
1497DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1498                           fn_per_pkt per_pkt, fn_reporter reporter) {
1499        int i;
1500        int ret = -1;
1501        char name[16];
1502        sigset_t sig_before, sig_block_all;
1503        assert(libtrace);
1504
1505        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1506        if (trace_is_err(libtrace)) {
1507                goto cleanup_none;
1508        }
1509
1510        if (libtrace->state == STATE_PAUSED) {
1511                ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
1512                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1513                return ret;
1514        }
1515
1516        if (libtrace->state != STATE_NEW) {
1517                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1518                              "should be called on a NEW or PAUSED trace but "
1519                              "instead was called from %s",
1520                              get_trace_state_name(libtrace->state));
1521                goto cleanup_none;
1522        }
1523
1524        /* Store the user defined things against the trace */
1525        libtrace->global_blob = global_blob;
1526        libtrace->per_pkt = per_pkt;
1527        libtrace->reporter = reporter;
1528        /* And zero other fields */
1529        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1530                libtrace->perpkt_thread_states[i] = 0;
1531        }
1532        libtrace->first_packets.first = 0;
1533        libtrace->first_packets.count = 0;
1534        libtrace->first_packets.packets = NULL;
1535        libtrace->perpkt_threads = NULL;
1536        /* Set a global which says we are using a parallel trace. This is
1537         * for backwards compatability due to changes when destroying packets */
1538        libtrace_parallel = 1;
1539
1540        /* Parses configuration passed through environment variables */
1541        parse_env_config(libtrace, &libtrace->config);
1542        verify_configuration(libtrace);
1543
1544        /* Try start the format - we prefer parallel over single threaded, as
1545         * these formats should support messages better */
1546        if (trace_supports_parallel(libtrace) &&
1547            !trace_has_dedicated_hasher(libtrace)) {
1548                printf("Using the parallel trace format\n");
1549                ret = libtrace->format->pstart_input(libtrace);
1550                libtrace->pread = trace_pread_packet_wrapper;
1551        } else {
1552                printf("Using single threaded interface\n");
1553                if (libtrace->format->start_input) {
1554                        ret = libtrace->format->start_input(libtrace);
1555                }
1556                if (libtrace->perpkt_thread_count > 1)
1557                        libtrace->pread = trace_pread_packet_first_in_first_served;
1558                else
1559                        /* Use standard read_packet */
1560                        libtrace->pread = NULL;
1561        }
1562
1563        if (ret != 0) {
1564                goto cleanup_none;
1565        }
1566
1567        /* --- Start all the threads we need --- */
1568        /* Disable signals because it is inherited by the threads we start */
1569        sigemptyset(&sig_block_all);
1570        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1571
1572        /* If we need a hasher thread start it
1573         * Special Case: If single threaded we don't need a hasher
1574         */
1575        if (trace_has_dedicated_hasher(libtrace)) {
1576                libtrace->hasher_thread.type = THREAD_EMPTY;
1577                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1578                                   THREAD_HASHER, hasher_entry, -1,
1579                                   "hasher-thread");
1580                if (ret != 0)
1581                        goto cleanup_started;
1582                libtrace->pread = trace_pread_packet_hasher_thread;
1583        } else {
1584                libtrace->hasher_thread.type = THREAD_EMPTY;
1585        }
1586
1587        /* Start up our perpkt threads */
1588        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1589                                          libtrace->perpkt_thread_count);
1590        if (!libtrace->perpkt_threads) {
1591                trace_set_err(libtrace, errno, "trace_pstart "
1592                              "failed to allocate memory.");
1593                goto cleanup_threads;
1594        }
1595        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1596                snprintf(name, sizeof(name), "perpkt-%d", i);
1597                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1598                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1599                                   THREAD_PERPKT, perpkt_threads_entry, i,
1600                                   name);
1601                if (ret != 0)
1602                        goto cleanup_threads;
1603        }
1604
1605        /* Start the reporter thread */
1606        if (reporter) {
1607                if (libtrace->combiner.initialise)
1608                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1609                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1610                                   THREAD_REPORTER, reporter_entry, -1,
1611                                   "reporter_thread");
1612                if (ret != 0)
1613                        goto cleanup_threads;
1614        }
1615
1616        /* Start the keepalive thread */
1617        if (libtrace->config.tick_interval > 0) {
1618                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1619                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1620                                   "keepalive_thread");
1621                if (ret != 0)
1622                        goto cleanup_threads;
1623        }
1624
1625        /* Init other data structures */
1626        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1627        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1628        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1629                                                 sizeof(*libtrace->first_packets.packets));
1630        if (libtrace->first_packets.packets == NULL) {
1631                trace_set_err(libtrace, errno, "trace_pstart "
1632                              "failed to allocate memory.");
1633                goto cleanup_threads;
1634        }
1635
1636        if (libtrace_ocache_init(&libtrace->packet_freelist,
1637                             (void* (*)()) trace_create_packet,
1638                             (void (*)(void *))trace_destroy_packet,
1639                             libtrace->config.packet_thread_cache_size,
1640                             libtrace->config.packet_cache_size * 4,
1641                             libtrace->config.fixed_packet_count) != 0) {
1642                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1643                              "failed to allocate ocache.");
1644                goto cleanup_threads;
1645        }
1646
1647        /* Threads don't start */
1648        libtrace->started = true;
1649        libtrace_change_state(libtrace, STATE_RUNNING, false);
1650
1651        ret = 0;
1652        goto success;
1653cleanup_threads:
1654        if (libtrace->first_packets.packets) {
1655                free(libtrace->first_packets.packets);
1656                libtrace->first_packets.packets = NULL;
1657        }
1658        libtrace_change_state(libtrace, STATE_ERROR, false);
1659        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1660        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1661                pthread_join(libtrace->hasher_thread.tid, NULL);
1662                libtrace_zero_thread(&libtrace->hasher_thread);
1663        }
1664
1665        if (libtrace->perpkt_threads) {
1666                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1667                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1668                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1669                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1670                        } else break;
1671                }
1672                free(libtrace->perpkt_threads);
1673                libtrace->perpkt_threads = NULL;
1674        }
1675
1676        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1677                pthread_join(libtrace->reporter_thread.tid, NULL);
1678                libtrace_zero_thread(&libtrace->reporter_thread);
1679        }
1680
1681        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1682                pthread_join(libtrace->keepalive_thread.tid, NULL);
1683                libtrace_zero_thread(&libtrace->keepalive_thread);
1684        }
1685        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1686        libtrace_change_state(libtrace, STATE_NEW, false);
1687        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1688        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1689cleanup_started:
1690        if (trace_supports_parallel(libtrace) &&
1691            !trace_has_dedicated_hasher(libtrace)
1692            && libtrace->perpkt_thread_count > 1) {
1693                if (libtrace->format->ppause_input)
1694                        libtrace->format->ppause_input(libtrace);
1695        } else {
1696                if (libtrace->format->pause_input)
1697                        libtrace->format->pause_input(libtrace);
1698        }
1699        ret = -1;
1700success:
1701        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1702cleanup_none:
1703        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1704        return ret;
1705}
1706
1707/*
1708 * Pauses a trace, this should only be called by the main thread
1709 * 1. Set started = false
1710 * 2. All perpkt threads are paused waiting on a condition var
1711 * 3. Then call ppause on the underlying format if found
1712 * 4. The traces state is paused
1713 *
1714 * Once done you should be able to modify the trace setup and call pstart again
1715 * TODO handle changing thread numbers
1716 */
1717DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1718{
1719        libtrace_thread_t *t;
1720        int i;
1721        assert(libtrace);
1722
1723        t = get_thread_table(libtrace);
1724        // Check state from within the lock if we are going to change it
1725        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1726        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1727                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1728                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1729                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1730                return -1;
1731        }
1732
1733        libtrace_change_state(libtrace, STATE_PAUSING, false);
1734        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1735
1736        // Special case handle the hasher thread case
1737        if (trace_has_dedicated_hasher(libtrace)) {
1738                if (libtrace->config.debug_state)
1739                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1740                libtrace_message_t message = {0};
1741                message.code = MESSAGE_DO_PAUSE;
1742                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1743                // Wait for it to pause
1744                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1745                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1746                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1747                }
1748                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1749                if (libtrace->config.debug_state)
1750                        fprintf(stderr, " DONE\n");
1751        }
1752
1753        if (libtrace->config.debug_state)
1754                fprintf(stderr, "Asking perpkt threads to pause ...");
1755        // Stop threads, skip this one if it's a perpkt
1756        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1757                if (&libtrace->perpkt_threads[i] != t) {
1758                        libtrace_message_t message = {0};
1759                        message.code = MESSAGE_DO_PAUSE;
1760                        ASSERT_RET(trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message), != -1);
1761                        if(trace_has_dedicated_hasher(libtrace)) {
1762                                // The hasher has stopped and other threads have messages waiting therefore
1763                                // If the queues are empty the other threads would have no data
1764                                // So send some message packets to simply ask the threads to check
1765                                // We are the only writer since hasher has paused
1766                                libtrace_packet_t *pkt;
1767                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1768                                pkt->error = READ_MESSAGE;
1769                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1770                        }
1771                } else {
1772                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1773                }
1774        }
1775
1776        if (t) {
1777                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1778                // We rely on the user to *not* return before starting the trace again
1779                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1780        }
1781
1782        // Wait for all threads to pause
1783        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1784        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1785                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1786        }
1787        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1788
1789        if (libtrace->config.debug_state)
1790                fprintf(stderr, " DONE\n");
1791
1792        // Deal with the reporter
1793        if (trace_has_reporter(libtrace)) {
1794                if (libtrace->config.debug_state)
1795                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1796                libtrace_message_t message = {0};
1797                message.code = MESSAGE_DO_PAUSE;
1798                trace_message_thread(libtrace, &libtrace->reporter_thread, &message);
1799                // Wait for it to pause
1800                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1801                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1802                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1803                }
1804                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1805                if (libtrace->config.debug_state)
1806                        fprintf(stderr, " DONE\n");
1807        }
1808
1809        /* Cache values before we pause */
1810        if (libtrace->stats == NULL)
1811                libtrace->stats = trace_create_statistics();
1812        // Save the statistics against the trace
1813        trace_get_statistics(libtrace, NULL);
1814        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
1815                libtrace->started = false;
1816                if (libtrace->format->ppause_input)
1817                        libtrace->format->ppause_input(libtrace);
1818                // TODO What happens if we don't have pause input??
1819        } else {
1820                int err;
1821                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1822                err = trace_pause(libtrace);
1823                // We should handle this a bit better
1824                if (err)
1825                        return err;
1826        }
1827
1828        // Only set as paused after the pause has been called on the trace
1829        libtrace_change_state(libtrace, STATE_PAUSED, true);
1830        return 0;
1831}
1832
1833/**
1834 * Stop trace finish prematurely as though it meet an EOF
1835 * This should only be called by the main thread
1836 * 1. Calls ppause
1837 * 2. Sends a message asking for threads to finish
1838 * 3. Releases threads which will pause
1839 */
1840DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1841{
1842        int i, err;
1843        libtrace_message_t message = {0};
1844        assert(libtrace);
1845
1846        // Ensure all threads have paused and the underlying trace format has
1847        // been closed and all packets associated are cleaned up
1848        // Pause will do any state checks for us
1849        err = trace_ppause(libtrace);
1850        if (err)
1851                return err;
1852
1853        // Now send a message asking the threads to stop
1854        // This will be retrieved before trying to read another packet
1855
1856        message.code = MESSAGE_DO_STOP;
1857        trace_message_perpkts(libtrace, &message);
1858        if (trace_has_dedicated_hasher(libtrace))
1859                trace_message_thread(libtrace, &libtrace->hasher_thread, &message);
1860
1861        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1862                trace_message_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1863        }
1864
1865        // Now release the threads and let them stop
1866        libtrace_change_state(libtrace, STATE_FINSHED, true);
1867        return 0;
1868}
1869
1870DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1871        int ret = -1;
1872        if ((type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1873                return -1;
1874        }
1875
1876        // Save the requirements
1877        trace->hasher_type = type;
1878        if (hasher) {
1879                trace->hasher = hasher;
1880                trace->hasher_data = data;
1881        } else {
1882                trace->hasher = NULL;
1883                trace->hasher_data = NULL;
1884        }
1885
1886        // Try push this to hardware - NOTE hardware could do custom if
1887        // there is a more efficient way to apply it, in this case
1888        // it will simply grab the function out of libtrace_t
1889        if (trace->format->pconfig_input)
1890                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1891
1892        if (ret == -1) {
1893                /* We have to deal with this ourself */
1894                if (!hasher) {
1895                        switch (type)
1896                        {
1897                                case HASHER_CUSTOM:
1898                                case HASHER_BALANCE:
1899                                        return 0;
1900                                case HASHER_BIDIRECTIONAL:
1901                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1902                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1903                                        toeplitz_init_config(trace->hasher_data, 1);
1904                                        return 0;
1905                                case HASHER_UNIDIRECTIONAL:
1906                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
1907                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
1908                                        toeplitz_init_config(trace->hasher_data, 0);
1909                                        return 0;
1910                        }
1911                        return -1;
1912                }
1913        } else {
1914                /* If the hasher is hardware we zero out the hasher and hasher
1915                 * data fields - only if we need a hasher do we do this */
1916                trace->hasher = NULL;
1917                trace->hasher_data = NULL;
1918        }
1919
1920        return 0;
1921}
1922
1923// Waits for all threads to finish
1924DLLEXPORT void trace_join(libtrace_t *libtrace) {
1925        int i;
1926
1927        /* Firstly wait for the perpkt threads to finish, since these are
1928         * user controlled */
1929        for (i=0; i< libtrace->perpkt_thread_count; i++) {
1930                //printf("Waiting to join with perpkt #%d\n", i);
1931                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
1932                //printf("Joined with perpkt #%d\n", i);
1933                // So we must do our best effort to empty the queue - so
1934                // the producer (or any other threads) don't block.
1935                libtrace_packet_t * packet;
1936                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
1937                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1938                        if (packet) // This could be NULL iff the perpkt finishes early
1939                                trace_destroy_packet(packet);
1940        }
1941
1942        /* Now the hasher */
1943        if (trace_has_dedicated_hasher(libtrace)) {
1944                pthread_join(libtrace->hasher_thread.tid, NULL);
1945                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
1946        }
1947
1948        // Now that everything is finished nothing can be touching our
1949        // buffers so clean them up
1950        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1951                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
1952                // if they lost timeslice before-during a write
1953                libtrace_packet_t * packet;
1954                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
1955                        trace_destroy_packet(packet);
1956                if (libtrace->hasher) {
1957                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
1958                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
1959                }
1960                // Cannot destroy vector yet, this happens with trace_destroy
1961        }
1962
1963        if (trace_has_reporter(libtrace)) {
1964                pthread_join(libtrace->reporter_thread.tid, NULL);
1965                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
1966        }
1967
1968        // Wait for the tick (keepalive) thread if it has been started
1969        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1970                libtrace_message_t msg = {0};
1971                msg.code = MESSAGE_DO_STOP;
1972                trace_message_thread(libtrace, &libtrace->keepalive_thread, &msg);
1973                pthread_join(libtrace->keepalive_thread.tid, NULL);
1974        }
1975
1976        libtrace_change_state(libtrace, STATE_JOINED, true);
1977        print_memory_stats();
1978}
1979
1980DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace,
1981                                                libtrace_thread_t *t)
1982{
1983        int ret;
1984        if (t == NULL)
1985                t = get_thread_descriptor(libtrace);
1986        if (t == NULL)
1987                return -1;
1988        ret = libtrace_message_queue_count(&t->messages);
1989        return ret < 0 ? 0 : ret;
1990}
1991
1992DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace,
1993                                          libtrace_thread_t *t,
1994                                          libtrace_message_t * message)
1995{
1996        int ret;
1997        if (t == NULL)
1998                t = get_thread_descriptor(libtrace);
1999        if (t == NULL)
2000                return -1;
2001        ret = libtrace_message_queue_get(&t->messages, message);
2002        return ret < 0 ? 0 : ret;
2003}
2004
2005DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace,
2006                                              libtrace_thread_t *t,
2007                                              libtrace_message_t * message)
2008{
2009        if (t == NULL)
2010                t = get_thread_descriptor(libtrace);
2011        if (t == NULL)
2012                return -1;
2013        if (libtrace_message_queue_try_get(&t->messages, message) != LIBTRACE_MQ_FAILED)
2014                return 0;
2015        else
2016                return -1;
2017}
2018
2019DLLEXPORT int trace_message_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2020{
2021        int ret;
2022        if (!message->sender)
2023                message->sender = get_thread_descriptor(libtrace);
2024
2025        ret = libtrace_message_queue_put(&t->messages, message);
2026        return ret < 0 ? 0 : ret;
2027}
2028
2029DLLEXPORT int trace_message_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2030{
2031        if (!trace_has_reporter(libtrace) ||
2032            !(libtrace->reporter_thread.state == THREAD_RUNNING
2033              || libtrace->reporter_thread.state == THREAD_PAUSED))
2034                return -1;
2035
2036        return trace_message_thread(libtrace, &libtrace->reporter_thread, message);
2037}
2038
2039DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2040{
2041        libtrace_message_t message = {0};
2042        message.code = MESSAGE_POST_REPORTER;
2043        return trace_message_reporter(libtrace, (void *) &message);
2044}
2045
2046DLLEXPORT int trace_message_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2047{
2048        int i;
2049        int missed = 0;
2050        if (message->sender == NULL)
2051                message->sender = get_thread_descriptor(libtrace);
2052        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2053                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING ||
2054                    libtrace->perpkt_threads[i].state == THREAD_PAUSED) {
2055                        libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2056                } else {
2057                        missed += 1;
2058                }
2059        }
2060        return -missed;
2061}
2062
2063DLLEXPORT void * trace_get_local(libtrace_t *trace)
2064{
2065        return trace->global_blob;
2066}
2067
2068DLLEXPORT void * trace_set_local(libtrace_t *trace, void * data)
2069{
2070        void *ret;
2071        pthread_mutex_lock(&trace->libtrace_lock);
2072        ret = trace->global_blob;
2073        trace->global_blob = data;
2074        pthread_mutex_unlock(&trace->libtrace_lock);
2075        return ret;
2076}
2077
2078DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2079{
2080        return t->user_data;
2081}
2082
2083DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2084{
2085        void *ret = t->user_data;
2086        t->user_data = data;
2087        return ret;
2088}
2089
2090/**
2091 * Publishes a result to the reduce queue
2092 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2093 */
2094DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_t value, int type) {
2095        libtrace_result_t res;
2096        res.type = type;
2097        res.key = key;
2098        res.value = value;
2099        assert(libtrace->combiner.publish);
2100        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2101        return;
2102}
2103
2104DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_t config){
2105        if (combiner) {
2106                trace->combiner = *combiner;
2107                trace->combiner.configuration = config;
2108        } else {
2109                // No combiner, so don't try use it
2110                memset(&trace->combiner, 0, sizeof(trace->combiner));
2111        }
2112}
2113
2114DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2115        return packet->order;
2116}
2117
2118DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2119        return packet->hash;
2120}
2121
2122DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2123        packet->order = order;
2124}
2125
2126DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2127        packet->hash = hash;
2128}
2129
2130DLLEXPORT bool trace_has_finished(libtrace_t * libtrace) {
2131        return libtrace->state == STATE_FINSHED || libtrace->state == STATE_JOINED;
2132}
2133
2134DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2135{
2136        UNUSED int ret = -1;
2137        switch (option) {
2138                case TRACE_OPTION_TICK_INTERVAL:
2139                        libtrace->config.tick_interval = *((int *) value);
2140                        return 1;
2141                case TRACE_OPTION_SET_HASHER:
2142                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2143                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2144                        libtrace->config.perpkt_threads = *((int *) value);
2145                        return 1;
2146                case TRACE_OPTION_TRACETIME:
2147                        if(*((int *) value))
2148                                libtrace->tracetime = 1;
2149                        else
2150                                libtrace->tracetime = 0;
2151                        return 0;
2152                case TRACE_OPTION_SET_CONFIG:
2153                        libtrace->config = *((struct user_configuration *) value);
2154                case TRACE_OPTION_GET_CONFIG:
2155                        *((struct user_configuration *) value) = libtrace->config;
2156        }
2157        return 0;
2158}
2159
2160static bool config_bool_parse(char *value, size_t nvalue) {
2161        if (strncmp(value, "true", nvalue) == 0)
2162                return true;
2163        else if (strncmp(value, "false", nvalue) == 0)
2164                return false;
2165        else
2166                return strtoll(value, NULL, 10) != 0;
2167}
2168
2169static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2170        assert(key);
2171        assert(value);
2172        assert(uc);
2173        if (strncmp(key, "packet_cache_size", nkey) == 0
2174            || strncmp(key, "pcs", nkey) == 0) {
2175                uc->packet_cache_size = strtoll(value, NULL, 10);
2176        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2177                   || strncmp(key, "ptcs", nkey) == 0) {
2178                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2179        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2180                   || strncmp(key, "fpc", nkey) == 0) {
2181                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2182        } else if (strncmp(key, "burst_size", nkey) == 0
2183                   || strncmp(key, "bs", nkey) == 0) {
2184                uc->burst_size = strtoll(value, NULL, 10);
2185        } else if (strncmp(key, "tick_interval", nkey) == 0
2186                   || strncmp(key, "ti", nkey) == 0) {
2187                uc->tick_interval = strtoll(value, NULL, 10);
2188        } else if (strncmp(key, "tick_count", nkey) == 0
2189                   || strncmp(key, "tc", nkey) == 0) {
2190                uc->tick_count = strtoll(value, NULL, 10);
2191        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2192                   || strncmp(key, "pt", nkey) == 0) {
2193                uc->perpkt_threads = strtoll(value, NULL, 10);
2194        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2195                   || strncmp(key, "hqs", nkey) == 0) {
2196                uc->hasher_queue_size = strtoll(value, NULL, 10);
2197        } else if (strncmp(key, "hasher_polling", nkey) == 0
2198                   || strncmp(key, "hp", nkey) == 0) {
2199                uc->hasher_polling = config_bool_parse(value, nvalue);
2200        } else if (strncmp(key, "reporter_polling", nkey) == 0
2201                   || strncmp(key, "rp", nkey) == 0) {
2202                uc->reporter_polling = config_bool_parse(value, nvalue);
2203        } else if (strncmp(key, "reporter_thold", nkey) == 0
2204                   || strncmp(key, "rt", nkey) == 0) {
2205                uc->reporter_thold = strtoll(value, NULL, 10);
2206        } else if (strncmp(key, "debug_state", nkey) == 0
2207                   || strncmp(key, "ds", nkey) == 0) {
2208                uc->debug_state = config_bool_parse(value, nvalue);
2209        } else {
2210                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2211        }
2212}
2213
2214DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2215        char *pch;
2216        char key[100];
2217        char value[100];
2218        assert(str);
2219        assert(uc);
2220        pch = strtok (str," ,.-");
2221        while (pch != NULL)
2222        {
2223                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2224                        config_string(uc, key, sizeof(key), value, sizeof(value));
2225                } else {
2226                        fprintf(stderr, "Error parsing %s\n", pch);
2227                }
2228                pch = strtok (NULL," ,.-");
2229        }
2230}
2231
2232DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2233        char line[1024];
2234        while (fgets(line, sizeof(line), file) != NULL)
2235        {
2236                parse_user_config(uc, line);
2237        }
2238}
2239
2240DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2241        assert(packet);
2242        /* Always release any resources this might be holding */
2243        trace_fin_packet(packet);
2244        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2245}
2246
2247DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2248        if (libtrace->format)
2249                return &libtrace->format->info;
2250        else
2251                return NULL;
2252}
Note: See TracBrowser for help on using the repository browser.