source: test/test-format-parallel-reporter.c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file was 135e3f6, checked in by Shane Alcock <salcock@…>, 5 years ago

Add usleep for first packet in all parallel tests

This ensures that the processing and reporting threads will still be
running when our test tries to pause them. Without this, our tests will
sometimes fail because we call pause on a trace that is finished (i.e. not
started).

Started noticing this with gcc 5.1.1, so might be due to better compiler
optimisations?

  • Property mode set to 100644
File size: 9.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
28 *
29 */
30#ifndef WIN32
31#  include <sys/time.h>
32#  include <netinet/in.h>
33#  include <netinet/in_systm.h>
34#  include <netinet/tcp.h>
35#  include <netinet/ip.h>
36#  include <netinet/ip_icmp.h>
37#  include <arpa/inet.h>
38#  include <sys/socket.h>
39#endif
40#include <stdio.h>
41#include <stdlib.h>
42#include <assert.h>
43#include <string.h>
44#include <sys/types.h>
45#include <time.h>
46#include <string.h>
47#include <signal.h>
48#include <unistd.h>
49
50#include "dagformat.h"
51#include "libtrace_parallel.h"
52#include "data-struct/vector.h"
53
54void iferr(libtrace_t *trace,const char *msg)
55{
56        libtrace_err_t err = trace_get_err(trace);
57        if (err.err_num==0)
58                return;
59        printf("Error: %s: %s\n", msg, err.problem);
60        exit(1);
61}
62
63const char *lookup_uri(const char *type) {
64        if (strchr(type,':'))
65                return type;
66        if (!strcmp(type,"erf"))
67                return "erf:traces/100_packets.erf";
68        if (!strcmp(type,"rawerf"))
69                return "rawerf:traces/100_packets.erf";
70        if (!strcmp(type,"pcap"))
71                return "pcap:traces/100_packets.pcap";
72        if (!strcmp(type,"wtf"))
73                return "wtf:traces/wed.wtf";
74        if (!strcmp(type,"rtclient"))
75                return "rtclient:chasm";
76        if (!strcmp(type,"pcapfile"))
77                return "pcapfile:traces/100_packets.pcap";
78        if (!strcmp(type,"pcapfilens"))
79                return "pcapfile:traces/100_packetsns.pcap";
80        if (!strcmp(type, "duck"))
81                return "duck:traces/100_packets.duck";
82        if (!strcmp(type, "legacyatm"))
83                return "legacyatm:traces/legacyatm.gz";
84        if (!strcmp(type, "legacypos"))
85                return "legacypos:traces/legacypos.gz";
86        if (!strcmp(type, "legacyeth"))
87                return "legacyeth:traces/legacyeth.gz";
88        if (!strcmp(type, "tsh"))
89                return "tsh:traces/10_packets.tsh.gz";
90        return type;
91}
92
93struct TLS {
94        bool seen_start_message;
95        bool seen_stop_message;
96        bool seen_resuming_message;
97        bool seen_pausing_message;
98        int count;
99};
100
101struct final {
102        uint64_t last;
103        int packets;
104};
105
106static void *report_start(libtrace_t *trace UNUSED,
107                libtrace_thread_t *t UNUSED,
108                void *global) {
109        uint32_t *magic = (uint32_t *)global;
110        struct final *threadcounter =
111                        (struct final *)malloc(sizeof(struct final));
112
113        assert(*magic == 0xabcdef);
114
115        threadcounter->last = 0;
116        threadcounter->packets = 0;
117        return threadcounter;
118}
119
120static void report_cb(libtrace_t *trace UNUSED,
121                libtrace_thread_t *sender UNUSED,
122                void *global, void *tls, libtrace_result_t *res) {
123
124        uint32_t *magic = (uint32_t *)global;
125        struct final *threadcounter = (struct final *)tls;
126
127        assert(*magic == 0xabcdef);
128        assert(res->type == RESULT_PACKET);
129     
130        if (threadcounter->last != 0)
131                assert(threadcounter->last + 1 == res->key);
132        threadcounter->last = res->key;
133
134        threadcounter->packets += 1;
135        trace_free_packet(trace, res->value.pkt);
136}
137
138static void report_end(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
139                void *global, void *tls) {
140
141        uint32_t *magic = (uint32_t *)global;
142        struct final *threadcounter = (struct final *)tls;
143
144        assert(*magic == 0xabcdef);
145        assert(threadcounter->packets == 100);
146
147        free(threadcounter);
148}
149
150static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
151                libtrace_thread_t *t UNUSED,
152                void *global, void *tls, libtrace_packet_t *packet) {
153        struct TLS *storage = (struct TLS *)tls;
154        uint32_t *magic = (uint32_t *)global;
155        static __thread int count = 0;
156        int a,*b,c=0;
157
158        assert(storage != NULL);
159        assert(!storage->seen_stop_message);
160
161        if (storage->seen_pausing_message)
162                assert(storage->seen_resuming_message);
163
164        assert(*magic == 0xabcdef);
165
166        if (storage->count == 0)
167                usleep(100000);
168        storage->count ++;
169        count ++;
170
171        assert(count == storage->count);
172
173        if (count > 100) {
174                fprintf(stderr, "Too many packets -- someone should stop me!\n");
175                kill(getpid(), SIGTERM);
176        }
177
178        // Do some work to even out the load on cores
179        b = &c;
180        for (a = 0; a < 10000000; a++) {
181                c += a**b;
182        }
183
184        trace_publish_result(trace, t, trace_packet_get_order(packet),
185                        (libtrace_generic_t){.pkt=packet}, RESULT_PACKET);
186
187        return NULL;
188}
189
190static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
191                void *global) {
192
193        static __thread bool seen_start_message = false;
194        uint32_t *magic = (uint32_t *)global;
195        struct TLS *storage = NULL;
196        assert(*magic == 0xabcdef);
197
198        assert(!seen_start_message);
199        assert(trace);
200
201        storage = (struct TLS *)malloc(sizeof(struct TLS));
202        storage->seen_start_message = true;
203        storage->seen_stop_message = false;
204        storage->seen_resuming_message = false;
205        storage->seen_pausing_message = false;
206        storage->count = 0;
207
208        seen_start_message = true;
209
210        return storage;
211}
212
213static void stop_processing(libtrace_t *trace UNUSED,
214                libtrace_thread_t *t UNUSED,
215                void *global, void *tls) {
216
217        static __thread bool seen_stop_message = false;
218        struct TLS *storage = (struct TLS *)tls;
219        uint32_t *magic = (uint32_t *)global;
220
221        assert(storage != NULL);
222        assert(!storage->seen_stop_message);
223        assert(!seen_stop_message);
224        assert(storage->seen_start_message);
225        assert(*magic == 0xabcdef);
226
227        seen_stop_message = true;
228        storage->seen_stop_message = true;
229        free(storage);
230}
231
232static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
233                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
234
235        fprintf(stderr, "Not expecting a tick packet\n");
236        kill(getpid(), SIGTERM);
237}
238
239static void pause_processing(libtrace_t *trace UNUSED,
240                libtrace_thread_t *t UNUSED,
241                void *global, void *tls) {
242
243        static __thread bool seen_pause_message = false;
244        struct TLS *storage = (struct TLS *)tls;
245        uint32_t *magic = (uint32_t *)global;
246
247        assert(storage != NULL);
248        assert(!storage->seen_stop_message);
249        assert(storage->seen_start_message);
250        assert(*magic == 0xabcdef);
251
252        assert(seen_pause_message == storage->seen_pausing_message);
253
254        seen_pause_message = true;
255        storage->seen_pausing_message = true;
256}
257
258static void resume_processing(libtrace_t *trace UNUSED,
259                libtrace_thread_t *t UNUSED,
260                void *global, void *tls) {
261
262        static __thread bool seen_resume_message = false;
263        struct TLS *storage = (struct TLS *)tls;
264        uint32_t *magic = (uint32_t *)global;
265
266        assert(storage != NULL);
267        assert(!storage->seen_stop_message);
268        assert(storage->seen_start_message);
269        assert(*magic == 0xabcdef);
270
271        assert(seen_resume_message == storage->seen_resuming_message);
272
273        seen_resume_message = true;
274        storage->seen_resuming_message = true;
275}
276
277int main(int argc, char *argv[]) {
278        int error = 0;
279        const char *tracename;
280        libtrace_t *trace;
281        libtrace_callback_set_t *processing = NULL;
282        libtrace_callback_set_t *reporter = NULL;
283        uint32_t global = 0xabcdef;
284
285        if (argc<2) {
286                fprintf(stderr,"usage: %s type\n",argv[0]);
287                return 1;
288        }
289
290        tracename = lookup_uri(argv[1]);
291
292        trace = trace_create(tracename);
293        iferr(trace,tracename);
294
295        processing = trace_create_callback_set();
296        trace_set_starting_cb(processing, start_processing);
297        trace_set_stopping_cb(processing, stop_processing);
298        trace_set_packet_cb(processing, per_packet);
299        trace_set_pausing_cb(processing, pause_processing);
300        trace_set_resuming_cb(processing, resume_processing);
301        trace_set_tick_count_cb(processing, process_tick);
302        trace_set_tick_interval_cb(processing, process_tick);
303
304        reporter = trace_create_callback_set();
305        trace_set_starting_cb(reporter, report_start);
306        trace_set_stopping_cb(reporter, report_end);
307        trace_set_result_cb(reporter, report_cb);
308
309
310        /* Test ordered combiner */
311        trace_set_perpkt_threads(trace, 4);
312        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
313
314        trace_pstart(trace, &global, processing, reporter);
315        iferr(trace,tracename);
316
317        /* Make sure traces survive a pause */
318        trace_ppause(trace);
319        iferr(trace,tracename);
320        trace_pstart(trace, NULL, NULL, NULL);
321        iferr(trace,tracename);
322
323        /* Wait for all threads to stop */
324        trace_join(trace);
325
326        global = 0xffffffff;
327
328        /* Now check we have all received all the packets */
329        if (error != 0) {
330                iferr(trace,tracename);
331        }
332
333        trace_destroy(trace);
334        trace_destroy_callback_set(processing);
335        trace_destroy_callback_set(reporter);
336        return error;
337}
Note: See TracBrowser for help on using the repository browser.