source: examples/parallel/timedemo.c @ 16cb2a2

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 16cb2a2 was 16cb2a2, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Add some example programs.

trivial_skeleton.c
timedemo.c - Shows how to deal with tracetime results (say printing statistics every X seconds) using TICK_INTERVAL

  • Property mode set to 100644
File size: 7.4 KB
Line 
/* A parallel libtrace program that prints a count of packets observed
 * after every 10 seconds of the trace running.
 *
 * Using this approach allows results to be reported quickly for tracetime
 * formats, even if data is not arriving on a given thread, while maintaining
 * consistent output when run on a file etc.
 *
 * Designed to demonstrate the correct usage of TICK_INTERVAL. Also note
 * TICK_COUNT is not needed for this example.
 *
 * This example is based upon examples/tutorial/timedemo.c
 */
13#include "libtrace_parallel.h"
14#include "combiners.h"
15#include <stdio.h>
16#include <inttypes.h>
17#include <assert.h>
18#include <getopt.h>
19
/* Conversions to and from ERF timestamps: a 64-bit fixed-point value with
 * whole seconds in the upper 32 bits and the fraction of a second
 * (in units of 1/2^32 s) in the lower 32 bits.
 *
 * Every argument is parenthesised before being cast so that expressions
 * such as `a + b` or `c ? x : y` are converted as a whole.
 */
#define SECONDS_TO_ERF(sec) (((uint64_t)(sec)) << 32)
#define ERF_TO_SECONDS(erf) (((uint64_t)(erf)) >> 32)
/* One microsecond is 2^32 / 10^6 ERF units; this is the same scaling
 * TV_TO_ERF applies to tv_usec. */
