source: examples/parallel/timedemo.c @ 9346e4a

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 9346e4a was 7c95027, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Move combiners.h into libtrace_parallel.h and export libtrace_parallel.h
so that it is installed correctly.

  • Property mode set to 100644
File size: 7.4 KB
Line 
/* A parallel libtrace program that prints a count of packets observed
 * for every 10 seconds of the trace running.
 *
 * Using this approach allows results to be reported quickly for tracetime
 * formats, even if data is not arriving on a given thread, while maintaining
 * a consistent output when run on a file etc.
 *
 * Designed to demonstrate the correct usage of TICK_INTERVAL. Also note
 * TICK_COUNT is not needed for this example.
 *
 * This example is based upon examples/tutorial/timedemo.c
 */
13#include "libtrace_parallel.h"
14#include <stdio.h>
15#include <inttypes.h>
16#include <assert.h>
17#include <getopt.h>
18
/* Conversions to and from the 64-bit ERF timestamp layout used below: whole
 * seconds live in the upper 32 bits and the fraction of a second in the lower
 * 32 bits.
 *
 * Every argument is parenthesised so that the cast applies to the complete
 * expression passed in, not just its first operand.
 */
#define SECONDS_TO_ERF(sec) (((uint64_t)(sec)) << 32)
#define ERF_TO_SECONDS(erf) (((uint64_t)(erf)) >> 32)
/* Scale microseconds into 2^-32 second units, matching TV_TO_ERF. The shift
 * happens before the divide to keep precision, so usec must be below 2^32. */
