source: lib/combiner_ordered.c @ 5e43b8b

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5e43b8b was f01c479, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Move reporter threashold check to after adding the next item, so it makes more sense
in the case the threashold is set to 1.

Add logic to the order combiner allowing multiple results with the same key
to be sent even if all queues are not full.

  • Property mode set to 100644
File size: 4.2 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(libtrace_get_perpkt_count(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
14        queues = c->queues;
15        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        //while (libtrace_deque_get_size(&t->deque) >= 1000)
24        //      sched_yield();
25        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
26
27        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
28                trace_post_reporter(trace);
29        }
30}
31
32inline static void read_internal(libtrace_t *trace, libtrace_queue_t *queues, const bool final){
33        int i;
34        int live_count = 0;
35        bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
36        uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys
37        uint64_t min_key = UINT64_MAX;
38        uint64_t prev_min = 0;
39        int min_queue = -1;
40
41        /* Loop through check all are alive (have data) and find the smallest */
42        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
43                libtrace_queue_t *v = &queues[i];
44                if (libtrace_deque_get_size(v) != 0) {
45                        libtrace_result_t r;
46                        libtrace_deque_peek_front(v, (void *) &r);
47                        live_count++;
48                        live[i] = true;
49                        key[i] = r.key;
50                        if (i==0 || min_key > key[i]) {
51                                min_key = key[i];
52                                min_queue = i;
53                        }
54                } else {
55                        live[i] = false;
56                }
57        }
58
59        /* Now remove the smallest and loop - special case if all threads have
60         * joined we always flush what's left. Or the next smallest is the same
61         * value or less than the previous */
62        while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count && final)
63               || (live_count && prev_min >= min_key)) {
64                /* Get the minimum queue and then do stuff */
65                libtrace_result_t r;
66                libtrace_generic_t gt = {.res = &r};
67
68                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
69                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
70
71                // Now update the one we just removed
72                if (libtrace_deque_get_size(&queues[min_queue]) )
73                {
74                        libtrace_deque_peek_front(&queues[min_queue], (void *) &r);
75                        key[min_queue] = r.key;
76                        if (key[min_queue] <= min_key) {
77                                // We are still the smallest, might be out of order though :(
78                                min_key = key[min_queue];
79                        } else {
80                                min_key = key[min_queue]; // Update our minimum
81                                // Check all find the smallest again - all are alive
82                                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
83                                        if (live[i] && min_key > key[i]) {
84                                                min_key = key[i];
85                                                min_queue = i;
86                                        }
87                                }
88                        }
89                } else {
90                        live[min_queue] = false;
91                        live_count--;
92                        prev_min = min_key;
93                        min_key = UINT64_MAX; // Update our minimum
94                        // Check all find the smallest again - all are alive
95                        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
96                                if (live[i] && min_key >= key[i]) {
97                                        min_key = key[i];
98                                        min_queue = i;
99                                }
100                        }
101                }
102        }
103}
104
105static void read(libtrace_t *trace, libtrace_combine_t *c) {
106        read_internal(trace, c->queues, false);
107}
108
109static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
110        read_internal(trace, c->queues, true);
111}
112
113static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
114        int i;
115        libtrace_queue_t *queues = c->queues;
116
117        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
118                assert(libtrace_deque_get_size(&queues[i]) == 0);
119        }
120        free(queues);
121        queues = NULL;
122}
123
124
125static void pause(libtrace_t *trace, libtrace_combine_t *c) {
126        libtrace_queue_t *queues = c->queues;
127        int i;
128        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
129                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
130        }
131}
132
133DLLEXPORT const libtrace_combine_t combiner_ordered = {
134        init_combiner,  /* initialise */
135        destroy,                /* destroy */
136        publish,                /* publish */
137        read,                   /* read */
138        read_final,             /* read_final */
139        pause,                  /* pause */
140        NULL,                   /* queues */
141        {0}                             /* opts */
142};
Note: See TracBrowser for help on using the repository browser.