source: lib/combiner_ordered.c @ 62b3c4e

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 62b3c4e was 62b3c4e, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Update the reporter method to match with the per_pkt method.

We now treat a result as a type of message.

  • Property mode set to 100644
File size: 4.4 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(libtrace_get_perpkt_count(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
14        queues = c->queues;
15        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
24                trace_post_reporter(trace);
25        }
26        //while (libtrace_deque_get_size(&t->deque) >= 1000)
27        //      sched_yield();
28        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
29}
30
31inline static void read_internal(libtrace_t *trace, libtrace_queue_t *queues, const bool final){
32        int i;
33        int live_count = 0;
34        bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
35        uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys
36        uint64_t min_key = UINT64_MAX;
37        int min_queue = -1;
38
39        /* Loop through check all are alive (have data) and find the smallest */
40        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
41                libtrace_queue_t *v = &queues[i];
42                if (libtrace_deque_get_size(v) != 0) {
43                        libtrace_result_t r;
44                        libtrace_deque_peek_front(v, (void *) &r);
45                        live_count++;
46                        live[i] = true;
47                        key[i] = libtrace_result_get_key(&r);
48                        if (i==0 || min_key > key[i]) {
49                                min_key = key[i];
50                                min_queue = i;
51                        }
52                } else {
53                        live[i] = false;
54                }
55        }
56
57        /* Now remove the smallest and loop - special case if all threads have joined we always flush what's left */
58        while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count && final)) {
59                // || (live_count && ((flags & REDUCE_SEQUENTIAL && min_key == trace->expected_key)))
60                /* Get the minimum queue and then do stuff */
61                libtrace_result_t r;
62                libtrace_generic_t gt = {.res = &r};
63
64                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
65                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
66
67                // We expect the key we read +1 now , todo put expected in our storage area
68                //trace->expected_key = key[min_queue] + 1;
69
70                // Now update the one we just removed
71                if (libtrace_deque_get_size(&queues[min_queue]) )
72                {
73                        libtrace_deque_peek_front(&queues[min_queue], (void *) &r);
74                        key[min_queue] = libtrace_result_get_key(&r);
75                        if (key[min_queue] <= min_key) {
76                                // We are still the smallest, might be out of order though :(
77                                min_key = key[min_queue];
78                        } else {
79                                min_key = key[min_queue]; // Update our minimum
80                                // Check all find the smallest again - all are alive
81                                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
82                                        if (live[i] && min_key > key[i]) {
83                                                min_key = key[i];
84                                                min_queue = i;
85                                        }
86                                }
87                        }
88                } else {
89                        live[min_queue] = false;
90                        live_count--;
91                        min_key = UINT64_MAX; // Update our minimum
92                        // Check all find the smallest again - all are alive
93                        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
94                                // Still not 100% TODO (what if order is wrong or not increasing)
95                                if (live[i] && min_key >= key[i]) {
96                                        min_key = key[i];
97                                        min_queue = i;
98                                }
99                        }
100                }
101        }
102}
103
104static void read(libtrace_t *trace, libtrace_combine_t *c) {
105        read_internal(trace, c->queues, false);
106}
107
108static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
109        read_internal(trace, c->queues, true);
110}
111
112static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
113        int i;
114        libtrace_queue_t *queues = c->queues;
115
116        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
117                assert(libtrace_deque_get_size(&queues[i]) == 0);
118        }
119        free(queues);
120        queues = NULL;
121}
122
123
124static void pause(libtrace_t *trace, libtrace_combine_t *c) {
125        libtrace_queue_t *queues = c->queues;
126        int i;
127        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
128                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
129        }
130}
131
132DLLEXPORT const libtrace_combine_t combiner_ordered = {
133    init_combiner,      /* initialise */
134        destroy,                /* destroy */
135        publish,                /* publish */
136    read,                       /* read */
137    read_final,         /* read_final */
138    pause,                      /* pause */
139    NULL,                       /* queues */
140    {0}                         /* opts */
141};
Note: See TracBrowser for help on using the repository browser.