source: lib/combiner_unordered.c @ 62b3c4e

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 62b3c4e was 62b3c4e, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Update the reporter method to match with the per_pkt method.

We now treat a result as a type of message.

  • Property mode set to 100644
File size: 1.8 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(libtrace_get_perpkt_count(t) > 0);
10        libtrace_queue_t *queues;
11        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
12        queues = c->queues;
13        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
14                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
21        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
22                trace_post_reporter(trace);
23        }
24        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
25}
26
27static void read(libtrace_t *trace, libtrace_combine_t *c){
28        libtrace_queue_t *queues = c->queues;
29        int i;
30
31        /* Loop through and read all that are here */
32        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
33                libtrace_queue_t *v = &queues[i];
34                while (libtrace_deque_get_size(v) != 0) {
35                        libtrace_result_t r;
36                        libtrace_generic_t gt = {.res = &r};
37                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
38                        trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
39                }
40        }
41}
42
43static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
44        int i;
45        libtrace_queue_t *queues = c->queues;
46
47        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
48                assert(libtrace_deque_get_size(&queues[i]) == 0);
49        }
50        free(queues);
51        queues = NULL;
52}
53
54DLLEXPORT const libtrace_combine_t combiner_unordered = {
55    init_combiner,      /* initialise */
56        destroy,                /* destroy */
57        publish,                /* publish */
58    read,                       /* read */
59    read,                       /* read_final */
60    read,                       /* pause */
61    NULL,                       /* queues */
62    {0}                         /* opts */
63};
Note: See TracBrowser for help on using the repository browser.