source: lib/combiner_ordered.c @ 771ab22

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 771ab22 was d994324, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Remove anything to do with the combiner from set configuration and removes any options/storage related to these such as the next expected packet.

Instead this is done using trace_set_combiner now, and the for the built-in combiners.h header. This is a lot more flexible and allows the users to specify there own combiner, and any number of options for it.

  • Property mode set to 100644
File size: 4.3 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(libtrace_get_perpkt_count(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
14        queues = c->queues;
15        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
24                trace_post_reporter(trace);
25        }
26        //while (libtrace_deque_get_size(&t->deque) >= 1000)
27        //      sched_yield();
28        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
29}
30
31inline static void read_internal(libtrace_t *trace, libtrace_queue_t *queues, const bool final){
32        int i;
33        int live_count = 0;
34        bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
35        uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys
36        uint64_t min_key = UINT64_MAX;
37        int min_queue = -1;
38
39        /* Loop through check all are alive (have data) and find the smallest */
40        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
41                libtrace_queue_t *v = &queues[i];
42                if (libtrace_deque_get_size(v) != 0) {
43                        libtrace_result_t r;
44                        libtrace_deque_peek_front(v, (void *) &r);
45                        live_count++;
46                        live[i] = true;
47                        key[i] = libtrace_result_get_key(&r);
48                        if (i==0 || min_key > key[i]) {
49                                min_key = key[i];
50                                min_queue = i;
51                        }
52                } else {
53                        live[i] = false;
54                }
55        }
56
57        /* Now remove the smallest and loop - special case if all threads have joined we always flush what's left */
58        while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count && final)) {
59                // || (live_count && ((flags & REDUCE_SEQUENTIAL && min_key == trace->expected_key)))
60                /* Get the minimum queue and then do stuff */
61                libtrace_result_t r;
62
63                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
64                trace->reporter(trace, &r, NULL);
65
66                // We expect the key we read +1 now , todo put expected in our storage area
67                //trace->expected_key = key[min_queue] + 1;
68
69                // Now update the one we just removed
70                if (libtrace_deque_get_size(&queues[min_queue]) )
71                {
72                        libtrace_deque_peek_front(&queues[min_queue], (void *) &r);
73                        key[min_queue] = libtrace_result_get_key(&r);
74                        if (key[min_queue] <= min_key) {
75                                // We are still the smallest, might be out of order though :(
76                                min_key = key[min_queue];
77                        } else {
78                                min_key = key[min_queue]; // Update our minimum
79                                // Check all find the smallest again - all are alive
80                                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
81                                        if (live[i] && min_key > key[i]) {
82                                                min_key = key[i];
83                                                min_queue = i;
84                                        }
85                                }
86                        }
87                } else {
88                        live[min_queue] = false;
89                        live_count--;
90                        min_key = UINT64_MAX; // Update our minimum
91                        // Check all find the smallest again - all are alive
92                        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
93                                // Still not 100% TODO (what if order is wrong or not increasing)
94                                if (live[i] && min_key >= key[i]) {
95                                        min_key = key[i];
96                                        min_queue = i;
97                                }
98                        }
99                }
100        }
101}
102
103static void read(libtrace_t *trace, libtrace_combine_t *c) {
104        read_internal(trace, c->queues, false);
105}
106
107static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
108        read_internal(trace, c->queues, true);
109}
110
111static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
112        int i;
113        libtrace_queue_t *queues = c->queues;
114
115        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
116                assert(libtrace_deque_get_size(&queues[i]) == 0);
117        }
118        free(queues);
119        queues = NULL;
120}
121
122
123static void pause(libtrace_t *trace, libtrace_combine_t *c) {
124        libtrace_queue_t *queues = c->queues;
125        int i;
126        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
127                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
128        }
129}
130
131DLLEXPORT const libtrace_combine_t combiner_ordered = {
132    init_combiner,      /* initialise */
133        destroy,                /* destroy */
134        publish,                /* publish */
135    read,                       /* read */
136    read_final,         /* read_final */
137    pause,                      /* pause */
138    NULL,                       /* queues */
139    {0}                         /* opts */
140};
Note: See TracBrowser for help on using the repository browser.