source: lib/combiner_unordered.c @ f01c479

Branches/tags: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since f01c479 was f01c479, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Move reporter threshold check to after adding the next item, so it makes more sense
in the case the threshold is set to 1.

Add logic to the order combiner allowing multiple results with the same key
to be sent even if all queues are not full.

  • Property mode set to 100644
File size: 1.8 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(libtrace_get_perpkt_count(t) > 0);
10        libtrace_queue_t *queues;
11        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
12        queues = c->queues;
13        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
14                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
21        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
22
23        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
24                trace_post_reporter(trace);
25        }
26}
27
28static void read(libtrace_t *trace, libtrace_combine_t *c){
29        libtrace_queue_t *queues = c->queues;
30        int i;
31
32        /* Loop through and read all that are here */
33        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
34                libtrace_queue_t *v = &queues[i];
35                while (libtrace_deque_get_size(v) != 0) {
36                        libtrace_result_t r;
37                        libtrace_generic_t gt = {.res = &r};
38                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
39                        trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
40                }
41        }
42}
43
44static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
45        int i;
46        libtrace_queue_t *queues = c->queues;
47
48        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
49                assert(libtrace_deque_get_size(&queues[i]) == 0);
50        }
51        free(queues);
52        queues = NULL;
53}
54
55DLLEXPORT const libtrace_combine_t combiner_unordered = {
56    init_combiner,      /* initialise */
57        destroy,                /* destroy */
58        publish,                /* publish */
59    read,                       /* read */
60    read,                       /* read_final */
61    read,                       /* pause */
62    NULL,                       /* queues */
63    {0}                         /* opts */
64};
Note: See TracBrowser for help on using the repository browser.