source: lib/combiner_unordered.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 1.7 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(libtrace_get_perpkt_count(t) > 0);
10        libtrace_queue_t *queues;
11        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
12        queues = c->queues;
13        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
14                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
21        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
22                trace_post_reporter(trace);
23        }
24        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
25}
26
27static void read(libtrace_t *trace, libtrace_combine_t *c){
28        libtrace_queue_t *queues = c->queues;
29        int i;
30
31        /* Loop through and read all that are here */
32        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
33                libtrace_queue_t *v = &queues[i];
34                while (libtrace_deque_get_size(v) != 0) {
35                        libtrace_result_t r;
36                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
37                        trace->reporter(trace, &r, NULL);
38                }
39        }
40}
41
42static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
43        int i;
44        libtrace_queue_t *queues = c->queues;
45
46        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
47                assert(libtrace_deque_get_size(&queues[i]) == 0);
48        }
49        free(queues);
50        queues = NULL;
51}
52
53const libtrace_combine_t combiner_unordered = {
54    init_combiner,      /* initialise */
55        destroy,                /* destroy */
56        publish,                /* publish */
57    read,                       /* read */
58    read,                       /* read_final */
59    read,                       /* pause */
60    NULL,                       /* queues */
61    0                           /* opts */
62};
Note: See TracBrowser for help on using the repository browser.