source: lib/combiner_ordered.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 4.7 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(libtrace_get_perpkt_count(t) > 0);
10        libtrace_queue_t *queues;
11        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
12        queues = c->queues;
13        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
14                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
21        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
22                trace_post_reporter(trace);
23        }
24        //while (libtrace_deque_get_size(&t->deque) >= 1000)
25        //      sched_yield();
26        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
27}
28
29static void inline read_internal(libtrace_t *trace, libtrace_queue_t *queues, bool final){
30        int i;
31        int flags = trace->reporter_flags; // Hint these aren't changing
32        int live_count = 0;
33        bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
34        uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys
35        uint64_t min_key = UINT64_MAX;
36        int min_queue = -1;
37
38        /* Loop through check all are alive (have data) and find the smallest */
39        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
40                libtrace_queue_t *v = &queues[i];
41                if (libtrace_deque_get_size(v) != 0) {
42                        libtrace_result_t r;
43                        libtrace_deque_peek_front(v, (void *) &r);
44                        live_count++;
45                        live[i] = true;
46                        key[i] = libtrace_result_get_key(&r);
47                        if (i==0 || min_key > key[i]) {
48                                min_key = key[i];
49                                min_queue = i;
50                        }
51                } else {
52                        live[i] = false;
53                }
54        }
55
56        /* Now remove the smallest and loop - special case if all threads have joined we always flush what's left */
57        while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count &&
58                        ((flags & REDUCE_SEQUENTIAL && min_key == trace->expected_key) ||
59                        final))) {
60                /* Get the minimum queue and then do stuff */
61                libtrace_result_t r;
62
63                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
64                trace->reporter(trace, &r, NULL);
65
66                // We expect the key we read +1 now , todo put expected in our storage area
67                trace->expected_key = key[min_queue] + 1;
68
69                // Now update the one we just removed
70                if (libtrace_deque_get_size(&queues[min_queue]) )
71                {
72                        libtrace_deque_peek_front(&queues[min_queue], (void *) &r);
73                        key[min_queue] = libtrace_result_get_key(&r);
74                        if (key[min_queue] <= min_key) {
75                                // We are still the smallest, might be out of order though :(
76                                min_key = key[min_queue];
77                        } else {
78                                min_key = key[min_queue]; // Update our minimum
79                                // Check all find the smallest again - all are alive
80                                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
81                                        if (live[i] && min_key > key[i]) {
82                                                min_key = key[i];
83                                                min_queue = i;
84                                        }
85                                }
86                        }
87                } else {
88                        live[min_queue] = false;
89                        live_count--;
90                        min_key = UINT64_MAX; // Update our minimum
91                        // Check all find the smallest again - all are alive
92                        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
93                                // Still not 100% TODO (what if order is wrong or not increasing)
94                                if (live[i] && min_key >= key[i]) {
95                                        min_key = key[i];
96                                        min_queue = i;
97                                }
98                        }
99                }
100        }
101}
102
103static void read(libtrace_t *trace, libtrace_combine_t *c) {
104        read_internal(trace, c->queues, false);
105}
106
107static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
108        read_internal(trace, c->queues, true);
109}
110
111static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
112        int i;
113        libtrace_queue_t *queues = c->queues;
114
115        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
116                assert(libtrace_deque_get_size(&queues[i]) == 0);
117        }
118        free(queues);
119        queues = NULL;
120}
121
122/** Used below in trace_make_results_packets_safe*/
123static void do_copy_result_packet(void *data)
124{
125        libtrace_result_t *res = (libtrace_result_t *)data;
126        if (res->type == RESULT_PACKET) {
127                // Duplicate the packet in standard malloc'd memory and free the
128                // original, This is a 1:1 exchange so is ocache count remains unchanged.
129                libtrace_packet_t *oldpkt, *dup;
130                oldpkt = (libtrace_packet_t *) res->value;
131                dup = trace_copy_packet(oldpkt);
132                res->value = (void *)dup;
133                trace_destroy_packet(oldpkt);
134        }
135}
136
137static void pause(libtrace_t *trace, libtrace_combine_t *c) {
138        libtrace_queue_t *queues = c->queues;
139        int i;
140        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
141                libtrace_deque_apply_function(&queues[i], &do_copy_result_packet);
142        }
143}
144
145const libtrace_combine_t combiner_ordered = {
146    init_combiner,      /* initialise */
147        destroy,                /* destroy */
148        publish,                /* publish */
149    read,                       /* read */
150    read_final,         /* read_final */
151    pause,                      /* pause */
152    NULL,                       /* queues */
153    0                           /* opts */
154};
Note: See TracBrowser for help on using the repository browser.