/* A parallel libtrace program that prints a count of packets observed
 * every 10 seconds.
 *
 * Using this approach allows results to be reported promptly for live
 * formats, even if data is not arriving on a given thread. This method also
 * works perfectly fine when run against a trace file.
 *
 * Designed to demonstrate the correct usage of TICK_INTERVAL. TICK_COUNT can
 * be used instead, which will trigger the result reporting based on seeing
 * a fixed number of packets.
 *
 * This example is based upon examples/tutorial/timedemo.c
 */
---|
14 | #include "libtrace_parallel.h" |
---|
15 | #include <stdio.h> |
---|
16 | #include <inttypes.h> |
---|
17 | #include <assert.h> |
---|
18 | #include <getopt.h> |
---|
19 | #include <stdlib.h> |
---|
20 | |
---|
/* Conversions to and from the 64-bit timestamp layout used throughout this
 * file: whole seconds in the upper 32 bits, and the fraction of a second
 * (in units of 2^-32 s) in the lower 32 bits.
 *
 * Every macro argument is parenthesised before it is cast, so compound
 * expressions (e.g. a ternary) are converted as a whole rather than only
 * their first operand.
 */

/* Whole seconds -> timestamp. */
#define SECONDS_TO_ERF(sec) (((uint64_t)(sec)) << 32)

/* Timestamp -> whole seconds (the fractional part is discarded). */
#define ERF_TO_SECONDS(erf) (((uint64_t)(erf)) >> 32)

/* Microseconds -> timestamp, using the same scaling as the tv_usec term of
 * TV_TO_ERF so that USEC_TO_ERF(1000000) == SECONDS_TO_ERF(1).
 * The intermediate shift overflows for usec >= 2^32, so pass the seconds
 * part through SECONDS_TO_ERF for larger durations.
 */
#define USEC_TO_ERF(usec) ((((uint64_t)(usec)) << 32) / 1000000)

/* struct timeval (or anything with tv_sec / tv_usec members) -> timestamp. */
#define TV_TO_ERF(tv) \
	((((uint64_t)(tv).tv_sec) << 32) + \
	 ((((uint64_t)(tv).tv_usec) << 32) / 1000000))
---|
25 | |
---|
/* State kept by each thread (processing threads and the reporter alike). */
struct localdata {
	/* Timestamp (seconds in the upper 32 bits) at which the next report
	 * is due. Zero means "not yet known". */
	uint64_t nextreport;

