source: lib/combiner_ordered.c @ a31e166

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since a31e166 was a31e166, checked in by Shane Alcock <salcock@…>, 6 years ago

Rename libtrace_get_perpkt_count to trace_get_perpkt_threads

This is to be consistent with trace_set_perpkt_threads.

Remove 'packet' parameter from the first_packet callback.
trace_get_first_packet should be used to get the first packet instead.

  • Property mode set to 100644
File size: 8.9 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(trace_get_perpkt_threads(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
14        queues = c->queues;
15        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        //while (libtrace_deque_get_size(&t->deque) >= 1000)
24        //      sched_yield();
25        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
26
27        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
28                trace_post_reporter(trace);
29        }
30}
31
32inline static uint64_t next_message(libtrace_queue_t *v) {
33
34        libtrace_result_t r;
35        if (libtrace_deque_peek_front(v, (void *) &r) == 0)
36                return 0;
37
38        if (r.type == RESULT_TICK_INTERVAL || r.type == RESULT_TICK_COUNT)
39                return 0;
40
41        return r.key;
42}
43
44inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
45                libtrace_queue_t *v, uint64_t *key) {
46
47        libtrace_result_t r;
48        libtrace_deque_peek_front(v, (void *) &r);
49
50        /* Ticks are a bit tricky, because we can get TS
51         * ticks in amongst packets indexed by their cardinal
52         * order and vice versa. Also, every thread will
53         * produce an equivalent tick and we should really
54         * combine those into a single tick for the reporter
55         * thread.
56         */
57
58        if (r.type == RESULT_TICK_INTERVAL) {
59                if (r.key > c->last_ts_tick) {
60                        c->last_ts_tick = r.key;
61
62                        /* Tick doesn't match packet order */
63                        if (!trace_is_parallel(trace)) {
64                                /* Pass straight to reporter */
65                                libtrace_generic_t gt = {.res = &r};
66                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
67                                send_message(trace, &trace->reporter_thread,
68                                                MESSAGE_RESULT, gt,
69                                                &trace->reporter_thread);
70                                return 0;
71                        }
72                        /* Tick matches packet order */
73                        *key = r.key;
74                        return 1;
75
76                } else {
77                        /* Duplicate -- pop it */
78                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
79                        return 0;
80                }
81        }
82
83        if (r.type == RESULT_TICK_COUNT) {
84                if (r.key > c->last_count_tick) {
85                        c->last_count_tick = r.key;
86
87                        /* Tick doesn't match packet order */
88                        if (trace_is_parallel(trace)) {
89                                /* Pass straight to reporter */
90                                libtrace_generic_t gt = {.res = &r};
91                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
92                                send_message(trace, &trace->reporter_thread,
93                                                MESSAGE_RESULT, gt,
94                                                &trace->reporter_thread);
95                                return 0;
96                        }
97                        /* Tick matches packet order */
98                        *key = r.key;
99                        return 1;
100
101                        /* Tick doesn't match packet order */
102                } else {
103                        /* Duplicate -- pop it */
104                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
105                        return 0;
106                }
107        }
108       
109        *key = r.key;
110        return 1;
111}
112
113inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
114        int i;
115        int live_count = 0;
116        libtrace_queue_t *queues = c->queues;
117        bool allactive = true;
118        bool live[trace_get_perpkt_threads(trace)]; // Set if a trace is alive
119        uint64_t key[trace_get_perpkt_threads(trace)]; // Cached keys
120        uint64_t min_key = UINT64_MAX;
121        uint64_t prev_min = 0;
122        uint64_t peeked = 0;
123        int min_queue = -1;
124
125        /* Loop through check all are alive (have data) and find the smallest */
126        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
127                libtrace_queue_t *v = &queues[i];
128                if (libtrace_deque_get_size(v) != 0) {
129                        if (peek_queue(trace, c, v, &peeked)) {
130                                live_count ++;
131                                live[i] = true;
132                                key[i] = peeked;
133                                if (i == 0 || min_key > peeked) {
134                                        min_key = peeked;
135                                        min_queue = i;
136                                }
137                        } else {
138                                live[i] = false;
139                                key[i] = 0;
140                        }
141                } else {
142                        allactive = false;
143                        live[i] = false;
144                        key[i] = 0;
145                }
146        }
147
148        /* Now remove the smallest and loop - special case if all threads have
149         * joined we always flush what's left. Or the next smallest is the same
150         * value or less than the previous */
151        while ((allactive && min_queue != -1) || (live_count && final)
152               || (live_count && prev_min >= min_key)) {
153                /* Get the minimum queue and then do stuff */
154                libtrace_result_t r;
155                libtrace_generic_t gt = {.res = &r};
156
157                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
158                send_message(trace, &trace->reporter_thread,
159                                MESSAGE_RESULT, gt,
160                                NULL);
161
162                // Now update the one we just removed
163                peeked = next_message(&queues[min_queue]);
164                if (libtrace_deque_get_size(&queues[min_queue]) &&
165                                peeked != 0) {
166
167                        key[min_queue] = peeked;
168                        // We are still the smallest, might be out of order :(
169                        if (key[min_queue] <= min_key) {
170                                min_key = key[min_queue];
171                        } else {
172                                min_key = key[min_queue]; // Update our minimum
173                                // Check all find the smallest again - all are alive
174                                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
175                                        if (live[i] && min_key >= key[i]) {
176                                                min_key = key[i];
177                                                min_queue = i;
178                                        }
179                                }
180                        }
181                } else {
182                        allactive = false;
183                        live[min_queue] = false;
184                        live_count--;
185                        prev_min = min_key;
186                        min_key = UINT64_MAX; // Update our minimum
187                        // Check all find the smallest again - all are alive
188                        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
189                                if (live[i] && min_key >= key[i]) {
190                                        min_key = key[i];
191                                        min_queue = i;
192                                }
193                        }
194                }
195        }
196}
197
198static void read(libtrace_t *trace, libtrace_combine_t *c) {
199        read_internal(trace, c, false);
200}
201
202static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
203        int empty = 0, i;
204        libtrace_queue_t *q = c->queues;
205
206        do {
207                read_internal(trace, c, true);
208                empty = 0;
209                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
210                        if (libtrace_deque_get_size(&q[i]) == 0)
211                                empty ++;
212                }
213        }
214        while (empty < trace_get_perpkt_threads(trace));
215}
216
217static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
218        int i;
219        libtrace_queue_t *queues = c->queues;
220
221        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
222                assert(libtrace_deque_get_size(&queues[i]) == 0);
223        }
224        free(queues);
225        queues = NULL;
226}
227
228
229static void pause(libtrace_t *trace, libtrace_combine_t *c) {
230        libtrace_queue_t *queues = c->queues;
231        int i;
232        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
233                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
234        }
235}
236
237DLLEXPORT const libtrace_combine_t combiner_ordered = {
238        init_combiner,  /* initialise */
239        destroy,                /* destroy */
240        publish,                /* publish */
241        read,                   /* read */
242        read_final,             /* read_final */
243        pause,                  /* pause */
244        NULL,                   /* queues */
245        0,                      /* last_count_tick */
246        0,                      /* last_ts_tick */
247        {0}                             /* opts */
248};
Note: See TracBrowser for help on using the repository browser.