source: lib/combiner_unordered.c @ 89e2ff7

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, ndag_format, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 89e2ff7 was ee6e802, checked in by Shane Alcock <salcock@…>, 5 years ago

Updated copyright blurb on all source files

In some cases, this meant adding copyright blurbs to files that
had never had them before.

  • Property mode set to 100644
File size: 3.5 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#include "libtrace.h"
29#include "libtrace_int.h"
30#include "data-struct/deque.h"
31#include <assert.h>
32#include <stdlib.h>
33
34static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
35        int i = 0;
36        assert(trace_get_perpkt_threads(t) > 0);
37        libtrace_queue_t *queues;
38        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
39        queues = c->queues;
40        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
41                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
42        }
43        return 0;
44}
45
46static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
47        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
48        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
49
50        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
51                trace_post_reporter(trace);
52        }
53}
54
55static void read(libtrace_t *trace, libtrace_combine_t *c){
56        libtrace_queue_t *queues = c->queues;
57        int i;
58
59        /* Loop through and read all that are here */
60        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
61                libtrace_queue_t *v = &queues[i];
62                while (libtrace_deque_get_size(v) != 0) {
63                        libtrace_result_t r;
64                        libtrace_generic_t gt = {.res = &r};
65                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
66                        /* Ignore any ticks that we've already seen */
67                        if (r.type == RESULT_TICK_INTERVAL) {
68                                if (r.key <= c->last_ts_tick)
69                                        continue;
70                                c->last_ts_tick = r.key;
71                        }
72
73                        if (r.type == RESULT_TICK_COUNT) {
74                                if (r.key <= c->last_count_tick)
75                                        continue;
76                                c->last_count_tick = r.key;
77                        }
78                        send_message(trace, &trace->reporter_thread,
79                                MESSAGE_RESULT, gt, NULL);
80                }
81        }
82}
83
84static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
85        int i;
86        libtrace_queue_t *queues = c->queues;
87
88        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
89                assert(libtrace_deque_get_size(&queues[i]) == 0);
90        }
91        free(queues);
92        queues = NULL;
93}
94
95DLLEXPORT const libtrace_combine_t combiner_unordered = {
96    init_combiner,      /* initialise */
97        destroy,                /* destroy */
98        publish,                /* publish */
99    read,                       /* read */
100    read,                       /* read_final */
101    read,                       /* pause */
102    NULL,                       /* queues */
103    0,                          /* last_count_tick */
104    0,                          /* last_ts_tick */
105    {0}                         /* opts */
106};
Note: See TracBrowser for help on using the repository browser.