#define USEC_TO_ERF(usec) ((((uint64_t)(usec)) << 32) / 1000000)
#define TV_TO_ERF(tv) ((((uint64_t)(tv).tv_sec) << 32) + ((((uint64_t)(tv).tv_usec) << 32) / 1000000))
23
24/* Due to the amount of error checking required in our main function, it
25 * is a lot simpler and tidier to place all the calls to various libtrace
26 * destroy functions into a separate function.
27 */
28static void libtrace_cleanup(libtrace_t *trace) {
29
30        /* It's very important to ensure that we aren't trying to destroy
31         * a NULL structure, so each of the destroy calls will only occur
32         * if the structure exists */
33        if (trace)
34                trace_destroy(trace);
35
36}
37
38/* Every time a packet becomes ready this function will be called. It will also
39 * be called when messages from the library is received. This function
40 * is run in parallel.
41 */
42static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
43                        int mesg, libtrace_generic_t data,
44                        libtrace_thread_t *sender UNUSED)
45{
46        /* __thread, says make this unique per each thread */
47        static __thread uint64_t count = 0; /* The number of packets in this 10sec interval */
48        static __thread uint64_t next_report = 0; /* The start of the next interval */
49        static __thread uint64_t offset = 0; /* Offset between trace time and system time */
50        uint64_t ts; /* The timestamp of the current packet */
51
52        switch (mesg) {
53        case MESSAGE_PACKET:
54                /* Get the timestamp for the current packet */
55                ts = trace_get_erf_timestamp(data.pkt);
56
57                /* Check whether we need to report a packet count or not.
58                 *
59                 * If the timestamp for the current packet is beyond the time when the
60                 * next report was due then we have to output our current count and
61                 * reset it to zero.
62                 *
63                 * Note that I use a while loop here to ensure that we correctly deal
64                 * with periods in which no packets are observed. This can still
65                 * happen because TICK_INTERVAL is not used for realtime playback
66                 * such as a file.
67                 */
68                while (next_report && ts > next_report) {
69                        libtrace_generic_t c;
70                        c.uint64 = count;
71                        /* Report the result for the current time interval
72                         * Each thread will report once for each given time
73                         * interval */
74                        trace_publish_result(trace, t, next_report, c, RESULT_USER);
75
76                        /* Reset the counter */
77                        count = 0;
78                        /* Determine when the next report is due */
79                        next_report += SECONDS_TO_ERF(10);
80                }
81
82                /* No matter what else happens during this function call, we still
83                 * need to increment our counter */
84                count += 1;
85
86                /* We have finished processing this packet return it */
87                return data.pkt;
88        case MESSAGE_TICK_INTERVAL:
89
90                 /* If we are a second passed when we should have reported last
91                  * we will do it now. We would be in this situation if we
92                  * haven't been receiving packets.
93                  * Make sure we dont report until we have seen the first packet
94                  */
95                while (next_report &&
96                       (data.uint64 - offset - SECONDS_TO_ERF(1) > next_report)) {
97                        libtrace_generic_t c;
98                        c.uint64 = count;
99                        /* Report the result for the current time interval */
100                        trace_publish_result(trace, t, next_report, c, RESULT_USER);
101
102                        /* Reset the counter */
103                        count = 0;
104                        /* Determine when the next report is due */
105                        next_report += SECONDS_TO_ERF(10);
106                }
107
108        /* !!! Fall through to check if we have the first packet yet !!! */
109        case MESSAGE_FIRST_PACKET: /* Some thread has seen its first packet */
110
111                if (next_report == 0) {
112                        uint64_t first_ts;
113                        /* Try get the timestamp of the first packet across all threads*/
114                        const libtrace_packet_t * tmp = NULL;
115                        const struct timeval *tv;
116
117                        /* Get the first packet across all threads */
118                        if (trace_get_first_packet(trace, NULL, &tmp, &tv) == 1) {
119                                /* We know this is the first packet across all threads */
120
121                                first_ts = trace_get_erf_timestamp(tmp);
122                                /* There might be a difference between system time
123                                 * and packet times. We need to account for this
124                                 * when interpreting TICK_INTERVAL messages */
125                                offset = TV_TO_ERF(*tv) - first_ts;
126                                /* We know our first reporting time now */
127                                next_report = first_ts + SECONDS_TO_ERF(10);
128                        }
129                }
130                return NULL;
131        default:
132                return NULL;
133        }
134        return NULL;
135}
136
137/* Every time a result (published using trace_publish_result()) becomes ready
138 * this function will be called. It will also be called when messages from the
139 * library is received. This function is only run on a single thread
140 */
141static void report_results(libtrace_t *trace UNUSED, int mesg,
142                           libtrace_generic_t data,
143                           libtrace_thread_t *sender UNUSED) {
144        static uint64_t count = 0; /* The count for the current interval */
145        static int reported = 0; /* The number of threads that have reported results for the interval */
146        static uint64_t currentkey = 0; /* The key, which is next_report from perpkt */
147
148        switch (mesg) {
149        case MESSAGE_RESULT:
150                if (data.res->type == RESULT_USER) {
151                        /* We should always get a result from each thread */
152                        if (currentkey)
153                                assert(data.res->key == currentkey);
154
155                        currentkey = data.res->key;
156                        reported++;
157                        /* Add on the packets */
158                        count += data.res->value.uint64;
159
160                        if (reported == libtrace_get_perpkt_count(trace)) {
161                                /* Print a timestamp for the report and the packet count */
162                                printf("%u \t%" PRIu64 "\n", (int) ERF_TO_SECONDS(data.res->key), count);
163                                /* Reset ready for the next batch of results */
164                                count = reported = 0;
165                                currentkey = data.res->key + SECONDS_TO_ERF(10);
166                        }
167                }
168                break;
169        case MESSAGE_STARTING:
170                /* Print heading when first started */
171                printf("Time\t\tPackets\n");
172                break;
173        }
174}
175
176int main(int argc, char *argv[])
177{
178        libtrace_t *trace = NULL;
179
180        /* Ensure we have at least one argument after the program name */
181        if (argc < 2) {
182                fprintf(stderr, "Usage: %s inputURI\n", argv[0]);
183                return 1;
184        }
185
186        trace = trace_create(argv[1]);
187
188        if (trace_is_err(trace)) {
189                trace_perror(trace,"Opening trace file");
190                libtrace_cleanup(trace);
191                return 1;
192        }
193
194        /* We want to push through results ASAP */
195        trace_set_reporter_thold(trace, 1);
196
197        /* If the trace is live send a tick message every second */
198        trace_set_tick_interval(trace, 1000);
199
200        /* The combiner sits between trace_publish_result() and the reporter
201         * function and determines how the results show be ordered and combined.
202         *
203         * Our results are ordered by timestamp and we want them to be returned
204         * in order so we use combiner_ordered.
205         *
206         * This typically the most usefull combiner to use.
207         */
208        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
209
210        if (trace_pstart(trace, NULL, per_packet, report_results) == -1) {
211                trace_perror(trace,"Starting trace");
212                libtrace_cleanup(trace);
213                return 1;
214        }
215
216        /* Wait for completion */
217        trace_join(trace);
218        if (trace_is_err(trace)) {
219                trace_perror(trace,"Reading packets");
220                libtrace_cleanup(trace);
221                return 1;
222        }
223
224        libtrace_cleanup(trace);
225        return 0;
226}
Note: See TracBrowser for help on using the repository browser.