source: lib/combiner_unordered.c

develop
Last change on this file was 2193905, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Apply changes required for pull request #81

  • Property mode set to 100644
File size: 3.7 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#include "libtrace.h"
29#include "libtrace_int.h"
30#include "data-struct/deque.h"
31#include <assert.h>
32#include <stdlib.h>
33
34static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
35        int i = 0;
36        if (trace_get_perpkt_threads(t) <= 0) {
37                trace_set_err(t, TRACE_ERR_INIT_FAILED, "You must have atleast 1 processing thread");
38                return -1;
39        }
40        libtrace_queue_t *queues;
41        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
42        queues = c->queues;
43        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
44                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
45        }
46        return 0;
47}
48
49static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
50        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
51        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
52
53        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
54                trace_post_reporter(trace);
55        }
56}
57
58static void read(libtrace_t *trace, libtrace_combine_t *c){
59        libtrace_queue_t *queues = c->queues;
60        int i;
61
62        /* Loop through and read all that are here */
63        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
64                libtrace_queue_t *v = &queues[i];
65                while (libtrace_deque_get_size(v) != 0) {
66                        libtrace_result_t r;
67                        libtrace_generic_t gt = {.res = &r};
68                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
69                        /* Ignore any ticks that we've already seen */
70                        if (r.type == RESULT_TICK_INTERVAL) {
71                                if (r.key <= c->last_ts_tick)
72                                        continue;
73                                c->last_ts_tick = r.key;
74                        }
75
76                        if (r.type == RESULT_TICK_COUNT) {
77                                if (r.key <= c->last_count_tick)
78                                        continue;
79                                c->last_count_tick = r.key;
80                        }
81                        send_message(trace, &trace->reporter_thread,
82                                MESSAGE_RESULT, gt, NULL);
83                }
84        }
85}
86
87static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
88        int i;
89        libtrace_queue_t *queues = c->queues;
90
91        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
92                if (libtrace_deque_get_size(&queues[i]) != 0) {
93                        trace_set_err(trace, TRACE_ERR_COMBINER,
94                                "Failed to destroy queues, A thread still has data in destroy()");
95                        return;
96                }
97        }
98        free(queues);
99        queues = NULL;
100}
101
/*
 * Exported descriptor for the unordered combiner.
 *
 * The initialiser is positional. The slot labels below are carried over
 * from the existing inline comments -- NOTE(review): confirm the order
 * against the libtrace_combine_t declaration before reordering anything.
 * The same read() routine is installed in the read, read_final and pause
 * slots.
 */
DLLEXPORT const libtrace_combine_t combiner_unordered = {
    init_combiner,      /* initialise: allocates the per-thread queues */
        destroy,                /* destroy: frees the per-thread queues */
        publish,                /* publish: appends a result to a thread's queue */
    read,                       /* read: drains all queues to the reporter */
    read,                       /* read_final: same routine as read */
    read,                       /* pause: same routine as read */
    NULL,                       /* queues: populated by init_combiner() */
    0,                          /* last_count_tick */
    0,                          /* last_ts_tick */
    {0}                         /* opts */
};
Note: See TracBrowser for help on using the repository browser.