source: lib/combiner_sorted.c @ f6f3ae5

develop
Last change on this file since f6f3ae5 was f6f3ae5, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Assertion cleanup

  • Property mode set to 100644
File size: 4.1 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#include "libtrace.h"
29#include "libtrace_int.h"
30#include "data-struct/vector.h"
31#include <assert.h>
32#include <stdlib.h>
33
34static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
35        int i = 0;
36        /*assert(trace_get_perpkt_threads(t) > 0);*/
37        if (trace_get_perpkt_threads(t) <= 0) {
38                trace_set_err(t, TRACE_ERR_INIT_FAILED, "You must have atleast 1 processing thread");
39                return -1;
40        }
41        libtrace_vector_t *queues;
42        c->queues = calloc(sizeof(libtrace_vector_t), trace_get_perpkt_threads(t));
43        queues = c->queues;
44        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
45                libtrace_vector_init(&queues[i], sizeof(libtrace_result_t));
46        }
47        return 0;
48}
49
50static void publish(libtrace_t *trace UNUSED, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
51        libtrace_vector_t *vec = &((libtrace_vector_t*)c->queues)[t_id];
52        libtrace_vector_push_back(vec, res);
53}
54
55static void read(libtrace_t *trace UNUSED, libtrace_combine_t *c UNUSED){
56        return;
57}
58
59static int compare_result(const void* p1, const void* p2)
60{
61        const libtrace_result_t * r1 = p1;
62        const libtrace_result_t * r2 = p2;
63        if (r1->key < r2->key)
64                return -1;
65        if (r1->key == r2->key)
66                return 0;
67        else
68                return 1;
69}
70
71static void pause(libtrace_t *trace, libtrace_combine_t *c) {
72        libtrace_vector_t *queues = c->queues;
73        int i;
74        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
75                libtrace_vector_apply_function(&queues[i], (vector_data_fn) libtrace_make_result_safe);
76        }
77}
78
79static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
80        libtrace_vector_t *queues = c->queues;
81        int i;
82        size_t a;
83        // Combine all results into queue 1
84        for (i = 1; i < trace_get_perpkt_threads(trace); ++i)
85        {
86                libtrace_vector_append(&queues[0],&queues[i]);
87        }
88        // Sort them
89        libtrace_vector_qsort(&queues[0], compare_result);
90
91        for (a = 0; a < libtrace_vector_get_size(&queues[0]); ++a) {
92                libtrace_result_t r;
93                libtrace_generic_t gt = {.res = &r};
94                ASSERT_RET (libtrace_vector_get(&queues[0], a, (void *) &r), == 1);
95                if (r.type == RESULT_TICK_INTERVAL ||
96                                r.type == RESULT_TICK_COUNT) {
97                        /* Ticks are essentially useless for this combiner? */
98                        continue;
99                }
100                send_message(trace, &trace->reporter_thread, MESSAGE_RESULT,
101                                gt, NULL);
102        }
103        libtrace_vector_empty(&queues[0]);
104}
105
106static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
107        int i;
108        libtrace_vector_t *queues = c->queues;
109
110        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
111                /*assert(libtrace_vector_get_size(&queues[i]) == 0);*/
112                if (libtrace_vector_get_size(&queues[i]) != 0) {
113                        trace_set_err(trace, TRACE_ERR_COMBINER,
114                                "Failed to destory queues, A thread still has data in destroy()");
115                        return;
116                }
117                libtrace_vector_destroy(&queues[i]);
118        }
119        free(queues);
120        queues = NULL;
121}
122
123DLLEXPORT const libtrace_combine_t combiner_sorted = {
124    init_combiner,      /* initialise */
125        destroy,                /* destroy */
126        publish,                /* publish */
127    read,                       /* read */
128    read_final,                 /* read_final */
129    pause,                      /* pause */
130    NULL,                       /* queues */
131    0,                          /* last_count_tick */
132    0,                          /* last_ts_tick */
133    {0}                         /* opts */
134};
Note: See TracBrowser for help on using the repository browser.