#define USEC_TO_ERF(usec) ((((uint64_t)(usec)) << 32) / 1000000)
#define TV_TO_ERF(tv) ((((uint64_t)(tv).tv_sec) << 32) + ((((uint64_t)(tv).tv_usec) << 32) / 1000000))
24
25/* Due to the amount of error checking required in our main function, it
26 * is a lot simpler and tidier to place all the calls to various libtrace
27 * destroy functions into a separate function.
28 */
29static void libtrace_cleanup(libtrace_t *trace) {
30
31        /* It's very important to ensure that we aren't trying to destroy
32         * a NULL structure, so each of the destroy calls will only occur
33         * if the structure exists */
34        if (trace)
35                trace_destroy(trace);
36
37}
38
39/* Every time a packet becomes ready this function will be called. It will also
40 * be called when messages from the library is received. This function
41 * is run in parallel.
42 */
43static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
44                        int mesg, libtrace_generic_t data,
45                        libtrace_thread_t *sender UNUSED)
46{
47        /* __thread, says make this unique per each thread */
48        static __thread uint64_t count = 0; /* The number of packets in this 10sec interval */
49        static __thread uint64_t next_report = 0; /* The start of the next interval */
50        static __thread uint64_t offset = 0; /* Offset between trace time and system time */
51        uint64_t ts; /* The timestamp of the current packet */
52
53        switch (mesg) {
54        case MESSAGE_PACKET:
55                /* Get the timestamp for the current packet */
56                ts = trace_get_erf_timestamp(data.pkt);
57
58                /* Check whether we need to report a packet count or not.
59                 *
60                 * If the timestamp for the current packet is beyond the time when the
61                 * next report was due then we have to output our current count and
62                 * reset it to zero.
63                 *
64                 * Note that I use a while loop here to ensure that we correctly deal
65                 * with periods in which no packets are observed. This can still
66                 * happen because TICK_INTERVAL is not used for realtime playback
67                 * such as a file.
68                 */
69                while (next_report && ts > next_report) {
70                        libtrace_generic_t c;
71                        c.uint64 = count;
72                        /* Report the result for the current time interval
73                         * Each thread will report once for each given time
74                         * interval */
75                        trace_publish_result(trace, t, next_report, c, RESULT_USER);
76
77                        /* Reset the counter */
78                        count = 0;
79                        /* Determine when the next report is due */
80                        next_report += SECONDS_TO_ERF(10);
81                }
82
83                /* No matter what else happens during this function call, we still
84                 * need to increment our counter */
85                count += 1;
86
87                /* We have finished processing this packet return it */
88                return data.pkt;
89        case MESSAGE_TICK_INTERVAL:
90
91                 /* If we are a second passed when we should have reported last
92                  * we will do it now. We would be in this situation if we
93                  * haven't been receiving packets.
94                  * Make sure we dont report until we have seen the first packet
95                  */
96                while (next_report &&
97                       (data.uint64 - offset - SECONDS_TO_ERF(1) > next_report)) {
98                        libtrace_generic_t c;
99                        c.uint64 = count;
100                        /* Report the result for the current time interval */
101                        trace_publish_result(trace, t, next_report, c, RESULT_USER);
102
103                        /* Reset the counter */
104                        count = 0;
105                        /* Determine when the next report is due */
106                        next_report += SECONDS_TO_ERF(10);
107                }
108
109        /* !!! Fall through to check if we have the first packet yet !!! */
110        case MESSAGE_FIRST_PACKET: /* Some thread has seen its first packet */
111
112                if (next_report == 0) {
113                        uint64_t first_ts;
114                        /* Try get the timestamp of the first packet across all threads*/
115                        const libtrace_packet_t * tmp = NULL;
116                        const struct timeval *tv;
117
118                        /* Get the first packet across all threads */
119                        if (trace_get_first_packet(trace, NULL, &tmp, &tv) == 1) {
120                                /* We know this is the first packet across all threads */
121
122                                first_ts = trace_get_erf_timestamp(tmp);
123                                /* There might be a difference between system time
124                                 * and packet times. We need to account for this
125                                 * when interpreting TICK_INTERVAL messages */
126                                offset = TV_TO_ERF(*tv) - first_ts;
127                                /* We know our first reporting time now */
128                                next_report = first_ts + SECONDS_TO_ERF(10);
129                        }
130                }
131                return NULL;
132        default:
133                return NULL;
134        }
135        return NULL;
136}
137
138/* Every time a result (published using trace_publish_result()) becomes ready
139 * this function will be called. It will also be called when messages from the
140 *library is received. This function is only run on a single thread */
141 */
142static void report_results(libtrace_t *trace UNUSED, int mesg,
143                           libtrace_generic_t data,
144                           libtrace_thread_t *sender UNUSED) {
145        static uint64_t count = 0; /* The count for the current interval */
146        static int reported = 0; /* The number of threads that have reported results for the interval */
147        static uint64_t currentkey = 0; /* The key, which is next_report from perpkt */
148
149        switch (mesg) {
150        case MESSAGE_RESULT:
151                if (data.res->type == RESULT_USER) {
152                        /* We should always get a result from each thread */
153                        if (currentkey)
154                                assert(data.res->key == currentkey);
155
156                        currentkey = data.res->key;
157                        reported++;
158                        /* Add on the packets */
159                        count += data.res->value.uint64;
160
161                        if (reported == libtrace_get_perpkt_count(trace)) {
162                                /* Print a timestamp for the report and the packet count */
163                                printf("%u \t%" PRIu64 "\n", (int) ERF_TO_SECONDS(data.res->key), count);
164                                /* Reset ready for the next batch of results */
165                                count = reported = 0;
166                                currentkey = data.res->key + SECONDS_TO_ERF(10);
167                        }
168                }
169                break;
170        case MESSAGE_STARTING:
171                /* Print heading when first started */
172                printf("Time\t\tPackets\n");
173                break;
174        }
175}
176
177int main(int argc, char *argv[])
178{
179        libtrace_t *trace = NULL;
180
181        /* Ensure we have at least one argument after the program name */
182        if (argc < 2) {
183                fprintf(stderr, "Usage: %s inputURI\n", argv[0]);
184                return 1;
185        }
186
187        trace = trace_create(argv[1]);
188
189        if (trace_is_err(trace)) {
190                trace_perror(trace,"Opening trace file");
191                libtrace_cleanup(trace);
192                return 1;
193        }
194
195        /* We want to push through results ASAP */
196        trace_set_reporter_thold(trace, 1);
197
198        /* If the trace is live send a tick message every second */
199        trace_set_tick_interval(trace, 1000);
200
201        /* The combiner sits between trace_publish_result() and the reporter
202         * function and determines how the results show be ordered and combined.
203         *
204         * Our results are ordered by timestamp and we want them to be returned
205         * in order so we use combiner_ordered.
206         *
207         * This typically the most usefull combiner to use.
208         */
209        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
210
211        if (trace_pstart(trace, NULL, per_packet, report_results) == -1) {
212                trace_perror(trace,"Starting trace");
213                libtrace_cleanup(trace);
214                return 1;
215        }
216
217        /* Wait for completion */
218        trace_join(trace);
219        if (trace_is_err(trace)) {
220                trace_perror(trace,"Reading packets");
221                libtrace_cleanup(trace);
222                return 1;
223        }
224
225        libtrace_cleanup(trace);
226        return 0;
227}
Note: See TracBrowser for help on using the repository browser.