	/* Packets tallied since the counter was last reset. */
	uint64_t count;
};
---|
30 | |
---|
31 | /* Due to the amount of error checking required in our main function, it |
---|
32 | * is a lot simpler and tidier to place all the calls to various libtrace |
---|
33 | * destroy functions into a separate function. |
---|
34 | */ |
---|
35 | static void libtrace_cleanup(libtrace_t *trace, |
---|
36 | libtrace_callback_set_t *processing, |
---|
37 | libtrace_callback_set_t *reporter) { |
---|
38 | |
---|
39 | /* It's very important to ensure that we aren't trying to destroy |
---|
40 | * a NULL structure, so each of the destroy calls will only occur |
---|
41 | * if the structure exists */ |
---|
42 | if (trace) |
---|
43 | trace_destroy(trace); |
---|
44 | |
---|
45 | if (processing) |
---|
46 | trace_destroy_callback_set(processing); |
---|
47 | |
---|
48 | if (reporter) |
---|
49 | trace_destroy_callback_set(reporter); |
---|
50 | |
---|
51 | } |
---|
52 | |
---|
53 | /* Creates a localdata structure for a processing thread */ |
---|
54 | static void *init_local(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, |
---|
55 | void *global UNUSED) { |
---|
56 | |
---|
57 | struct localdata *local = (struct localdata *)malloc(sizeof(struct |
---|
58 | localdata)); |
---|
59 | local->nextreport = 0; |
---|
60 | local->count = 0; |
---|
61 | |
---|
62 | return local; |
---|
63 | |
---|
64 | } |
---|
65 | |
---|
66 | /* Frees the localdata associated with a processing thread */ |
---|
67 | static void fin_local(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, |
---|
68 | void *global UNUSED, void *tls) { |
---|
69 | |
---|
70 | free(tls); |
---|
71 | } |
---|
72 | |
---|
73 | static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *t, |
---|
74 | void *global UNUSED, void *tls, libtrace_packet_t *packet) { |
---|
75 | |
---|
76 | uint64_t ts; |
---|
77 | /* Cast our thread local storage to the right type */ |
---|
78 | struct localdata *local = (struct localdata *)tls; |
---|
79 | |
---|
80 | /* Get the timestamp for the current packet */ |
---|
81 | ts = trace_get_erf_timestamp(packet); |
---|
82 | |
---|
83 | /* Check whether we need to report a packet count or not. |
---|
84 | * |
---|
85 | * If the timestamp for the current packet is beyond the time when the |
---|
86 | * next report was due then we have to output our current count and |
---|
87 | * reset it to zero. |
---|
88 | * |
---|
89 | * Note that I use a while loop here to ensure that we correctly deal |
---|
90 | * with periods in which no packets are observed. This can still |
---|
91 | * happen because TICK_INTERVAL is not used for realtime playback |
---|
92 | * such as a file. |
---|
93 | */ |
---|
94 | while (local->nextreport && ts > local->nextreport) { |
---|
95 | libtrace_generic_t c; |
---|
96 | c.uint64 = local->count; |
---|
97 | /* Report the result for the current time interval. |
---|
98 | * Each thread will report once for each given time |
---|
99 | * interval */ |
---|
100 | trace_publish_result(trace, t, local->nextreport, c, |
---|
101 | RESULT_USER); |
---|
102 | |
---|
103 | /* Reset the counter */ |
---|
104 | local->count = 0; |
---|
105 | /* Determine when the next report is due */ |
---|
106 | local->nextreport += SECONDS_TO_ERF(10); |
---|
107 | } |
---|
108 | |
---|
109 | /* No matter what else happens during this function call, we still |
---|
110 | * need to increment our counter */ |
---|
111 | local->count += 1; |
---|
112 | |
---|
113 | /* We have finished processing this packet so return it */ |
---|
114 | return packet; |
---|
115 | |
---|
116 | } |
---|
117 | |
---|
118 | /* As soon as any thread has seen a packet, we need to initialise the |
---|
119 | * next reporting time for each of our processing threads */ |
---|
120 | static void first_packet(libtrace_t *trace, libtrace_thread_t *t UNUSED, |
---|
121 | void *global UNUSED, void *tls, |
---|
122 | libtrace_thread_t *sender UNUSED) { |
---|
123 | |
---|
124 | /* Cast our thread local storage to the right type */ |
---|
125 | struct localdata *local = (struct localdata *)tls; |
---|
126 | |
---|
127 | if (local->nextreport == 0) { |
---|
128 | uint64_t first_ts; |
---|
129 | /* Get the timestamp of the first packet across all threads */ |
---|
130 | const libtrace_packet_t * tmp = NULL; |
---|
131 | const struct timeval *tv; |
---|
132 | |
---|
133 | /* Get the first packet across all threads */ |
---|
134 | if (trace_get_first_packet(trace, NULL, &tmp, &tv) == 1) { |
---|
135 | first_ts = trace_get_erf_timestamp(tmp); |
---|
136 | /* We know our first reporting time now */ |
---|
137 | local->nextreport = first_ts + SECONDS_TO_ERF(10); |
---|
138 | } |
---|
139 | } |
---|
140 | } |
---|
141 | |
---|
142 | static void process_tick(libtrace_t *trace, libtrace_thread_t *t, |
---|
143 | void *global UNUSED, void *tls, uint64_t tick) { |
---|
144 | |
---|
145 | struct localdata *local = (struct localdata *)tls; |
---|
146 | |
---|
147 | while (local->nextreport && tick > local->nextreport) { |
---|
148 | libtrace_generic_t c; |
---|
149 | c.uint64 = local->count; |
---|
150 | /* If the tick is past the time that our next report is |
---|
151 | * due, flush our current counter to the reporting |
---|
152 | * thread. This ensures that we keep sending results even |
---|
153 | * if this thread receives no new packets |
---|
154 | */ |
---|
155 | trace_publish_result(trace, t, local->nextreport, c, |
---|
156 | RESULT_USER); |
---|
157 | |
---|
158 | /* Reset the counter */ |
---|
159 | local->count = 0; |
---|
160 | /* Determine when the next report is due */ |
---|
161 | local->nextreport += SECONDS_TO_ERF(10); |
---|
162 | } |
---|
163 | |
---|
164 | } |
---|
165 | |
---|
166 | static inline void dump_results(struct localdata *local, uint64_t key) { |
---|
167 | |
---|
168 | /* Using a while loop here, so that we can correctly handle any |
---|
169 | * 10 second intervals where no packets were counted. |
---|
170 | */ |
---|
171 | while (key >= local->nextreport) { |
---|
172 | printf("%u \t%" PRIu64 "\n", |
---|
173 | (int) ERF_TO_SECONDS(local->nextreport), |
---|
174 | local->count); |
---|
175 | local->count = 0; |
---|
176 | local->nextreport += SECONDS_TO_ERF(10); |
---|
177 | } |
---|
178 | } |
---|
179 | |
---|
180 | /* Process results sent to the reporter thread */ |
---|
181 | static void report_results(libtrace_t *trace, |
---|
182 | libtrace_thread_t *sender UNUSED, |
---|
183 | void *global UNUSED, void *tls, libtrace_result_t *result) { |
---|
184 | |
---|
185 | static __thread int reported = 0; |
---|
186 | struct localdata *local = (struct localdata *)tls; |
---|
187 | |
---|
188 | |
---|
189 | /* Set the initial reporting time and print the heading |
---|
190 | * Note: we could do these in starting and first_packet callbacks |
---|
191 | * but there is only one reporting thread so we can get away |
---|
192 | * with this. */ |
---|
193 | if (local->nextreport == 0) { |
---|
194 | printf("Time\t\tPackets\n"); |
---|
195 | local->nextreport = result->key; |
---|
196 | } |
---|
197 | assert(result->key == local->nextreport); |
---|
198 | |
---|
199 | reported ++; |
---|
200 | if (reported == trace_get_perpkt_threads(trace)) { |
---|
201 | dump_results(local, result->key); |
---|
202 | reported = 0; |
---|
203 | } |
---|
204 | |
---|
205 | local->count += result->value.uint64; |
---|
206 | |
---|
207 | } |
---|
208 | |
---|
209 | /* Dump the final value for the counter and free up our local data struct */ |
---|
210 | static void end_reporter(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, |
---|
211 | void *global UNUSED, void *tls) { |
---|
212 | |
---|
213 | struct localdata *local = (struct localdata *)tls; |
---|
214 | |
---|
215 | /* If we have any counted packets that haven't been reported, do |
---|
216 | * so now. |
---|
217 | */ |
---|
218 | if (local->count > 0) |
---|
219 | dump_results(local, local->nextreport + 1); |
---|
220 | |
---|
221 | free(local); |
---|
222 | } |
---|
223 | |
---|
224 | int main(int argc, char *argv[]) |
---|
225 | { |
---|
226 | libtrace_t *trace = NULL; |
---|
227 | libtrace_callback_set_t *processing = NULL; |
---|
228 | libtrace_callback_set_t *reporter = NULL; |
---|
229 | |
---|
230 | /* Ensure we have at least one argument after the program name */ |
---|
231 | if (argc < 2) { |
---|
232 | fprintf(stderr, "Usage: %s inputURI\n", argv[0]); |
---|
233 | return 1; |
---|
234 | } |
---|
235 | |
---|
236 | trace = trace_create(argv[1]); |
---|
237 | |
---|
238 | if (trace_is_err(trace)) { |
---|
239 | trace_perror(trace,"Opening trace file"); |
---|
240 | libtrace_cleanup(trace, processing, reporter); |
---|
241 | return 1; |
---|
242 | } |
---|
243 | |
---|
244 | /* Send every result to the reporter immediately, i.e. do not buffer |
---|
245 | * them. */ |
---|
246 | trace_set_reporter_thold(trace, 1); |
---|
247 | |
---|
248 | /* Sends a tick message once per second */ |
---|
249 | trace_set_tick_interval(trace, 1000); |
---|
250 | |
---|
251 | /* The combiner sits between trace_publish_result() and the reporter |
---|
252 | * function and determines how the results show be ordered and combined. |
---|
253 | * |
---|
254 | * Our results are ordered by timestamp and we want them to be returned |
---|
255 | * in order so we use combiner_ordered. |
---|
256 | * |
---|
257 | * This is typically the most useful combiner to use. |
---|
258 | */ |
---|
259 | trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0}); |
---|
260 | |
---|
261 | /* Limit to 4 processing threads */ |
---|
262 | trace_set_perpkt_threads(trace, 4); |
---|
263 | |
---|
264 | /* Set up our processing callbacks */ |
---|
265 | processing = trace_create_callback_set(); |
---|
266 | trace_set_starting_cb(processing, init_local); |
---|
267 | trace_set_first_packet_cb(processing, first_packet); |
---|
268 | trace_set_stopping_cb(processing, fin_local); |
---|
269 | trace_set_packet_cb(processing, per_packet); |
---|
270 | trace_set_tick_interval_cb(processing, process_tick); |
---|
271 | |
---|
272 | /* Set up our reporting callbacks -- note that we re-use the init_local |
---|
273 | * callback */ |
---|
274 | reporter = trace_create_callback_set(); |
---|
275 | trace_set_starting_cb(reporter, init_local); |
---|
276 | trace_set_result_cb(reporter, report_results); |
---|
277 | trace_set_stopping_cb(reporter, end_reporter); |
---|
278 | |
---|
279 | /* Start everything going -- no global data required so set that |
---|
280 | * to NULL */ |
---|
281 | if (trace_pstart(trace, NULL, processing, reporter) == -1) { |
---|
282 | trace_perror(trace,"Starting trace"); |
---|
283 | libtrace_cleanup(trace, processing, reporter); |
---|
284 | return 1; |
---|
285 | } |
---|
286 | |
---|
287 | /* Wait for completion */ |
---|
288 | trace_join(trace); |
---|
289 | if (trace_is_err(trace)) { |
---|
290 | trace_perror(trace,"Reading packets"); |
---|
291 | libtrace_cleanup(trace, processing, reporter); |
---|
292 | return 1; |
---|
293 | } |
---|
294 | |
---|
295 | libtrace_cleanup(trace, processing, reporter); |
---|
296 | return 0; |
---|
297 | } |
---|