source: lib/combiner_sorted.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 2.9 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/vector.h"
4#include <assert.h>
5#include <stdlib.h>
6
7static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
8        int i = 0;
9        assert(libtrace_get_perpkt_count(t) > 0);
10        libtrace_vector_t *queues;
11        c->queues = calloc(sizeof(libtrace_vector_t), libtrace_get_perpkt_count(t));
12        queues = c->queues;
13        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
14                libtrace_vector_init(&queues[i], sizeof(libtrace_result_t));
15        }
16        return 0;
17}
18
19static void publish(libtrace_t *trace UNUSED, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
20        libtrace_vector_t *vec = &((libtrace_vector_t*)c->queues)[t_id];
21        libtrace_vector_push_back(vec, res);
22}
23
24static void read(libtrace_t *trace UNUSED, libtrace_combine_t *c UNUSED){
25        return;
26}
27
28static int compare_result(const void* p1, const void* p2)
29{
30        if (libtrace_result_get_key((libtrace_result_t *) p1) < libtrace_result_get_key((libtrace_result_t *) p2))
31                return -1;
32        if (libtrace_result_get_key((libtrace_result_t *) p1) == libtrace_result_get_key((libtrace_result_t *) p2))
33                return 0;
34        else
35                return 1;
36}
37
38
39/** Used below in trace_make_results_packets_safe*/
40static void do_copy_result_packet(void *data)
41{
42        libtrace_result_t *res = (libtrace_result_t *)data;
43        if (res->type == RESULT_PACKET) {
44                // Duplicate the packet in standard malloc'd memory and free the
45                // original, This is a 1:1 exchange so is ocache count remains unchanged.
46                libtrace_packet_t *oldpkt, *dup;
47                oldpkt = (libtrace_packet_t *) res->value;
48                dup = trace_copy_packet(oldpkt);
49                res->value = (void *)dup;
50                trace_destroy_packet(oldpkt);
51        }
52}
53
54static void pause(libtrace_t *trace, libtrace_combine_t *c) {
55        libtrace_vector_t *queues = c->queues;
56        int i;
57        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
58                libtrace_vector_apply_function(&queues[i], &do_copy_result_packet);
59        }
60}
61
62static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
63        libtrace_vector_t *queues = c->queues;
64        int i;
65        size_t a;
66        // Combine all results into queue 1
67        for (i = 1; i < libtrace_get_perpkt_count(trace); ++i)
68        {
69                libtrace_vector_append(&queues[0],&queues[i]);
70        }
71        // Sort them
72        libtrace_vector_qsort(&queues[0], compare_result);
73
74        for (a = 0; a < libtrace_vector_get_size(&queues[0]); ++a) {
75                libtrace_result_t r;
76                ASSERT_RET (libtrace_vector_get(&queues[0], a,  (void *) &r), == 1);
77                trace->reporter(trace, &r, NULL);
78        }
79        libtrace_vector_empty(&queues[0]);
80}
81
82static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
83        int i;
84        libtrace_vector_t *queues = c->queues;
85
86        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
87                assert(libtrace_vector_get_size(&queues[i]) == 0);
88                libtrace_vector_destroy(&queues[i]);
89        }
90        free(queues);
91        queues = NULL;
92}
93
94const libtrace_combine_t combiner_sorted = {
95    init_combiner,      /* initialise */
96        destroy,                /* destroy */
97        publish,                /* publish */
98    read,                       /* read */
99    read_final,                 /* read_final */
100    pause,                      /* pause */
101    NULL,                       /* queues */
102    0                           /* opts */
103};
Note: See TracBrowser for help on using the repository browser.