source: examples/parallel/timedemo.c @ 0f5d4de

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 0f5d4de was ccb89e4, checked in by Shane Alcock <salcock@…>, 6 years ago

Update trivial and timedemo examples to use new parallel API

Also fixed a few documentation errors (spelling, grammar etc).

  • Property mode set to 100644
File size: 10.3 KB
Line 
1/* A parallel libtrace program that prints a count of packets observed
2 * every 10 seconds.
3 *
4 * Using this approach allows results to be reported promptly for live
5 * formats, even if data is not arriving on a given thread. This method also
6 * works perfectly fine when run against a trace file.
7 *
8 * Designed to demonstrate the correct usage of TICK_INTERVAL. TICK_COUNT can
9 * be used instead, which will trigger the result reporting based on seeing
10 * a fixed number of packets.
11 *
12 * This example is based upon examples/tutorial/timedemo.c
13 */
14#include "libtrace_parallel.h"
15#include <stdio.h>
16#include <inttypes.h>
17#include <assert.h>
18#include <getopt.h>
19#include <stdlib.h>
20
/* Helpers for the 64-bit timestamp format used throughout this file: the
 * upper 32 bits hold whole seconds and the lower 32 bits hold the
 * fractional part of a second.
 *
 * Every macro argument is parenthesised so that compound expressions are
 * converted as a whole rather than having the cast bind to their first
 * operand only.
 */
#define SECONDS_TO_ERF(sec) (((uint64_t)(sec)) << 32)
#define ERF_TO_SECONDS(erf) (((uint64_t)(erf)) >> 32)
/* One microsecond is 2^32 / 1000000 timestamp units; this matches the
 * conversion that TV_TO_ERF applies to tv_usec. */
#define USEC_TO_ERF(usec) ((((uint64_t)(usec)) << 32) / 1000000)
#define TV_TO_ERF(tv) ((((uint64_t)(tv).tv_sec) << 32) + ((((uint64_t)(tv).tv_usec) << 32) / 1000000))
25
/* State owned by a single thread. init_local() allocates one of these for
 * every processing thread and one for the reporter thread. */
