source: lib/combiner_unordered.c @ 322c516

Branches/tags: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 322c516 was f625817, checked in by Shane Alcock <salcock@…>, 6 years ago

Reworked callback API and removed old per_msg and reporter functions

Updated tracertstats to use the new callback API.

Extended the callback approach to the reporter thread as well as the per
packet threads.

Added libtrace_callback_set_t structure, which is used to register the
user callback functions.

Added callback functionality for MESSAGE_RESULT (needed now that reporter
threads also do callbacks) and MESSAGE_USER (for user-defined messages). The
MESSAGE_USER callback is essentially the same as the old per_msg function
style.

Updated combiners to use send_message to pass results to the reporter thread.
send_message itself is now no longer static, so that combiners can use it.

Disabled building of tracestats_parallel as it was using the older version
of the callback API. Will update in a future commit.

  • Property mode set to 100644
File size: 2.6 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(libtrace_get_perpkt_count(t) > 0);
10        libtrace_queue_t *queues;
11        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
12        queues = c->queues;
13        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
14                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
21        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
22
23        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
24                trace_post_reporter(trace);
25        }
26}
27
28static void read(libtrace_t *trace, libtrace_combine_t *c){
29        libtrace_queue_t *queues = c->queues;
30        int i;
31
32        /* Loop through and read all that are here */
33        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
34                libtrace_queue_t *v = &queues[i];
35                while (libtrace_deque_get_size(v) != 0) {
36                        libtrace_result_t r;
37                        libtrace_generic_t gt = {.res = &r};
38                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
39                        /* Ignore any ticks that we've already seen */
40                        if (r.type == RESULT_TICK_INTERVAL) {
41                                if (r.key <= c->last_ts_tick)
42                                        continue;
43                                c->last_ts_tick = r.key;
44                        }
45
46                        if (r.type == RESULT_TICK_COUNT) {
47                                if (r.key <= c->last_count_tick)
48                                        continue;
49                                c->last_count_tick = r.key;
50                        }
51                        send_message(trace, &trace->reporter_thread,
52                                MESSAGE_RESULT, gt, NULL);
53                }
54        }
55}
56
57static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
58        int i;
59        libtrace_queue_t *queues = c->queues;
60
61        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
62                assert(libtrace_deque_get_size(&queues[i]) == 0);
63        }
64        free(queues);
65        queues = NULL;
66}
67
68DLLEXPORT const libtrace_combine_t combiner_unordered = {
69    init_combiner,      /* initialise */
70        destroy,                /* destroy */
71        publish,                /* publish */
72    read,                       /* read */
73    read,                       /* read_final */
74    read,                       /* pause */
75    NULL,                       /* queues */
76    0,                          /* last_count_tick */
77    0,                          /* last_ts_tick */
78    {0}                         /* opts */
79};
Note: See TracBrowser for help on using the repository browser.