source: lib/trace_parallel.c @ 5ab626a

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5ab626a was 5ab626a, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Deprecate trace_get_filtered/accepted/recevied/dropped() in favour of a single function

Adds the single trace_get_statistics function. This allows the structure to be filled
at a point in time, rather than making multiple calls to the library during which state
might have changed.

This has been designed such that the structure can be added to in the future without
breaking old code.

The old internal get_captured_packets was removed from the formats as it was never used.
Eventually we should completely remove get_filtered and received from the formats and replace
them with get_statistics.

In additon some extra fields have added, such as error and captured and the pre-existing
fields are better defined.

The linux formats have been updated to use this new API, which combined with reading
/proc/net/dev returns a full set of statistics.

  • Property mode set to 100644
File size: 79.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * All rights reserved.
8 *
9 * This code has been developed by the University of Waikato WAND
10 * research group. For further information please see http://www.wand.net.nz/
11 *
12 * libtrace is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * libtrace is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with libtrace; if not, write to the Free Software
24 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 *
26 * $Id$
27 *
28 */
29
30
31#define _GNU_SOURCE
32#include "common.h"
33#include "config.h"
34#include <assert.h>
35#include <errno.h>
36#include <fcntl.h>
37#include <stdio.h>
38#include <stdlib.h>
39#include <string.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#ifndef WIN32
43#include <sys/socket.h>
44#endif
45#include <stdarg.h>
46#include <sys/param.h>
47
48#ifdef HAVE_LIMITS_H
49#  include <limits.h>
50#endif
51
52#ifdef HAVE_SYS_LIMITS_H
53#  include <sys/limits.h>
54#endif
55
56#ifdef HAVE_NET_IF_ARP_H
57#  include <net/if_arp.h>
58#endif
59
60#ifdef HAVE_NET_IF_H
61#  include <net/if.h>
62#endif
63
64#ifdef HAVE_NETINET_IN_H
65#  include <netinet/in.h>
66#endif
67
68#ifdef HAVE_NET_ETHERNET_H
69#  include <net/ethernet.h>
70#endif
71
72#ifdef HAVE_NETINET_IF_ETHER_H
73#  include <netinet/if_ether.h>
74#endif
75
76#include <time.h>
77#ifdef WIN32
78#include <sys/timeb.h>
79#endif
80
81#include "libtrace.h"
82#include "libtrace_int.h"
83
84#ifdef HAVE_PCAP_BPF_H
85#  include <pcap-bpf.h>
86#else
87#  ifdef HAVE_NET_BPF_H
88#    include <net/bpf.h>
89#  endif
90#endif
91
92
93#include "libtrace_int.h"
94#include "format_helper.h"
95#include "rt_protocol.h"
96#include "hash_toeplitz.h"
97#include "combiners.h"
98
99#include <pthread.h>
100#include <signal.h>
101#include <unistd.h>
102
103static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
104extern int libtrace_parallel;
105
106struct multithreading_stats {
107        uint64_t full_queue_hits;
108        uint64_t wait_for_fill_complete_hits;
109} contention_stats[1024];
110
111struct mem_stats {
112        struct memfail {
113           uint64_t cache_hit;
114           uint64_t ring_hit;
115           uint64_t miss;
116           uint64_t recycled;
117        } readbulk, read, write, writebulk;
118};
119
120// Grrr gcc wants this spelt out
121__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
122
123static void print_memory_stats() {
124#if 0
125        char t_name[50];
126        uint64_t total;
127        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
128
129        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
130
131        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
132        if (total) {
133                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
134                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
135                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
136                                total, (double) mem_hits.read.miss / (double) total * 100.0);
137        }
138
139        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
140        if (total) {
141                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
142                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
143
144
145                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
146                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
147        }
148
149        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
150        if (total) {
151                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
152                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
153
154                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
155                                total, (double) mem_hits.write.miss / (double) total * 100.0);
156        }
157
158        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
159        if (total) {
160                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
161                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
162
163                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
164                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
165        }
166#endif
167}
168
169/**
170 * True if the trace has dedicated hasher thread otherwise false.
171 * This can be used once the hasher thread has been started.
172 */
173static inline int trace_has_dedicated_hasher(libtrace_t * libtrace)
174{
175        return libtrace->hasher_thread.type == THREAD_HASHER;
176}
177
178/**
179 * True if the trace has dedicated hasher thread otherwise false,
180 * to be used after the trace is running
181 */
182static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
183{
184        assert(libtrace->state != STATE_NEW);
185        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
186}
187
188/**
189 * When running the number of perpkt threads in use.
190 * TODO what if the trace is not running yet, or has finished??
191 *
192 * @brief libtrace_perpkt_thread_nb
193 * @param t The trace
194 * @return
195 */
196DLLEXPORT int libtrace_get_perpkt_count(libtrace_t * t) {
197        return t->perpkt_thread_count;
198}
199
200/**
201 * Changes a thread's state and broadcasts the condition variable. This
202 * should always be done when the lock is held.
203 *
204 * Additionally for perpkt threads the state counts are updated.
205 *
206 * @param trace A pointer to the trace
207 * @param t A pointer to the thread to modify
208 * @param new_state The new state of the thread
209 * @param need_lock Set to true if libtrace_lock is not held, otherwise
210 *        false in the case the lock is currently held by this thread.
211 */
212static inline void thread_change_state(libtrace_t *trace, libtrace_thread_t *t,
213        const enum thread_states new_state, const bool need_lock)
214{
215        enum thread_states prev_state;
216        if (need_lock)
217                pthread_mutex_lock(&trace->libtrace_lock);
218        prev_state = t->state;
219        t->state = new_state;
220        if (t->type == THREAD_PERPKT) {
221                --trace->perpkt_thread_states[prev_state];
222                ++trace->perpkt_thread_states[new_state];
223        }
224
225        if (trace->config.debug_state)
226                fprintf(stderr, "Thread %d state changed from %d to %d\n",
227                        (int) t->tid, prev_state, t->state);
228
229        pthread_cond_broadcast(&trace->perpkt_cond);
230        if (need_lock)
231                pthread_mutex_unlock(&trace->libtrace_lock);
232}
233
234/**
235 * Changes the overall traces state and signals the condition.
236 *
237 * @param trace A pointer to the trace
238 * @param new_state The new state of the trace
239 * @param need_lock Set to true if libtrace_lock is not held, otherwise
240 *        false in the case the lock is currently held by this thread.
241 */
242static inline void libtrace_change_state(libtrace_t *trace,
243        const enum trace_state new_state, const bool need_lock)
244{
245        UNUSED enum trace_state prev_state;
246        if (need_lock)
247                pthread_mutex_lock(&trace->libtrace_lock);
248        prev_state = trace->state;
249        trace->state = new_state;
250
251        if (trace->config.debug_state)
252                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
253                        trace->uridata, get_trace_state_name(prev_state),
254                        get_trace_state_name(trace->state));
255
256        pthread_cond_broadcast(&trace->perpkt_cond);
257        if (need_lock)
258                pthread_mutex_unlock(&trace->libtrace_lock);
259}
260
261/**
262 * @return True if the format supports parallel threads.
263 */
264static inline bool trace_supports_parallel(libtrace_t *trace)
265{
266        assert(trace);
267        assert(trace->format);
268        if (trace->format->pstart_input)
269                return true;
270        else
271                return false;
272}
273
274DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
275        int i;
276        struct multithreading_stats totals = {0};
277        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
278                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
279                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
280                totals.full_queue_hits += contention_stats[i].full_queue_hits;
281                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
282                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
283        }
284        fprintf(stderr, "\nTotals for perpkt threads\n");
285        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
286        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
287
288        return;
289}
290
291void libtrace_zero_thread(libtrace_thread_t * t) {
292        t->accepted_packets = 0;
293        t->filtered_packets = 0;
294        t->recorded_first = false;
295        t->tracetime_offset_usec = 0;
296        t->user_data = 0;
297        t->format_data = 0;
298        libtrace_zero_ringbuffer(&t->rbuffer);
299        t->trace = NULL;
300        t->ret = NULL;
301        t->type = THREAD_EMPTY;
302        t->perpkt_num = -1;
303}
304
305// Ints are aligned int is atomic so safe to read and write at same time
306// However write must be locked, read doesn't (We never try read before written to table)
307libtrace_thread_t * get_thread_table(libtrace_t *libtrace) {
308        int i = 0;
309        pthread_t tid = pthread_self();
310
311        for (;i<libtrace->perpkt_thread_count ;++i) {
312                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
313                        return &libtrace->perpkt_threads[i];
314        }
315        return NULL;
316}
317
318int get_thread_table_num(libtrace_t *libtrace) {
319        int i = 0;
320        pthread_t tid = pthread_self();
321        for (;i<libtrace->perpkt_thread_count; ++i) {
322                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
323                        return i;
324        }
325        return -1;
326}
327
328static libtrace_thread_t * get_thread_descriptor(libtrace_t *libtrace) {
329        libtrace_thread_t *ret;
330        if (!(ret = get_thread_table(libtrace))) {
331                pthread_t tid = pthread_self();
332                // Check if we are reporter or something else
333                if (pthread_equal(tid, libtrace->reporter_thread.tid))
334                        ret = &libtrace->reporter_thread;
335                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
336                        ret = &libtrace->hasher_thread;
337                else
338                        ret = NULL;
339        }
340        return ret;
341}
342
343/** Makes a packet safe, a packet may become invaild after a
344 * pause (or stop/destroy) of a trace. This copies a packet
345 * in such a way that it will be able to survive a pause.
346 *
347 * However this will not allow the packet to be used after
348 * the format is destroyed. Or while the trace is still paused.
349 */
350DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
351        // Duplicate the packet in standard malloc'd memory and free the
352        // original, This is a 1:1 exchange so is ocache count remains unchanged.
353        if (pkt->buf_control != TRACE_CTRL_PACKET) {
354                libtrace_packet_t *dup;
355                dup = trace_copy_packet(pkt);
356                /* Release the external buffer */
357                trace_fin_packet(pkt);
358                /* Copy the duplicated packet over the existing */
359                memcpy(pkt, dup, sizeof(libtrace_packet_t));
360        }
361}
362
363/**
364 * Makes a libtrace_result_t safe, used when pausing a trace.
365 * This will call libtrace_make_packet_safe if the result is
366 * a packet.
367 */
368DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res) {
369        if (res->type == RESULT_PACKET) {
370                libtrace_make_packet_safe(res->value.pkt);
371        }
372}
373
374/**
375 * Holds threads in a paused state, until released by broadcasting
376 * the condition mutex.
377 */
378static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
379        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
380        thread_change_state(trace, t, THREAD_PAUSED, false);
381        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
382                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
383        }
384        thread_change_state(trace, t, THREAD_RUNNING, false);
385        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
386}
387
388/**
389 * Sends a packet to the user, expects either a valid packet or a TICK packet.
390 *
391 * Note READ_MESSAGE will only be returned if tracetime is true.
392 *
393 * @brief dispatch_packet
394 * @param trace
395 * @param t
396 * @param packet A pointer to the packet storage, which may be set to null upon
397 *               return, or a packet to be finished.
398 * @return 0 is successful, otherwise if playing back in tracetime
399 *         READ_MESSAGE(-2) can be returned in which case the packet is not sent.
400 */
401static inline int dispatch_packet(libtrace_t *trace,
402                                                 libtrace_thread_t *t,
403                                                 libtrace_packet_t **packet,
404                                                 bool tracetime) {
405        if ((*packet)->error > 0) {
406                if (tracetime) {
407                        if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE)
408                                return READ_MESSAGE;
409                }
410                *packet = (*trace->per_pkt)(trace, *packet, NULL, t);
411                trace_fin_packet(*packet);
412        } else {
413                libtrace_message_t message;
414                assert((*packet)->error == READ_TICK);
415                message.code = MESSAGE_TICK;
416                message.additional.uint64 = trace_packet_get_order(*packet);
417                message.sender = t;
418                (*trace->per_pkt)(trace, NULL, &message, t);
419        }
420        return 0;
421}
422
423/**
424 * Pauses a per packet thread, messages will not be processed when the thread
425 * is paused.
426 *
427 * This process involves reading packets if a hasher thread is used. As such
428 * this function can fail to pause due to errors when reading in which case
429 * the thread should be stopped instead.
430 *
431 *
432 * @brief trace_perpkt_thread_pause
433 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull
434 */
435static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t,
436                                     libtrace_packet_t *packets[],
437                                     int *nb_packets, int *empty, int *offset) {
438        libtrace_message_t message = {0};
439        libtrace_packet_t * packet = NULL;
440
441        /* Let the user thread know we are going to pause */
442        message.code = MESSAGE_PAUSING;
443        message.sender = t;
444        (*trace->per_pkt)(trace, NULL, &message, t);
445
446        /* Send through any remaining packets (or messages) without delay */
447
448        /* First send those packets already read, as fast as possible
449         * This should never fail or check for messages etc. */
450        for (;*offset < *nb_packets; ++*offset) {
451                ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0);
452                /* Move full slots to front as we go */
453                if (packets[*offset]) {
454                        if (*empty != *offset) {
455                                packets[*empty] = packets[*offset];
456                                packets[*offset] = NULL;
457                        }
458                        ++*empty;
459                }
460        }
461
462        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
463        /* If a hasher thread is running, empty input queues so we don't lose data */
464        if (trace_has_dedicated_hasher(trace)) {
465                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
466                // The hasher has stopped by this point, so the queue shouldn't be filling
467                while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) {
468                        int ret = trace->pread(trace, t, &packet, 1);
469                        if (ret == 1) {
470                                if (packet->error > 0) {
471                                        store_first_packet(trace, packet, t);
472                                }
473                                ASSERT_RET(dispatch_packet(trace, t, &packet, 1), == 0);
474                                if (packet == NULL)
475                                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
476                        } else if (ret != READ_MESSAGE) {
477                                /* Ignore messages we pick these up next loop */
478                                assert (ret == READ_EOF || ret == READ_ERROR);
479                                /* Verify no packets are remaining */
480                                /* TODO refactor this sanity check out!! */
481                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
482                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
483                                        // No packets after this should have any data in them
484                                        assert(packet->error <= 0);
485                                }
486                                fprintf(stderr, "PREAD_FAILED %d\n", ret);
487                                libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
488                                return -1;
489                        }
490                }
491        }
492        libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
493
494        /* Now we do the actual pause, this returns when we resumed */
495        trace_thread_pause(trace, t);
496        message.code = MESSAGE_RESUMING;
497        (*trace->per_pkt)(trace, NULL, &message, t);
498        return 1;
499}
500
501/**
502 * The is the entry point for our packet processing threads.
503 */
504static void* perpkt_threads_entry(void *data) {
505        libtrace_t *trace = (libtrace_t *)data;
506        libtrace_thread_t *t;
507        libtrace_message_t message = {0};
508        libtrace_packet_t *packets[trace->config.burst_size];
509        size_t i;
510        //int ret;
511        /* The current reading position into the packets */
512        int offset = 0;
513        /* The number of packets last read */
514        int nb_packets = 0;
515        /* The offset to the first NULL packet upto offset */
516        int empty = 0;
517
518        /* Wait until trace_pstart has been completed */
519        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
520        t = get_thread_table(trace);
521        assert(t);
522        if (trace->state == STATE_ERROR) {
523                thread_change_state(trace, t, THREAD_FINISHED, false);
524                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
525                pthread_exit(NULL);
526        }
527        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
528        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
529
530        if (trace->format->pregister_thread) {
531                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
532        }
533
534        /* Fill our buffer with empty packets */
535        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
536        libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets,
537                              trace->config.burst_size,
538                              trace->config.burst_size);
539
540        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
541        // Send a message to say we've started
542
543        // Let the per_packet function know we have started
544        message.code = MESSAGE_STARTING;
545        message.sender = t;
546        (*trace->per_pkt)(trace, NULL, &message, t);
547        message.code = MESSAGE_RESUMING;
548        (*trace->per_pkt)(trace, NULL, &message, t);
549
550
551        for (;;) {
552
553                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
554                        int ret;
555                        switch (message.code) {
556                                case MESSAGE_DO_PAUSE: // This is internal
557                                        ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset);
558                                        if (ret == READ_EOF) {
559                                                fprintf(stderr, "PAUSE stop eof!!\n");
560                                                goto eof;
561                                        } else if (ret == READ_ERROR) {
562                                                fprintf(stderr, "PAUSE stop error!!\n");
563                                                goto error;
564                                        }
565                                        assert(ret == 1);
566                                        continue;
567                                case MESSAGE_DO_STOP: // This is internal
568                                        fprintf(stderr, "DO_STOP stop!!\n");
569                                        goto eof;
570                        }
571                        (*trace->per_pkt)(trace, NULL, &message, t);
572                        /* Continue and the empty messages out before packets */
573                        continue;
574                }
575
576
577                /* Do we need to read a new set of packets MOST LIKELY we do */
578                if (offset == nb_packets) {
579                        /* Refill the packet buffer */
580                        if (empty != nb_packets) {
581                                // Refill the empty packets
582                                libtrace_ocache_alloc(&trace->packet_freelist,
583                                                      (void **) &packets[empty],
584                                                      nb_packets - empty,
585                                                      nb_packets - empty);
586                        }
587                        if (!trace->pread) {
588                                assert(packets[0]);
589                                nb_packets = trace_read_packet(trace, packets[0]);
590                                packets[0]->error = nb_packets;
591                                if (nb_packets > 0)
592                                        nb_packets = 1;
593                        } else {
594                                nb_packets = trace->pread(trace, t, packets, trace->config.burst_size);
595                        }
596                        offset = 0;
597                        empty = 0;
598                }
599
600                /* Handle error/message cases */
601                if (nb_packets > 0) {
602                        /* Store the first packet */
603                        if (packets[0]->error > 0) {
604                                store_first_packet(trace, packets[0], t);
605                        }
606                        for (;offset < nb_packets; ++offset) {
607                                int ret;
608                                ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime);
609                                if (ret == 0) {
610                                        /* Move full slots to front as we go */
611                                        if (packets[offset]) {
612                                                if (empty != offset) {
613                                                        packets[empty] = packets[offset];
614                                                        packets[offset] = NULL;
615                                                }
616                                                ++empty;
617                                        }
618                                } else {
619                                        assert(ret == READ_MESSAGE);
620                                        /* Loop around and process the message, note */
621                                        continue;
622                                }
623                        }
624                } else {
625                        switch (nb_packets) {
626                        case READ_EOF:
627                                fprintf(stderr, "EOF stop %d!!\n", nb_packets);
628                                goto eof;
629                        case READ_ERROR:
630                                fprintf(stderr, "ERROR stop %d!!\n", nb_packets);
631                                goto error;
632                        case READ_MESSAGE:
633                                nb_packets = 0;
634                                continue;
635                        default:
636                                fprintf(stderr, "Unexpected error %d!!\n", nb_packets);
637                                goto error;
638                        }
639                }
640
641        }
642
643error:
644        fprintf(stderr, "An error occured in trace\n");
645eof:
646        fprintf(stderr, "An eof occured in trace\n");
647        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
648
649        // Let the per_packet function know we have stopped
650        message.code = MESSAGE_PAUSING;
651        message.sender = t;
652        (*trace->per_pkt)(trace, NULL, &message, t);
653        message.code = MESSAGE_STOPPING;
654        message.additional.uint64 = 0;
655        (*trace->per_pkt)(trace, NULL, &message, t);
656
657        // Free any remaining packets
658        for (i = 0; i < trace->config.burst_size; i++) {
659                if (packets[i]) {
660                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
661                        packets[i] = NULL;
662                }
663        }
664
665
666        thread_change_state(trace, t, THREAD_FINISHED, true);
667
668        // Notify only after we've defiantly set the state to finished
669        message.code = MESSAGE_PERPKT_ENDED;
670        message.additional.uint64 = 0;
671        trace_send_message_to_reporter(trace, &message);
672
673        // Release all ocache memory before unregistering with the format
674        // because this might(it does in DPDK) unlink the formats mempool
675        // causing destroy/finish packet to fail.
676        libtrace_ocache_unregister_thread(&trace->packet_freelist);
677        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
678        if (trace->format->punregister_thread) {
679                trace->format->punregister_thread(trace, t);
680        }
681        print_memory_stats();
682
683        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
684
685        pthread_exit(NULL);
686};
687
688/**
689 * The start point for our single threaded hasher thread, this will read
690 * and hash a packet from a data source and queue it against the correct
691 * core to process it.
692 */
693static void* hasher_entry(void *data) {
694        libtrace_t *trace = (libtrace_t *)data;
695        libtrace_thread_t * t;
696        int i;
697        libtrace_packet_t * packet;
698        libtrace_message_t message = {0};
699        int pkt_skipped = 0;
700
701        assert(trace_has_dedicated_hasher(trace));
702        /* Wait until all threads are started and objects are initialised (ring buffers) */
703        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
704        t = &trace->hasher_thread;
705        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
706        if (trace->state == STATE_ERROR) {
707                thread_change_state(trace, t, THREAD_FINISHED, false);
708                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
709                pthread_exit(NULL);
710        }
711
712        printf("Hasher Thread started\n");
713        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
714
715        if (trace->format->pregister_thread) {
716                trace->format->pregister_thread(trace, t, true);
717        }
718
719        /* Read all packets in then hash and queue against the correct thread */
720        while (1) {
721                int thread;
722                if (!pkt_skipped)
723                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
724                assert(packet);
725
726                if (libtrace_halt) // Signal to die has been sent - TODO
727                        break;
728
729                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
730                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
731                        switch(message.code) {
732                                case MESSAGE_DO_PAUSE:
733                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
734                                        thread_change_state(trace, t, THREAD_PAUSED, false);
735                                        pthread_cond_broadcast(&trace->perpkt_cond);
736                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
737                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
738                                        }
739                                        thread_change_state(trace, t, THREAD_RUNNING, false);
740                                        pthread_cond_broadcast(&trace->perpkt_cond);
741                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
742                                        break;
743                                case MESSAGE_DO_STOP:
744                                        // Stop called after pause
745                                        assert(trace->started == false);
746                                        assert(trace->state == STATE_FINSHED);
747                                        break;
748                                default:
749                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
750                        }
751                        pkt_skipped = 1;
752                        continue;
753                }
754
755                if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
756                        break; /* We are EOF or error'd either way we stop  */
757                }
758
759                /* We are guaranteed to have a hash function i.e. != NULL */
760                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
761                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
762                /* Blocking write to the correct queue - I'm the only writer */
763                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
764                        uint64_t order = trace_packet_get_order(packet);
765                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
766                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
767                                // Write ticks to everyone else
768                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
769                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
770                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
771                                for (i = 0; i < trace->perpkt_thread_count; i++) {
772                                        pkts[i]->error = READ_TICK;
773                                        trace_packet_set_order(pkts[i], order);
774                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
775                                }
776                        }
777                        pkt_skipped = 0;
778                } else {
779                        assert(!"Dropping a packet!!");
780                        pkt_skipped = 1; // Reuse that packet no one read it
781                }
782        }
783
784        /* Broadcast our last failed read to all threads */
785        for (i = 0; i < trace->perpkt_thread_count; i++) {
786                libtrace_packet_t * bcast;
787                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
788                if (i == trace->perpkt_thread_count - 1) {
789                        bcast = packet;
790                } else {
791                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
792                        bcast->error = packet->error;
793                }
794                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
795                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
796                        // Unlock early otherwise we could deadlock
797                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
798                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
799                } else {
800                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
801                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
802                }
803        }
804
805        // We don't need to free the packet
806        thread_change_state(trace, t, THREAD_FINISHED, true);
807
808        // Notify only after we've defiantly set the state to finished
809        message.code = MESSAGE_PERPKT_ENDED;
810        message.additional.uint64 = 0;
811        trace_send_message_to_reporter(trace, &message);
812        libtrace_ocache_unregister_thread(&trace->packet_freelist);
813        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
814        if (trace->format->punregister_thread) {
815                trace->format->punregister_thread(trace, t);
816        }
817        print_memory_stats();
818        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
819
820        // TODO remove from TTABLE t sometime
821        pthread_exit(NULL);
822};
823
824/**
825 * Moves src into dest(Complete copy) and copies the memory buffer and
826 * its flags from dest into src ready for reuse without needing extra mallocs.
827 */
828static inline void swap_packets(libtrace_packet_t *dest, libtrace_packet_t *src) {
829        // Save the passed in buffer status
830        assert(dest->trace == NULL); // Must be a empty packet
831        void * temp_buf = dest->buffer;
832        buf_control_t temp_buf_control = dest->buf_control;
833        // Completely copy StoredPacket into packet
834        memcpy(dest, src, sizeof(libtrace_packet_t));
835        // Set the buffer settings on the returned packet
836        src->buffer = temp_buf;
837        src->buf_control = temp_buf_control;
838        src->trace = NULL;
839}
840
841/* Our simplest case when a thread becomes ready it can obtain an exclusive
842 * lock to read packets from the underlying trace.
843 */
844static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace,
845                                                    libtrace_thread_t *t,
846                                                    libtrace_packet_t *packets[],
847                                                    size_t nb_packets) {
848        size_t i = 0;
849        //bool tick_hit = false;
850
851        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
852        /* Read nb_packets */
853        for (i = 0; i < nb_packets; ++i) {
854                packets[i]->error = trace_read_packet(libtrace, packets[i]);
855
856                if (packets[i]->error <= 0) {
857                        /* We'll catch this next time if we have already got packets */
858                        if ( i==0 ) {
859                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
860                                return packets[i]->error;
861                        } else {
862                                break;
863                        }
864                }
865                /*
866                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
867                        tick_hit = true;
868                }*/
869        }
870        // Doing this inside the lock ensures the first packet is always
871        // recorded first
872        if (packets[0]->error > 0) {
873                store_first_packet(libtrace, packets[0], t);
874        }
875        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
876        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
877        if (tick_hit) {
878                libtrace_message_t tick;
879                tick.additional.uint64 = trace_packet_get_order(packets[i]);
880                tick.code = MESSAGE_TICK;
881                trace_send_message_to_perpkts(libtrace, &tick);
882        } */
883        return i;
884}
885
886/**
887 * For the case that we have a dedicated hasher thread
888 * 1. We read a packet from our buffer
889 * 2. Move that into the packet provided (packet)
890 */
891inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace,
892                                                      libtrace_thread_t *t,
893                                                      libtrace_packet_t *packets[],
894                                                      size_t nb_packets) {
895        size_t i;
896
897        /* We store the last error message here */
898        if (t->format_data) {
899                fprintf(stderr, "Hit me, ohh yeah got error %d\n",
900                        ((libtrace_packet_t *)t->format_data)->error);
901                return ((libtrace_packet_t *)t->format_data)->error;
902        }
903
904        // Always grab at least one
905        if (packets[0]) // Recycle the old get the new
906                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
907        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
908
909        if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) {
910                fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);
911                return packets[0]->error;
912        }
913
914        for (i = 1; i < nb_packets; i++) {
915                if (packets[i]) // Recycle the old get the new
916                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
917                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
918                        packets[i] = NULL;
919                        break;
920                }
921
922                /* We will return an error or EOF the next time around */
923                if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) {
924                        /* The message case will be checked automatically -
925                           However other cases like EOF and error will only be
926                           sent once*/
927                        if (packets[i]->error != READ_MESSAGE) {
928                                assert(t->format_data == NULL);
929                                t->format_data = packets[i];
930                                fprintf(stderr, "Hit me, ohh yeah set error %d\n",
931                                        ((libtrace_packet_t *)t->format_data)->error);
932                        }
933                        break;
934                }
935        }
936
937        return i;
938}
939
940/**
941 * Tries to read from our queue and returns 1 if a packet was retrieved
942 */
943static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
944{
945        libtrace_packet_t* retrived_packet;
946
947        /* Lets see if we have one waiting */
948        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
949                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
950                assert(retrived_packet);
951
952                if (*packet) // Recycle the old get the new
953                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
954                *packet = retrived_packet;
955                *ret = (*packet)->error;
956                return 1;
957        }
958        return 0;
959}
960
961/**
962 * Allows us to ensure all threads are finished writing to our threads ring_buffer
963 * before returning EOF/error.
964 */
965inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
966{
967        /* We are waiting for the condition that another thread ends to check
968         * our queue for new data, once all threads end we can go to finished */
969        bool complete = false;
970        int ret;
971
972        do {
973                // Wait for a thread to end
974                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
975
976                // Check before
977                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
978                        complete = true;
979                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
980                        continue;
981                }
982
983                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
984
985                // Check after
986                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
987                        complete = true;
988                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
989                        continue;
990                }
991
992                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
993
994                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
995                if(try_waiting_queue(libtrace, t, packet, &ret))
996                        return ret;
997        } while (!complete);
998
999        // We can only end up here once all threads complete
1000        try_waiting_queue(libtrace, t, packet, &ret);
1001
1002        return ret;
1003        // TODO rethink this logic fix bug here
1004}
1005
1006/**
1007 * Expects the libtrace_lock to not be held
1008 */
1009inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
1010{
1011        thread_change_state(libtrace, t, THREAD_FINISHING, true);
1012        return trace_handle_finishing_perpkt(libtrace, packet, t);
1013}
1014
1015/**
1016 * This case is much like the dedicated hasher, except that we will become
1017 * hasher if we don't have a a packet waiting.
1018 *
1019 * Note: This is only every used if we have are doing hashing.
1020 *
1021 * TODO: Can block on zero copy formats such as ring: and dpdk: if the
1022 * queue sizes in total are larger than the ring size.
1023 *
1024 * 1. We read a packet from our buffer
1025 * 2. Move that into the packet provided (packet)
1026 */
1027inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
1028{
1029        int thread, ret/*, psize*/;
1030
1031        while (1) {
1032                if(try_waiting_queue(libtrace, t, packet, &ret))
1033                        return ret;
1034                // Can still block here if another thread is writing to a full queue
1035                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1036
1037                // Its impossible for our own queue to overfill, because no one can write
1038                // when we are in the lock
1039                if(try_waiting_queue(libtrace, t, packet, &ret)) {
1040                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1041                        return ret;
1042                }
1043
1044                // Another thread cannot write a packet because a queue has filled up. Is it ours?
1045                if (libtrace->perpkt_queue_full) {
1046                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
1047                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1048                        continue;
1049                }
1050
1051                if (!*packet)
1052                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
1053                assert(*packet);
1054
1055                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
1056                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
1057                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1058                        if (libtrace_halt)
1059                                return 0;
1060                        else
1061                                return (*packet)->error;
1062                }
1063
1064                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
1065                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
1066                if (thread == t->perpkt_num) {
1067                        // If it's this thread we must be in order because we checked the buffer once we got the lock
1068                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1069                        return (*packet)->error;
1070                }
1071
1072                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
1073                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
1074                                libtrace->perpkt_queue_full = true;
1075                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1076                                contention_stats[t->perpkt_num].full_queue_hits++;
1077                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1078                        }
1079                        *packet = NULL;
1080                        libtrace->perpkt_queue_full = false;
1081                } else {
1082                        /* We can get here if the user closes the thread before natural completion/or error */
1083                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
1084                }
1085                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1086        }
1087}
1088
1089/**
1090 * For the first packet of each queue we keep a copy and note the system
1091 * time it was received at.
1092 *
1093 * This is used for finding the first packet when playing back a trace
1094 * in trace time. And can be used by real time applications to print
1095 * results out every XXX seconds.
1096 */
1097void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t)
1098{
1099        if (!t->recorded_first) {
1100                struct timeval tv;
1101                libtrace_packet_t * dup;
1102                // For what it's worth we can call these outside of the lock
1103                gettimeofday(&tv, NULL);
1104                dup = trace_copy_packet(packet);
1105                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1106                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
1107                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
1108                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
1109                // Now update the first
1110                libtrace->first_packets.count++;
1111                if (libtrace->first_packets.count == 1) {
1112                        // We the first entry hence also the first known packet
1113                        libtrace->first_packets.first = t->perpkt_num;
1114                } else {
1115                        // Check if we are newer than the previous 'first' packet
1116                        size_t first = libtrace->first_packets.first;
1117                        if (trace_get_seconds(dup) <
1118                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
1119                                libtrace->first_packets.first = t->perpkt_num;
1120                }
1121                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1122                libtrace_message_t mesg = {0};
1123                mesg.code = MESSAGE_FIRST_PACKET;
1124                trace_send_message_to_reporter(libtrace, &mesg);
1125                t->recorded_first = true;
1126        }
1127}
1128
1129/**
1130 * Returns 1 if it's certain that the first packet is truly the first packet
1131 * rather than a best guess based upon threads that have published so far.
1132 * Otherwise 0 is returned.
1133 * It's recommended that this result is stored rather than calling this
1134 * function again.
1135 */
1136DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv)
1137{
1138        int ret = 0;
1139        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
1140        if (libtrace->first_packets.count) {
1141                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
1142                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
1143                if (libtrace->first_packets.count == (size_t) libtrace->perpkt_thread_count) {
1144                        ret = 1;
1145                } else {
1146                        struct timeval curr_tv;
1147                        // If a second has passed since the first entry we will assume this is the very first packet
1148                        gettimeofday(&curr_tv, NULL);
1149                        if (curr_tv.tv_sec > (*tv)->tv_sec) {
1150                                if(curr_tv.tv_usec > (*tv)->tv_usec || curr_tv.tv_sec - (*tv)->tv_sec > 1) {
1151                                        ret = 1;
1152                                }
1153                        }
1154                }
1155        } else {
1156                *packet = NULL;
1157                *tv = NULL;
1158        }
1159        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
1160        return ret;
1161}
1162
1163
1164DLLEXPORT uint64_t tv_to_usec(struct timeval *tv)
1165{
1166        return (uint64_t) tv->tv_sec*1000000ull + (uint64_t) tv->tv_usec;
1167}
1168
1169inline static struct timeval usec_to_tv(uint64_t usec)
1170{
1171        struct timeval tv;
1172        tv.tv_sec = usec / 1000000;
1173        tv.tv_usec = usec % 1000000;
1174        return tv;
1175}
1176
1177/** Similar to delay_tracetime but send messages to all threads periodically */
1178static void* reporter_entry(void *data) {
1179        libtrace_message_t message = {0};
1180        libtrace_t *trace = (libtrace_t *)data;
1181        libtrace_thread_t *t = &trace->reporter_thread;
1182
1183        fprintf(stderr, "Reporter thread starting\n");
1184
1185        /* Wait until all threads are started */
1186        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1187        if (trace->state == STATE_ERROR) {
1188                thread_change_state(trace, t, THREAD_FINISHED, false);
1189                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1190                pthread_exit(NULL);
1191        }
1192        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1193
1194        message.code = MESSAGE_STARTING;
1195        message.sender = t;
1196        (*trace->reporter)(trace, NULL, &message);
1197        message.code = MESSAGE_RESUMING;
1198        (*trace->reporter)(trace, NULL, &message);
1199
1200        while (!trace_finished(trace)) {
1201                if (trace->config.reporter_polling) {
1202                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
1203                                message.code = MESSAGE_POST_REPORTER;
1204                } else {
1205                        libtrace_message_queue_get(&t->messages, &message);
1206                }
1207                switch (message.code) {
1208                        // Check for results
1209                        case MESSAGE_POST_REPORTER:
1210                                trace->combiner.read(trace, &trace->combiner);
1211                                break;
1212                        case MESSAGE_DO_PAUSE:
1213                                assert(trace->combiner.pause);
1214                                trace->combiner.pause(trace, &trace->combiner);
1215                                message.code = MESSAGE_PAUSING;
1216                                message.sender = t;
1217                                (*trace->reporter)(trace, NULL, &message);
1218                                trace_thread_pause(trace, t);
1219                                message.code = MESSAGE_RESUMING;
1220                                (*trace->reporter)(trace, NULL, &message);
1221                                break;
1222                        default:
1223                                (*trace->reporter)(trace, NULL, &message);
1224                }
1225        }
1226
1227        // Flush out whats left now all our threads have finished
1228        trace->combiner.read_final(trace, &trace->combiner);
1229
1230        // GOODBYE
1231        message.code = MESSAGE_PAUSING;
1232        message.sender = t;
1233        (*trace->reporter)(trace, NULL, &message);
1234        message.code = MESSAGE_STOPPING;
1235        (*trace->reporter)(trace, NULL, &message);
1236
1237        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
1238        print_memory_stats();
1239        return NULL;
1240}
1241
1242/** Similar to delay_tracetime but send messages to all threads periodically */
1243static void* keepalive_entry(void *data) {
1244        struct timeval prev, next;
1245        libtrace_message_t message = {0};
1246        libtrace_t *trace = (libtrace_t *)data;
1247        uint64_t next_release;
1248        fprintf(stderr, "keepalive thread is starting\n");
1249
1250        /* Wait until all threads are started */
1251        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
1252        if (trace->state == STATE_ERROR) {
1253                thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false);
1254                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1255                pthread_exit(NULL);
1256        }
1257        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
1258
1259        gettimeofday(&prev, NULL);
1260        message.code = MESSAGE_TICK;
1261        while (trace->state != STATE_FINSHED) {
1262                fd_set rfds;
1263                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
1264                gettimeofday(&next, NULL);
1265                if (next_release > tv_to_usec(&next)) {
1266                        next = usec_to_tv(next_release - tv_to_usec(&next));
1267                        // Wait for timeout or a message
1268                        FD_ZERO(&rfds);
1269                        FD_SET(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages), &rfds);
1270                        if (select(libtrace_message_queue_get_fd(&trace->keepalive_thread.messages)+1, &rfds, NULL, NULL, &next) == 1) {
1271                                libtrace_message_t msg;
1272                                libtrace_message_queue_get(&trace->keepalive_thread.messages, &msg);
1273                                assert(msg.code == MESSAGE_DO_STOP);
1274                                goto done;
1275                        }
1276                }
1277                prev = usec_to_tv(next_release);
1278                if (trace->state == STATE_RUNNING) {
1279                        message.additional.uint64 = tv_to_usec(&prev);
1280                        trace_send_message_to_perpkts(trace, &message);
1281                }
1282        }
1283done:
1284
1285        thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, true);
1286        return NULL;
1287}
1288
1289/**
1290 * Delays a packets playback so the playback will be in trace time.
1291 * This may break early if a message becomes available.
1292 *
1293 * Requires the first packet for this thread to be received.
1294 * @param libtrace  The trace
1295 * @param packet    The packet to delay
1296 * @param t         The current thread
1297 * @return Either READ_MESSAGE(-2) or 0 is successful
1298 */
1299static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) {
1300        struct timeval curr_tv, pkt_tv;
1301        uint64_t next_release = t->tracetime_offset_usec;
1302        uint64_t curr_usec;
1303
1304        if (!t->tracetime_offset_usec) {
1305                libtrace_packet_t *first_pkt;
1306                struct timeval *sys_tv;
1307                int64_t initial_offset;
1308                int stable = retrive_first_packet(libtrace, &first_pkt, &sys_tv);
1309                assert(first_pkt);
1310                pkt_tv = trace_get_timeval(first_pkt);
1311                initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv);
1312                /* In the unlikely case offset is 0, change it to 1 */
1313                if (stable)
1314                        t->tracetime_offset_usec = initial_offset ? initial_offset: 1;
1315                next_release = initial_offset;
1316        }
1317        /* next_release == offset */
1318        pkt_tv = trace_get_timeval(packet);
1319        next_release += tv_to_usec(&pkt_tv);
1320        gettimeofday(&curr_tv, NULL);
1321        curr_usec = tv_to_usec(&curr_tv);
1322        if (next_release > curr_usec) {
1323                int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages);
1324                struct timeval delay_tv = usec_to_tv(next_release-curr_usec);
1325                fd_set rfds;
1326                FD_ZERO(&rfds);
1327                FD_SET(mesg_fd, &rfds);
1328                // We need to wait
1329
1330                //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));
1331                ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv);
1332                if (ret == 0) {
1333                        return 0;
1334                } else if (ret > 0) {
1335                        return READ_MESSAGE;
1336                } else {
1337                        fprintf(stderr, "I thnik we broke select\n");
1338                }
1339        }
1340        return 0;
1341}
1342
1343/* Discards packets that don't match the filter.
1344 * Discarded packets are emptied and then moved to the end of the packet list.
1345 *
1346 * @param trace       The trace format, containing the filter
1347 * @param packets     An array of packets
1348 * @param nb_packets  The number of valid items in packets
1349 *
1350 * @return The number of packets that passed the filter, which are moved to
1351 *          the start of the packets array
1352 */
1353static inline size_t filter_packets(libtrace_t *trace,
1354                                    libtrace_packet_t **packets,
1355                                    size_t nb_packets) {
1356        size_t offset = 0;
1357        size_t i;
1358
1359        for (i = 0; i < nb_packets; ++i) {
1360                // The filter needs the trace attached to receive the link type
1361                packets[i]->trace = trace;
1362                if (trace_apply_filter(trace->filter, packets[i])) {
1363                        libtrace_packet_t *tmp;
1364                        tmp = packets[offset];
1365                        packets[offset++] = packets[i];
1366                        packets[i] = tmp;
1367                } else {
1368                        trace_fin_packet(packets[i]);
1369                }
1370        }
1371
1372        return offset;
1373}
1374
1375/* Read a batch of packets from the trace into a buffer.
1376 * Note that this function will block until a packet is read (or EOF is reached)
1377 *
1378 * @param libtrace    The trace
1379 * @param t           The thread
1380 * @param packets     An array of packets
1381 * @param nb_packets  The number of empty packets in packets
1382 * @return The number of packets read, 0 on EOF (or an error/message -1,-2).
1383 */
1384static int trace_pread_packet_wrapper(libtrace_t *libtrace,
1385                                      libtrace_thread_t *t,
1386                                      libtrace_packet_t *packets[],
1387                                      size_t nb_packets) {
1388        int i;
1389        assert(nb_packets);
1390        assert(libtrace && "libtrace is NULL in trace_read_packet()");
1391        if (trace_is_err(libtrace))
1392                return -1;
1393        if (!libtrace->started) {
1394                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1395                              "You must call libtrace_start() before trace_read_packet()\n");
1396                return -1;
1397        }
1398
1399        if (libtrace->format->pread_packets) {
1400                int ret;
1401                for (i = 0; i < (int) nb_packets; ++i) {
1402                        assert(i[packets]);
1403                        if (!(packets[i]->buf_control==TRACE_CTRL_PACKET ||
1404                              packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) {
1405                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
1406                                              "Packet passed to trace_read_packet() is invalid\n");
1407                                return -1;
1408                        }
1409                }
1410                do {
1411                        ret=libtrace->format->pread_packets(libtrace, t,
1412                                                            packets,
1413                                                            nb_packets);
1414                        /* Error, EOF or message? */
1415                        if (ret <= 0) {
1416                                return ret;
1417                        }
1418
1419                        if (libtrace->filter) {
1420                                int remaining;
1421                                remaining = filter_packets(libtrace,
1422                                                           packets, ret);
1423                                t->filtered_packets += ret - remaining;
1424                                ret = remaining;
1425                        }
1426                        for (i = 0; i < ret; ++i) {
1427                                packets[i]->trace = libtrace;
1428                                /* TODO IN FORMAT?? Like traditional libtrace */
1429                                if (libtrace->snaplen>0)
1430                                        trace_set_capture_length(packets[i],
1431                                                        libtrace->snaplen);
1432                                trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
1433                        }
1434                        t->accepted_packets += ret;
1435                } while(ret == 0);
1436                return ret;
1437        }
1438        trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED,
1439                      "This format does not support reading packets\n");
1440        return ~0U;
1441}
1442
1443/* Restarts a parallel trace, this is called from trace_pstart.
1444 * The libtrace lock is held upon calling this function.
1445 * Typically with a parallel trace the threads are not
1446 * killed rather.
1447 */
1448static int trace_prestart(libtrace_t * libtrace, void *global_blob,
1449                          fn_per_pkt per_pkt, fn_reporter reporter) {
1450        int err = 0;
1451        if (libtrace->state != STATE_PAUSED) {
1452                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
1453                        "trace(%s) is not currently paused",
1454                              libtrace->uridata);
1455                return -1;
1456        }
1457
1458        /* Update functions if requested */
1459        if (per_pkt)
1460                libtrace->per_pkt = per_pkt;
1461        if (reporter)
1462                libtrace->reporter = reporter;
1463        if(global_blob)
1464                libtrace->global_blob = global_blob;
1465
1466        assert(libtrace_parallel);
1467        assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);
1468        assert(libtrace->per_pkt);
1469
1470        if (libtrace->perpkt_thread_count > 1 &&
1471            trace_supports_parallel(libtrace) &&
1472            !trace_has_dedicated_hasher(libtrace)) {
1473                fprintf(stderr, "Restarting trace pstart_input()\n");
1474                err = libtrace->format->pstart_input(libtrace);
1475        } else {
1476                if (libtrace->format->start_input) {
1477                        fprintf(stderr, "Restarting trace start_input()\n");
1478                        err = libtrace->format->start_input(libtrace);
1479                }
1480        }
1481
1482        if (err == 0) {
1483                libtrace->started = true;
1484                libtrace_change_state(libtrace, STATE_RUNNING, false);
1485        }
1486        return err;
1487}
1488
1489/**
1490 * Verifies the configuration and sets default values for any values not
1491 * specified by the user.
1492 * @return
1493 */
1494static void verify_configuration(libtrace_t *libtrace) {
1495
1496        if (libtrace->config.hasher_queue_size <= 0)
1497                libtrace->config.hasher_queue_size = 1000;
1498
1499        if (libtrace->config.perpkt_threads <= 0) {
1500                // TODO add BSD support
1501                libtrace->perpkt_thread_count = sysconf(_SC_NPROCESSORS_ONLN);
1502                if (libtrace->perpkt_thread_count <= 0)
1503                        // Lets just use one
1504                        libtrace->perpkt_thread_count = 1;
1505        } else {
1506                libtrace->perpkt_thread_count = libtrace->config.perpkt_threads;
1507        }
1508
1509        if (libtrace->config.reporter_thold <= 0)
1510                libtrace->config.reporter_thold = 100;
1511        if (libtrace->config.burst_size <= 0)
1512                libtrace->config.burst_size = 10;
1513        if (libtrace->config.packet_thread_cache_size <= 0)
1514                libtrace->config.packet_thread_cache_size = 20;
1515        if (libtrace->config.packet_cache_size <= 0)
1516                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
1517
1518        if (libtrace->config.packet_cache_size <
1519                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
1520                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
1521
1522        if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL)
1523                libtrace->combiner = combiner_unordered;
1524}
1525
1526/**
1527 * Starts a libtrace_thread, including allocating memory for messaging.
1528 * Threads are expected to wait until the libtrace look is released.
1529 * Hence why we don't init structures until later.
1530 *
1531 * @param trace The trace the thread is associated with
1532 * @param t The thread that is filled when the thread is started
1533 * @param type The type of thread
1534 * @param start_routine The entry location of the thread
1535 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt)
1536 * @param name For debugging purposes set the threads name (Optional)
1537 *
1538 * @return 0 on success or -1 upon error in which case the libtrace error is set.
1539 *         In this situation the thread structure is zeroed.
1540 */
1541static int trace_start_thread(libtrace_t *trace,
1542                       libtrace_thread_t *t,
1543                       enum thread_types type,
1544                       void *(*start_routine) (void *),
1545                       int perpkt_num,
1546                       const char *name) {
1547        int ret;
1548        assert(t->type == THREAD_EMPTY);
1549        t->trace = trace;
1550        t->ret = NULL;
1551        t->user_data = NULL;
1552        t->type = type;
1553        t->state = THREAD_RUNNING;
1554        ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace);
1555        if (ret != 0) {
1556                libtrace_zero_thread(t);
1557                trace_set_err(trace, ret, "Failed to create a thread");
1558                return -1;
1559        }
1560        libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1561        if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) {
1562                libtrace_ringbuffer_init(&t->rbuffer,
1563                                         trace->config.hasher_queue_size,
1564                                         trace->config.hasher_polling?
1565                                                 LIBTRACE_RINGBUFFER_POLLING:
1566                                                 LIBTRACE_RINGBUFFER_BLOCKING);
1567        }
1568        if(name)
1569                pthread_setname_np(t->tid, name);
1570        t->perpkt_num = perpkt_num;
1571        return 0;
1572}
1573
1574/* Start an input trace in the parallel libtrace framework.
1575 * This can also be used to restart an existing parallel.
1576 *
1577 * NOTE: libtrace lock is held for the majority of this function
1578 *
1579 * @param libtrace the input trace to start
1580 * @param global_blob some global data you can share with the new perpkt threads
1581 * @returns 0 on success, otherwise -1 to indicate an error has occured
1582 */
1583DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
1584                           fn_per_pkt per_pkt, fn_reporter reporter) {
1585        int i;
1586        int ret = -1;
1587        char name[16];
1588        sigset_t sig_before, sig_block_all;
1589        assert(libtrace);
1590
1591        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1592        if (trace_is_err(libtrace)) {
1593                goto cleanup_none;
1594        }
1595
1596        if (libtrace->state == STATE_PAUSED) {
1597                ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
1598                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1599                return ret;
1600        }
1601
1602        if (libtrace->state != STATE_NEW) {
1603                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart "
1604                              "should be called on a NEW or PAUSED trace but "
1605                              "instead was called from %s",
1606                              get_trace_state_name(libtrace->state));
1607                goto cleanup_none;
1608        }
1609
1610        /* Store the user defined things against the trace */
1611        libtrace->global_blob = global_blob;
1612        libtrace->per_pkt = per_pkt;
1613        libtrace->reporter = reporter;
1614        /* And zero other fields */
1615        for (i = 0; i < THREAD_STATE_MAX; ++i) {
1616                libtrace->perpkt_thread_states[i] = 0;
1617        }
1618        libtrace->first_packets.first = 0;
1619        libtrace->first_packets.count = 0;
1620        libtrace->first_packets.packets = NULL;
1621        libtrace->perpkt_threads = NULL;
1622        /* Set a global which says we are using a parallel trace. This is
1623         * for backwards compatability due to changes when destroying packets */
1624        libtrace_parallel = 1;
1625
1626        verify_configuration(libtrace);
1627
1628        /* Try start the format */
1629        if (libtrace->perpkt_thread_count > 1 &&
1630            trace_supports_parallel(libtrace) &&
1631            !trace_has_dedicated_hasher(libtrace)) {
1632                printf("This format has direct support for p's\n");
1633                ret = libtrace->format->pstart_input(libtrace);
1634                libtrace->pread = trace_pread_packet_wrapper;
1635        } else {
1636                if (libtrace->format->start_input) {
1637                        ret = libtrace->format->start_input(libtrace);
1638                }
1639                if (libtrace->perpkt_thread_count > 1)
1640                        libtrace->pread = trace_pread_packet_first_in_first_served;
1641                else
1642                        libtrace->pread = NULL;
1643        }
1644
1645        if (ret != 0) {
1646                goto cleanup_none;
1647        }
1648
1649        /* --- Start all the threads we need --- */
1650        /* Disable signals because it is inherited by the threads we start */
1651        sigemptyset(&sig_block_all);
1652        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1653
1654        /* If we need a hasher thread start it
1655         * Special Case: If single threaded we don't need a hasher
1656         */
1657        if (libtrace->perpkt_thread_count > 1 && libtrace->hasher
1658            && libtrace->hasher_type != HASHER_HARDWARE) {
1659                ret = trace_start_thread(libtrace, &libtrace->hasher_thread,
1660                                   THREAD_HASHER, hasher_entry, -1,
1661                                   "hasher-thread");
1662                if (ret != 0) {
1663                        trace_set_err(libtrace, errno, "trace_pstart "
1664                                      "failed to start a hasher thread.");
1665                        goto cleanup_started;
1666                }
1667                libtrace->pread = trace_pread_packet_hasher_thread;
1668        } else {
1669                libtrace->hasher_thread.type = THREAD_EMPTY;
1670        }
1671
1672        /* Start up our perpkt threads */
1673        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t),
1674                                          libtrace->perpkt_thread_count);
1675        if (!libtrace->perpkt_threads) {
1676                trace_set_err(libtrace, errno, "trace_pstart "
1677                              "failed to allocate memory.");
1678                goto cleanup_threads;
1679        }
1680        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1681                snprintf(name, sizeof(name), "perpkt-%d", i);
1682                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1683                ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i],
1684                                   THREAD_PERPKT, perpkt_threads_entry, i,
1685                                   name);
1686                if (ret != 0) {
1687                        trace_set_err(libtrace, errno, "trace_pstart "
1688                                      "failed to start a perpkt thread.");
1689                        goto cleanup_threads;
1690                }
1691        }
1692
1693        /* Start the reporter thread */
1694        if (reporter) {
1695                if (libtrace->combiner.initialise)
1696                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
1697                ret = trace_start_thread(libtrace, &libtrace->reporter_thread,
1698                                   THREAD_REPORTER, reporter_entry, -1,
1699                                   "reporter_thread");
1700                if (ret != 0) {
1701                        trace_set_err(libtrace, errno, "trace_pstart "
1702                                      "failed to start reporter thread.");
1703                        goto cleanup_threads;
1704                }
1705        }
1706
1707        /* Start the keepalive thread */
1708        if (libtrace->config.tick_interval > 0) {
1709                ret = trace_start_thread(libtrace, &libtrace->keepalive_thread,
1710                                   THREAD_KEEPALIVE, keepalive_entry, -1,
1711                                   "keepalive_thread");
1712                if (ret != 0) {
1713                        trace_set_err(libtrace, errno, "trace_pstart "
1714                                      "failed to start keepalive thread.");
1715                        goto cleanup_threads;
1716                }
1717        }
1718
1719        /* Init other data structures */
1720        libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count;
1721        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1722        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count,
1723                                                 sizeof(struct  __packet_storage_magic_type));
1724        if (libtrace->first_packets.packets == NULL) {
1725                trace_set_err(libtrace, errno, "trace_pstart "
1726                              "failed to allocate memory.");
1727                goto cleanup_threads;
1728        }
1729
1730        /*trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1731                      "failed to allocate ocache.");
1732        goto cleanup_threads;*/
1733
1734        if (libtrace_ocache_init(&libtrace->packet_freelist,
1735                             (void* (*)()) trace_create_packet,
1736                             (void (*)(void *))trace_destroy_packet,
1737                             libtrace->config.packet_thread_cache_size,
1738                             libtrace->config.packet_cache_size * 4,
1739                             libtrace->config.fixed_packet_count) != 0) {
1740                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
1741                              "failed to allocate ocache.");
1742                goto cleanup_threads;
1743        }
1744
1745        /* Threads don't start */
1746        libtrace->started = true;
1747        libtrace_change_state(libtrace, STATE_RUNNING, false);
1748
1749        ret = 0;
1750        goto success;
1751cleanup_threads:
1752        if (libtrace->first_packets.packets) {
1753                free(libtrace->first_packets.packets);
1754                libtrace->first_packets.packets = NULL;
1755        }
1756        libtrace_change_state(libtrace, STATE_ERROR, false);
1757        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1758        if (libtrace->hasher_thread.type == THREAD_HASHER) {
1759                pthread_join(libtrace->hasher_thread.tid, NULL);
1760                libtrace_zero_thread(&libtrace->hasher_thread);
1761        }
1762
1763        if (libtrace->perpkt_threads) {
1764                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1765                        if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) {
1766                                pthread_join(libtrace->perpkt_threads[i].tid, NULL);
1767                                libtrace_zero_thread(&libtrace->perpkt_threads[i]);
1768                        } else break;
1769                }
1770                free(libtrace->perpkt_threads);
1771                libtrace->perpkt_threads = NULL;
1772        }
1773
1774        if (libtrace->reporter_thread.type == THREAD_REPORTER) {
1775                pthread_join(libtrace->reporter_thread.tid, NULL);
1776                libtrace_zero_thread(&libtrace->reporter_thread);
1777        }
1778
1779        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
1780                pthread_join(libtrace->keepalive_thread.tid, NULL);
1781                libtrace_zero_thread(&libtrace->keepalive_thread);
1782        }
1783        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1784        libtrace_change_state(libtrace, STATE_NEW, false);
1785        assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);
1786        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
1787cleanup_started:
1788        if (trace_supports_parallel(libtrace) &&
1789            !trace_has_dedicated_hasher(libtrace)
1790            && libtrace->perpkt_thread_count > 1) {
1791                if (libtrace->format->ppause_input)
1792                        libtrace->format->ppause_input(libtrace);
1793        } else {
1794                if (libtrace->format->pause_input)
1795                        libtrace->format->pause_input(libtrace);
1796        }
1797        ret = -1;
1798success:
1799        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1800cleanup_none:
1801        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1802        return ret;
1803}
1804
1805/**
1806 * Pauses a trace, this should only be called by the main thread
1807 * 1. Set started = false
1808 * 2. All perpkt threads are paused waiting on a condition var
1809 * 3. Then call ppause on the underlying format if found
1810 * 4. The traces state is paused
1811 *
1812 * Once done you should be able to modify the trace setup and call pstart again
1813 * TODO handle changing thread numbers
1814 */
1815DLLEXPORT int trace_ppause(libtrace_t *libtrace)
1816{
1817        libtrace_thread_t *t;
1818        int i;
1819        assert(libtrace);
1820
1821        t = get_thread_table(libtrace);
1822        // Check state from within the lock if we are going to change it
1823        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1824        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1825                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1826                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1827                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1828                return -1;
1829        }
1830
1831        libtrace_change_state(libtrace, STATE_PAUSING, false);
1832        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1833
1834        // Special case handle the hasher thread case
1835        if (trace_has_dedicated_hasher(libtrace)) {
1836                if (libtrace->config.debug_state)
1837                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
1838                libtrace_message_t message = {0};
1839                message.code = MESSAGE_DO_PAUSE;
1840                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1841                // Wait for it to pause
1842                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1843                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1844                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1845                }
1846                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1847                if (libtrace->config.debug_state)
1848                        fprintf(stderr, " DONE\n");
1849        }
1850
1851        if (libtrace->config.debug_state)
1852                fprintf(stderr, "Asking perpkt threads to pause ...");
1853        // Stop threads, skip this one if it's a perpkt
1854        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1855                if (&libtrace->perpkt_threads[i] != t) {
1856                        libtrace_message_t message = {0};
1857                        message.code = MESSAGE_DO_PAUSE;
1858                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1859                        if(trace_has_dedicated_hasher(libtrace)) {
1860                                // The hasher has stopped and other threads have messages waiting therefore
1861                                // If the queues are empty the other threads would have no data
1862                                // So send some message packets to simply ask the threads to check
1863                                // We are the only writer since hasher has paused
1864                                libtrace_packet_t *pkt;
1865                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
1866                                pkt->error = READ_MESSAGE;
1867                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
1868                        }
1869                } else {
1870                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
1871                }
1872        }
1873
1874        if (t) {
1875                // A perpkt is doing the pausing, interesting, fake an extra thread paused
1876                // We rely on the user to *not* return before starting the trace again
1877                thread_change_state(libtrace, t, THREAD_PAUSED, true);
1878        }
1879
1880        // Wait for all threads to pause
1881        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1882        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1883                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1884        }
1885        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1886
1887        if (libtrace->config.debug_state)
1888                fprintf(stderr, " DONE\n");
1889
1890        // Deal with the reporter
1891        if (trace_has_dedicated_reporter(libtrace)) {
1892                if (libtrace->config.debug_state)
1893                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
1894                libtrace_message_t message = {0};
1895                message.code = MESSAGE_DO_PAUSE;
1896                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
1897                // Wait for it to pause
1898                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1899                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
1900                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1901                }
1902                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1903                if (libtrace->config.debug_state)
1904                        fprintf(stderr, " DONE\n");
1905        }
1906
1907        /* Cache values before we pause */
1908        if (libtrace->stats == NULL)
1909                libtrace->stats = trace_create_statistics();
1910        // Save the statistics against the trace
1911        trace_get_statistics(libtrace, NULL);
1912        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
1913                libtrace->started = false;
1914                if (libtrace->format->ppause_input)
1915                        libtrace->format->ppause_input(libtrace);
1916                // TODO What happens if we don't have pause input??
1917        } else {
1918                int err;
1919                fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);
1920                err = trace_pause(libtrace);
1921                // We should handle this a bit better
1922                if (err)
1923                        return err;
1924        }
1925
1926        // Only set as paused after the pause has been called on the trace
1927        libtrace_change_state(libtrace, STATE_PAUSED, true);
1928        return 0;
1929}
1930
1931/**
1932 * Stop trace finish prematurely as though it meet an EOF
1933 * This should only be called by the main thread
1934 * 1. Calls ppause
1935 * 2. Sends a message asking for threads to finish
1936 * 3. Releases threads which will pause
1937 */
1938DLLEXPORT int trace_pstop(libtrace_t *libtrace)
1939{
1940        int i, err;
1941        libtrace_message_t message = {0};
1942        assert(libtrace);
1943
1944        // Ensure all threads have paused and the underlying trace format has
1945        // been closed and all packets associated are cleaned up
1946        // Pause will do any state checks for us
1947        err = trace_ppause(libtrace);
1948        if (err)
1949                return err;
1950
1951        // Now send a message asking the threads to stop
1952        // This will be retrieved before trying to read another packet
1953
1954        message.code = MESSAGE_DO_STOP;
1955        trace_send_message_to_perpkts(libtrace, &message);
1956        if (trace_has_dedicated_hasher(libtrace))
1957                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1958
1959        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1960                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
1961        }
1962
1963        // Now release the threads and let them stop
1964        libtrace_change_state(libtrace, STATE_FINSHED, true);
1965        return 0;
1966}
1967
1968/**
1969 * Set the hasher type along with a selected function, if hardware supports
1970 * that generic type of hashing it will be used otherwise the supplied
1971 * hasher function will be used and passed data when called.
1972 *
1973 * @return 0 if successful otherwise -1 on error
1974 */
1975DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data) {
1976        int ret = -1;
1977        if (type == HASHER_HARDWARE || (type == HASHER_CUSTOM && !hasher) || (type == HASHER_BALANCE && hasher)) {
1978                return -1;
1979        }
1980
1981        // Save the requirements
1982        trace->hasher_type = type;
1983        if (hasher) {
1984                trace->hasher = hasher;
1985                trace->hasher_data = data;
1986        } else {
1987                trace->hasher = NULL;
1988                // TODO consider how to handle freeing this
1989                trace->hasher_data = NULL;
1990        }
1991
1992        // Try push this to hardware - NOTE hardware could do custom if
1993        // there is a more efficient way to apply it, in this case
1994        // it will simply grab the function out of libtrace_t
1995        if (trace->format->pconfig_input)
1996                ret = trace->format->pconfig_input(trace, TRACE_OPTION_SET_HASHER, &type);
1997
1998        if (ret == -1) {
1999                // We have to deal with this ourself
2000                // This most likely means single threaded reading of the trace
2001                if (!hasher) {
2002                        switch (type)
2003                        {
2004                                case HASHER_CUSTOM:
2005                                case HASHER_BALANCE:
2006                                        return 0;
2007                                case HASHER_BIDIRECTIONAL:
2008                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2009                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2010                                        toeplitz_init_config(trace->hasher_data, 1);
2011                                        return 0;
2012                                case HASHER_UNIDIRECTIONAL:
2013                                        trace->hasher = (fn_hasher) toeplitz_hash_packet;
2014                                        trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t));
2015                                        toeplitz_init_config(trace->hasher_data, 0);
2016                                        return 0;
2017                                case HASHER_HARDWARE:
2018                                        return -1;
2019                        }
2020                        return -1;
2021                }
2022        } else {
2023                // The hardware is dealing with this yay
2024                trace->hasher_type = HASHER_HARDWARE;
2025        }
2026
2027        return 0;
2028}
2029
2030// Waits for all threads to finish
2031DLLEXPORT void trace_join(libtrace_t *libtrace) {
2032        int i;
2033
2034        /* Firstly wait for the perpkt threads to finish, since these are
2035         * user controlled */
2036        for (i=0; i< libtrace->perpkt_thread_count; i++) {
2037                //printf("Waiting to join with perpkt #%d\n", i);
2038                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
2039                //printf("Joined with perpkt #%d\n", i);
2040                // So we must do our best effort to empty the queue - so
2041                // the producer (or any other threads) don't block.
2042                libtrace_packet_t * packet;
2043                assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);
2044                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2045                        if (packet) // This could be NULL iff the perpkt finishes early
2046                                trace_destroy_packet(packet);
2047        }
2048
2049        /* Now the hasher */
2050        if (trace_has_dedicated_hasher(libtrace)) {
2051                pthread_join(libtrace->hasher_thread.tid, NULL);
2052                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
2053        }
2054
2055        // Now that everything is finished nothing can be touching our
2056        // buffers so clean them up
2057        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2058                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
2059                // if they lost timeslice before-during a write
2060                libtrace_packet_t * packet;
2061                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
2062                        trace_destroy_packet(packet);
2063                if (libtrace->hasher) {
2064                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
2065                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
2066                }
2067                // Cannot destroy vector yet, this happens with trace_destroy
2068        }
2069        // TODO consider perpkt threads marking trace as finished before join is called
2070        libtrace_change_state(libtrace, STATE_FINSHED, true);
2071
2072        if (trace_has_dedicated_reporter(libtrace)) {
2073                pthread_join(libtrace->reporter_thread.tid, NULL);
2074                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
2075        }
2076
2077        // Wait for the tick (keepalive) thread if it has been started
2078        if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) {
2079                libtrace_message_t msg = {0};
2080                msg.code = MESSAGE_DO_STOP;
2081                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
2082                pthread_join(libtrace->keepalive_thread.tid, NULL);
2083        }
2084
2085        libtrace_change_state(libtrace, STATE_JOINED, true);
2086        print_memory_stats();
2087}
2088
2089DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace)
2090{
2091        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2092        assert(t);
2093        return libtrace_message_queue_count(&t->messages);
2094}
2095
2096DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2097{
2098        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2099        assert(t);
2100        return libtrace_message_queue_get(&t->messages, message);
2101}
2102
2103DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message)
2104{
2105        libtrace_thread_t * t = get_thread_descriptor(libtrace);
2106        assert(t);
2107        return libtrace_message_queue_try_get(&t->messages, message);
2108}
2109
2110/**
2111 * Return backlog indicator
2112 */
2113DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
2114{
2115        libtrace_message_t message = {0};
2116        message.code = MESSAGE_POST_REPORTER;
2117        message.sender = get_thread_descriptor(libtrace);
2118        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
2119}
2120
2121/**
2122 * Return backlog indicator
2123 */
2124DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
2125{
2126        //printf("Sending message code=%d to reporter\n", message->code);
2127        message->sender = get_thread_descriptor(libtrace);
2128        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
2129}
2130
2131/**
2132 *
2133 */
2134DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
2135{
2136        //printf("Sending message code=%d to reporter\n", message->code);
2137        message->sender = get_thread_descriptor(libtrace);
2138        return libtrace_message_queue_put(&t->messages, message);
2139}
2140
2141DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message)
2142{
2143        int i;
2144        message->sender = get_thread_descriptor(libtrace);
2145        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
2146                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
2147        }
2148        //printf("Sending message code=%d to reporter\n", message->code);
2149        return 0;
2150}
2151
2152DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) {
2153        result->key = key;
2154}
2155DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result) {
2156        return result->key;
2157}
2158DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value) {
2159        result->value = value;
2160}
2161DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result) {
2162        return result->value;
2163}
2164DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value) {
2165        result->key = key;
2166        result->value = value;
2167}
2168DLLEXPORT void trace_destroy_result(libtrace_result_t ** result) {
2169        free(*result);
2170        result = NULL;
2171        // TODO automatically back with a free list!!
2172}
2173
2174DLLEXPORT void * trace_get_global(libtrace_t *trace)
2175{
2176        return trace->global_blob;
2177}
2178
2179DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data)
2180{
2181        if (trace->global_blob && trace->global_blob != data) {
2182                void * ret = trace->global_blob;
2183                trace->global_blob = data;
2184                return ret;
2185        } else {
2186                trace->global_blob = data;
2187                return NULL;
2188        }
2189}
2190
2191DLLEXPORT void * trace_get_tls(libtrace_thread_t *t)
2192{
2193        return t->user_data;
2194}
2195
2196DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data)
2197{
2198        if(t->user_data && t->user_data != data) {
2199                void *ret = t->user_data;
2200                t->user_data = data;
2201                return ret;
2202        } else {
2203                t->user_data = data;
2204                return NULL;
2205        }
2206}
2207
2208/**
2209 * Publishes a result to the reduce queue
2210 * Should only be called by a perpkt thread, i.e. from a perpkt handler
2211 */
2212DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type) {
2213        libtrace_result_t res;
2214        res.type = type;
2215        res.key = key;
2216        res.value = value;
2217        assert(libtrace->combiner.publish);
2218        libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
2219        return;
2220}
2221
2222/**
2223 * Sets a combiner function against the trace.
2224 */
2225DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config){
2226        if (combiner) {
2227                trace->combiner = *combiner;
2228                trace->combiner.configuration = config;
2229        } else {
2230                // No combiner, so don't try use it
2231                memset(&trace->combiner, 0, sizeof(trace->combiner));
2232        }
2233}
2234
2235DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet) {
2236        return packet->order;
2237}
2238
2239DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet) {
2240        return packet->hash;
2241}
2242
2243DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order) {
2244        packet->order = order;
2245}
2246
2247DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash) {
2248        packet->hash = hash;
2249}
2250
2251DLLEXPORT int trace_finished(libtrace_t * libtrace) {
2252        // TODO I don't like using this so much, we could use state!!!
2253        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
2254}
2255
2256DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value)
2257{
2258        UNUSED int ret = -1;
2259        switch (option) {
2260                case TRACE_OPTION_TICK_INTERVAL:
2261                        libtrace->config.tick_interval = *((int *) value);
2262                        return 1;
2263                case TRACE_OPTION_SET_HASHER:
2264                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
2265                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
2266                        libtrace->config.perpkt_threads = *((int *) value);
2267                        return 1;
2268                case TRACE_OPTION_TRACETIME:
2269                        if(*((int *) value))
2270                                libtrace->tracetime = 1;
2271                        else
2272                                libtrace->tracetime = 0;
2273                        return 0;
2274                case TRACE_OPTION_SET_CONFIG:
2275                        libtrace->config = *((struct user_configuration *) value);
2276                case TRACE_OPTION_GET_CONFIG:
2277                        *((struct user_configuration *) value) = libtrace->config;
2278        }
2279        return 0;
2280}
2281
2282static bool config_bool_parse(char *value, size_t nvalue) {
2283        if (strncmp(value, "true", nvalue) == 0)
2284                return true;
2285        else if (strncmp(value, "false", nvalue) == 0)
2286                return false;
2287        else
2288                return strtoll(value, NULL, 10) != 0;
2289}
2290
2291static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
2292        assert(key);
2293        assert(value);
2294        assert(uc);
2295        if (strncmp(key, "packet_cache_size", nkey) == 0
2296            || strncmp(key, "pcs", nkey) == 0) {
2297                uc->packet_cache_size = strtoll(value, NULL, 10);
2298        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
2299                   || strncmp(key, "ptcs", nkey) == 0) {
2300                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
2301        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
2302                   || strncmp(key, "fpc", nkey) == 0) {
2303                uc->fixed_packet_count = config_bool_parse(value, nvalue);
2304        } else if (strncmp(key, "burst_size", nkey) == 0
2305                   || strncmp(key, "bs", nkey) == 0) {
2306                uc->burst_size = strtoll(value, NULL, 10);
2307        } else if (strncmp(key, "tick_interval", nkey) == 0
2308                   || strncmp(key, "ti", nkey) == 0) {
2309                uc->tick_interval = strtoll(value, NULL, 10);
2310        } else if (strncmp(key, "tick_count", nkey) == 0
2311                   || strncmp(key, "tc", nkey) == 0) {
2312                uc->tick_count = strtoll(value, NULL, 10);
2313        } else if (strncmp(key, "perpkt_threads", nkey) == 0
2314                   || strncmp(key, "pt", nkey) == 0) {
2315                uc->perpkt_threads = strtoll(value, NULL, 10);
2316        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
2317                   || strncmp(key, "hqs", nkey) == 0) {
2318                uc->hasher_queue_size = strtoll(value, NULL, 10);
2319        } else if (strncmp(key, "hasher_polling", nkey) == 0
2320                   || strncmp(key, "hp", nkey) == 0) {
2321                uc->hasher_polling = config_bool_parse(value, nvalue);
2322        } else if (strncmp(key, "reporter_polling", nkey) == 0
2323                   || strncmp(key, "rp", nkey) == 0) {
2324                uc->reporter_polling = config_bool_parse(value, nvalue);
2325        } else if (strncmp(key, "reporter_thold", nkey) == 0
2326                   || strncmp(key, "rt", nkey) == 0) {
2327                uc->reporter_thold = strtoll(value, NULL, 10);
2328        } else if (strncmp(key, "debug_state", nkey) == 0
2329                   || strncmp(key, "ds", nkey) == 0) {
2330                uc->debug_state = config_bool_parse(value, nvalue);
2331        } else {
2332                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
2333        }
2334}
2335
2336DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
2337        char *pch;
2338        char key[100];
2339        char value[100];
2340        assert(str);
2341        assert(uc);
2342        pch = strtok (str," ,.-");
2343        while (pch != NULL)
2344        {
2345                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
2346                        config_string(uc, key, sizeof(key), value, sizeof(value));
2347                } else {
2348                        fprintf(stderr, "Error parsing %s\n", pch);
2349                }
2350                pch = strtok (NULL," ,.-");
2351        }
2352}
2353
2354DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
2355        char line[1024];
2356        while (fgets(line, sizeof(line), file) != NULL)
2357        {
2358                parse_user_config(uc, line);
2359        }
2360}
2361
2362DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
2363        libtrace_packet_t* result;
2364        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
2365        assert(result);
2366        swap_packets(result, packet); // Move the current packet into our copy
2367        return result;
2368}
2369
2370DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
2371        // Try write back the packet
2372        assert(packet);
2373        // Always release any resources this might be holding such as a slot in a ringbuffer
2374        trace_fin_packet(packet);
2375        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
2376}
2377
2378DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace) {
2379        if (libtrace->format)
2380                return &libtrace->format->info;
2381        else
2382                return NULL;
2383}
Note: See TracBrowser for help on using the repository browser.