source: lib/combiner_sorted.c @ 692bf9c

develop
Last change on this file since 692bf9c was 2193905, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Apply changes required for pull request #81

  • Property mode set to 100644
File size: 4.0 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#include "libtrace.h"
29#include "libtrace_int.h"
30#include "data-struct/vector.h"
31#include <assert.h>
32#include <stdlib.h>
33
34static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
35        int i = 0;
36        if (trace_get_perpkt_threads(t) <= 0) {
37                trace_set_err(t, TRACE_ERR_INIT_FAILED, "You must have atleast 1 processing thread");
38                return -1;
39        }
40        libtrace_vector_t *queues;
41        c->queues = calloc(sizeof(libtrace_vector_t), trace_get_perpkt_threads(t));
42        queues = c->queues;
43        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
44                libtrace_vector_init(&queues[i], sizeof(libtrace_result_t));
45        }
46        return 0;
47}
48
49static void publish(libtrace_t *trace UNUSED, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
50        libtrace_vector_t *vec = &((libtrace_vector_t*)c->queues)[t_id];
51        libtrace_vector_push_back(vec, res);
52}
53
54static void read(libtrace_t *trace UNUSED, libtrace_combine_t *c UNUSED){
55        return;
56}
57
58static int compare_result(const void* p1, const void* p2)
59{
60        const libtrace_result_t * r1 = p1;
61        const libtrace_result_t * r2 = p2;
62        if (r1->key < r2->key)
63                return -1;
64        if (r1->key == r2->key)
65                return 0;
66        else
67                return 1;
68}
69
70static void pause(libtrace_t *trace, libtrace_combine_t *c) {
71        libtrace_vector_t *queues = c->queues;
72        int i;
73        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
74                libtrace_vector_apply_function(&queues[i], (vector_data_fn) libtrace_make_result_safe);
75        }
76}
77
78static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
79        libtrace_vector_t *queues = c->queues;
80        int i;
81        size_t a;
82        // Combine all results into queue 1
83        for (i = 1; i < trace_get_perpkt_threads(trace); ++i)
84        {
85                libtrace_vector_append(&queues[0],&queues[i]);
86        }
87        // Sort them
88        libtrace_vector_qsort(&queues[0], compare_result);
89
90        for (a = 0; a < libtrace_vector_get_size(&queues[0]); ++a) {
91                libtrace_result_t r;
92                libtrace_generic_t gt = {.res = &r};
93                ASSERT_RET (libtrace_vector_get(&queues[0], a, (void *) &r), == 1);
94                if (r.type == RESULT_TICK_INTERVAL ||
95                                r.type == RESULT_TICK_COUNT) {
96                        /* Ticks are essentially useless for this combiner? */
97                        continue;
98                }
99                send_message(trace, &trace->reporter_thread, MESSAGE_RESULT,
100                                gt, NULL);
101        }
102        libtrace_vector_empty(&queues[0]);
103}
104
105static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
106        int i;
107        libtrace_vector_t *queues = c->queues;
108
109        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
110                if (libtrace_vector_get_size(&queues[i]) != 0) {
111                        trace_set_err(trace, TRACE_ERR_COMBINER,
112                                "Failed to destroy queues, A thread still has data in destroy()");
113                        return;
114                }
115                libtrace_vector_destroy(&queues[i]);
116        }
117        free(queues);
118        queues = NULL;
119}
120
121DLLEXPORT const libtrace_combine_t combiner_sorted = {
122    init_combiner,      /* initialise */
123        destroy,                /* destroy */
124        publish,                /* publish */
125    read,                       /* read */
126    read_final,                 /* read_final */
127    pause,                      /* pause */
128    NULL,                       /* queues */
129    0,                          /* last_count_tick */
130    0,                          /* last_ts_tick */
131    {0}                         /* opts */
132};
Note: See TracBrowser for help on using the repository browser.