struct localdata {
        /* Timestamp at which the next report is due, in the same 64-bit
         * format produced by SECONDS_TO_ERF. Zero means the first
         * reporting time has not been determined yet. */
        uint64_t nextreport;
        /* Packets counted since the counter was last reset */
        uint64_t count;
};
30
31/* Due to the amount of error checking required in our main function, it
32 * is a lot simpler and tidier to place all the calls to various libtrace
33 * destroy functions into a separate function.
34 */
35static void libtrace_cleanup(libtrace_t *trace,
36                libtrace_callback_set_t *processing,
37                libtrace_callback_set_t *reporter) {
38
39        /* It's very important to ensure that we aren't trying to destroy
40         * a NULL structure, so each of the destroy calls will only occur
41         * if the structure exists */
42        if (trace)
43                trace_destroy(trace);
44
45        if (processing)
46                trace_destroy_callback_set(processing);
47
48        if (reporter)
49                trace_destroy_callback_set(reporter);
50
51}
52
53/* Creates a localdata structure for a processing thread */
54static void *init_local(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
55                void *global UNUSED) {
56
57        struct localdata *local = (struct localdata *)malloc(sizeof(struct
58                        localdata));
59        local->nextreport = 0;
60        local->count = 0;
61
62        return local;
63
64}
65
66/* Frees the localdata associated with a processing thread */
67static void fin_local(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
68                void *global UNUSED, void *tls) {
69
70        free(tls);
71}
72
73static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *t,
74                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
75
76        uint64_t ts;
77        /* Cast our thread local storage to the right type */
78        struct localdata *local = (struct localdata *)tls;
79
80        /* Get the timestamp for the current packet */
81        ts = trace_get_erf_timestamp(packet);
82
83        /* Check whether we need to report a packet count or not.
84         *
85         * If the timestamp for the current packet is beyond the time when the
86         * next report was due then we have to output our current count and
87         * reset it to zero.
88         *
89         * Note that I use a while loop here to ensure that we correctly deal
90         * with periods in which no packets are observed. This can still
91         * happen because TICK_INTERVAL is not used for realtime playback
92         * such as a file.
93         */
94        while (local->nextreport && ts > local->nextreport) {
95                libtrace_generic_t c;
96                c.uint64 = local->count;
97                /* Report the result for the current time interval.
98                 * Each thread will report once for each given time
99                 * interval */
100                trace_publish_result(trace, t, local->nextreport, c,
101                                RESULT_USER);
102
103                /* Reset the counter */
104                local->count = 0;
105                /* Determine when the next report is due */
106                local->nextreport += SECONDS_TO_ERF(10);
107        }
108
109        /* No matter what else happens during this function call, we still
110         * need to increment our counter */
111        local->count += 1;
112
113        /* We have finished processing this packet so return it */
114        return packet;
115
116}
117
118/* As soon as any thread has seen a packet, we need to initialise the
119 * next reporting time for each of our processing threads */
120static void first_packet(libtrace_t *trace, libtrace_thread_t *t UNUSED,
121                void *global UNUSED, void *tls,
122                libtrace_thread_t *sender UNUSED) {
123
124        /* Cast our thread local storage to the right type */
125        struct localdata *local = (struct localdata *)tls;
126
127        if (local->nextreport == 0) {
128                uint64_t first_ts;
129                /* Get the timestamp of the first packet across all threads */
130                const libtrace_packet_t * tmp = NULL;
131                const struct timeval *tv;
132
133                /* Get the first packet across all threads */
134                if (trace_get_first_packet(trace, NULL, &tmp, &tv) == 1) {
135                        first_ts = trace_get_erf_timestamp(tmp);
136                        /* We know our first reporting time now */
137                        local->nextreport = first_ts + SECONDS_TO_ERF(10);
138                }
139        }
140}
141
142static void process_tick(libtrace_t *trace, libtrace_thread_t *t,
143                void *global UNUSED, void *tls, uint64_t tick) {
144
145        struct localdata *local = (struct localdata *)tls;
146
147        while (local->nextreport && tick > local->nextreport) {
148                libtrace_generic_t c;
149                c.uint64 = local->count;
150                /* If the tick is past the time that our next report is
151                 * due, flush our current counter to the reporting
152                 * thread. This ensures that we keep sending results even
153                 * if this thread receives no new packets
154                 */
155                trace_publish_result(trace, t, local->nextreport, c,
156                        RESULT_USER);
157
158                /* Reset the counter */
159                local->count = 0;
160                /* Determine when the next report is due */
161                local->nextreport += SECONDS_TO_ERF(10);
162        }
163
164}
165
166static inline void dump_results(struct localdata *local, uint64_t key) {
167
168        /* Using a while loop here, so that we can correctly handle any
169         * 10 second intervals where no packets were counted.
170         */
171        while (key >= local->nextreport) {
172                printf("%u \t%" PRIu64 "\n",
173                                (int) ERF_TO_SECONDS(local->nextreport),
174                                local->count);
175                local->count = 0;
176                local->nextreport += SECONDS_TO_ERF(10);
177        }
178}
179
180/* Process results sent to the reporter thread */
181static void report_results(libtrace_t *trace,
182                libtrace_thread_t *sender UNUSED,
183                void *global UNUSED, void *tls, libtrace_result_t *result) {
184
185        static __thread int reported = 0;
186        struct localdata *local = (struct localdata *)tls;
187
188
189        /* Set the initial reporting time and print the heading
190         * Note: we could do these in starting and first_packet callbacks
191         * but there is only one reporting thread so we can get away
192         * with this. */
193        if (local->nextreport == 0) {
194                printf("Time\t\tPackets\n");
195                local->nextreport = result->key;
196        }
197        assert(result->key == local->nextreport);
198
199        reported ++;
200        if (reported == trace_get_perpkt_threads(trace)) {
201                dump_results(local, result->key);
202                reported = 0;
203        }
204
205        local->count += result->value.uint64;
206
207}
208
209/* Dump the final value for the counter and free up our local data struct */
210static void end_reporter(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
211                void *global UNUSED, void *tls) {
212
213        struct localdata *local = (struct localdata *)tls;
214
215        /* If we have any counted packets that haven't been reported, do
216         * so now.
217         */
218        if (local->count > 0)
219                dump_results(local, local->nextreport + 1);
220
221        free(local);
222}
223
224int main(int argc, char *argv[])
225{
226        libtrace_t *trace = NULL;
227        libtrace_callback_set_t *processing = NULL;
228        libtrace_callback_set_t *reporter = NULL;
229
230        /* Ensure we have at least one argument after the program name */
231        if (argc < 2) {
232                fprintf(stderr, "Usage: %s inputURI\n", argv[0]);
233                return 1;
234        }
235
236        trace = trace_create(argv[1]);
237
238        if (trace_is_err(trace)) {
239                trace_perror(trace,"Opening trace file");
240                libtrace_cleanup(trace, processing, reporter);
241                return 1;
242        }
243
244        /* Send every result to the reporter immediately, i.e. do not buffer
245         * them. */
246        trace_set_reporter_thold(trace, 1);
247
248        /* Sends a tick message once per second */
249        trace_set_tick_interval(trace, 1000);
250
251        /* The combiner sits between trace_publish_result() and the reporter
252         * function and determines how the results show be ordered and combined.
253         *
254         * Our results are ordered by timestamp and we want them to be returned
255         * in order so we use combiner_ordered.
256         *
257         * This is typically the most useful combiner to use.
258         */
259        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
260
261        /* Limit to 4 processing threads */
262        trace_set_perpkt_threads(trace, 4);
263
264        /* Set up our processing callbacks */
265        processing = trace_create_callback_set();
266        trace_set_starting_cb(processing, init_local);
267        trace_set_first_packet_cb(processing, first_packet);
268        trace_set_stopping_cb(processing, fin_local);
269        trace_set_packet_cb(processing, per_packet);
270        trace_set_tick_interval_cb(processing, process_tick);
271
272        /* Set up our reporting callbacks -- note that we re-use the init_local
273         * callback */
274        reporter = trace_create_callback_set();
275        trace_set_starting_cb(reporter, init_local);
276        trace_set_result_cb(reporter, report_results);
277        trace_set_stopping_cb(reporter, end_reporter);
278
279        /* Start everything going -- no global data required so set that
280         * to NULL */
281        if (trace_pstart(trace, NULL, processing, reporter) == -1) {
282                trace_perror(trace,"Starting trace");
283                libtrace_cleanup(trace, processing, reporter);
284                return 1;
285        }
286
287        /* Wait for completion */
288        trace_join(trace);
289        if (trace_is_err(trace)) {
290                trace_perror(trace,"Reading packets");
291                libtrace_cleanup(trace, processing, reporter);
292                return 1;
293        }
294
295        libtrace_cleanup(trace, processing, reporter);
296        return 0;
297}
Note: See TracBrowser for help on using the repository browser.