source: lib/combiner_unordered.c @ 21c0d70

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 21c0d70 was a31e166, checked in by Shane Alcock <salcock@…>, 6 years ago

Rename libtrace_get_perpkt_count to trace_get_perpkt_threads

This is to be consistent with trace_set_perpkt_threads.

Remove 'packet' parameter from the first_packet callback.
trace_get_first_packet should be used to get the first packet instead.

  • Property mode set to 100644
File size: 2.6 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(trace_get_perpkt_threads(t) > 0);
10        libtrace_queue_t *queues;
11        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
12        queues = c->queues;
13        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
14                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
21        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
22
23        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
24                trace_post_reporter(trace);
25        }
26}
27
28static void read(libtrace_t *trace, libtrace_combine_t *c){
29        libtrace_queue_t *queues = c->queues;
30        int i;
31
32        /* Loop through and read all that are here */
33        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
34                libtrace_queue_t *v = &queues[i];
35                while (libtrace_deque_get_size(v) != 0) {
36                        libtrace_result_t r;
37                        libtrace_generic_t gt = {.res = &r};
38                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
39                        /* Ignore any ticks that we've already seen */
40                        if (r.type == RESULT_TICK_INTERVAL) {
41                                if (r.key <= c->last_ts_tick)
42                                        continue;
43                                c->last_ts_tick = r.key;
44                        }
45
46                        if (r.type == RESULT_TICK_COUNT) {
47                                if (r.key <= c->last_count_tick)
48                                        continue;
49                                c->last_count_tick = r.key;
50                        }
51                        send_message(trace, &trace->reporter_thread,
52                                MESSAGE_RESULT, gt, NULL);
53                }
54        }
55}
56
57static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
58        int i;
59        libtrace_queue_t *queues = c->queues;
60
61        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
62                assert(libtrace_deque_get_size(&queues[i]) == 0);
63        }
64        free(queues);
65        queues = NULL;
66}
67
68DLLEXPORT const libtrace_combine_t combiner_unordered = {
69    init_combiner,      /* initialise */
70        destroy,                /* destroy */
71        publish,                /* publish */
72    read,                       /* read */
73    read,                       /* read_final */
74    read,                       /* pause */
75    NULL,                       /* queues */
76    0,                          /* last_count_tick */
77    0,                          /* last_ts_tick */
78    {0}                         /* opts */
79};
Note: See TracBrowser for help on using the repository browser.