source: lib/combiner_unordered.c @ d994324

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since d994324 was d994324, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Remove anything to do with the combiner from set configuration, and remove any options/storage related to it, such as the next expected packet.

Instead, this is now done using trace_set_combiner and, for the built-in combiners, the combiners.h header. This is a lot more flexible and allows users to specify their own combiner, and any number of options for it.

  • Property mode set to 100644
File size: 1.8 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(libtrace_get_perpkt_count(t) > 0);
10        libtrace_queue_t *queues;
11        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
12        queues = c->queues;
13        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
14                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
21        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
22                trace_post_reporter(trace);
23        }
24        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
25}
26
27static void read(libtrace_t *trace, libtrace_combine_t *c){
28        libtrace_queue_t *queues = c->queues;
29        int i;
30
31        /* Loop through and read all that are here */
32        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
33                libtrace_queue_t *v = &queues[i];
34                while (libtrace_deque_get_size(v) != 0) {
35                        libtrace_result_t r;
36                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
37                        trace->reporter(trace, &r, NULL);
38                }
39        }
40}
41
42static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
43        int i;
44        libtrace_queue_t *queues = c->queues;
45
46        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
47                assert(libtrace_deque_get_size(&queues[i]) == 0);
48        }
49        free(queues);
50        queues = NULL;
51}
52
53DLLEXPORT const libtrace_combine_t combiner_unordered = {
54    init_combiner,      /* initialise */
55        destroy,                /* destroy */
56        publish,                /* publish */
57    read,                       /* read */
58    read,                       /* read_final */
59    read,                       /* pause */
60    NULL,                       /* queues */
61    {0}                         /* opts */
62};
Note: See TracBrowser for help on using the repository browser.