source: test/test-format-parallel-singlethreaded.c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file was 135e3f6, checked in by Shane Alcock <salcock@…>, 5 years ago

Add usleep for first packet in all parallel tests

This ensures that the processing and reporting threads will still be
running when our test tries to pause them. Without this, our tests will
sometimes fail because we call pause on a trace that is finished (i.e. not
started).

Started noticing this with gcc 5.1.1, so might be due to better compiler
optimisations?

  • Property mode set to 100644
File size: 9.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
28 *
29 */
30#ifndef WIN32
31#  include <sys/time.h>
32#  include <netinet/in.h>
33#  include <netinet/in_systm.h>
34#  include <netinet/tcp.h>
35#  include <netinet/ip.h>
36#  include <netinet/ip_icmp.h>
37#  include <arpa/inet.h>
38#  include <sys/socket.h>
39#endif
40#include <stdio.h>
41#include <stdlib.h>
42#include <assert.h>
43#include <string.h>
44#include <sys/types.h>
45#include <time.h>
46#include <string.h>
47#include <signal.h>
48#include <unistd.h>
49
50#include "dagformat.h"
51#include "libtrace_parallel.h"
52#include "data-struct/vector.h"
53
54void iferr(libtrace_t *trace,const char *msg)
55{
56        libtrace_err_t err = trace_get_err(trace);
57        if (err.err_num==0)
58                return;
59        printf("Error: %s: %s\n", msg, err.problem);
60        exit(1);
61}
62
63const char *lookup_uri(const char *type) {
64        if (strchr(type,':'))
65                return type;
66        if (!strcmp(type,"erf"))
67                return "erf:traces/100_packets.erf";
68        if (!strcmp(type,"rawerf"))
69                return "rawerf:traces/100_packets.erf";
70        if (!strcmp(type,"pcap"))
71                return "pcap:traces/100_packets.pcap";
72        if (!strcmp(type,"wtf"))
73                return "wtf:traces/wed.wtf";
74        if (!strcmp(type,"rtclient"))
75                return "rtclient:chasm";
76        if (!strcmp(type,"pcapfile"))
77                return "pcapfile:traces/100_packets.pcap";
78        if (!strcmp(type,"pcapfilens"))
79                return "pcapfile:traces/100_packetsns.pcap";
80        if (!strcmp(type, "duck"))
81                return "duck:traces/100_packets.duck";
82        if (!strcmp(type, "legacyatm"))
83                return "legacyatm:traces/legacyatm.gz";
84        if (!strcmp(type, "legacypos"))
85                return "legacypos:traces/legacypos.gz";
86        if (!strcmp(type, "legacyeth"))
87                return "legacyeth:traces/legacyeth.gz";
88        if (!strcmp(type, "tsh"))
89                return "tsh:traces/10_packets.tsh.gz";
90        return type;
91}
92
93struct TLS {
94        bool seen_start_message;
95        bool seen_stop_message;
96        bool seen_resuming_message;
97        bool seen_pausing_message;
98        int count;
99};
100
101struct final {
102        int threads;
103        int packets;
104};
105
106static void *report_start(libtrace_t *trace UNUSED,
107                libtrace_thread_t *t UNUSED,
108                void *global) {
109        uint32_t *magic = (uint32_t *)global;
110        struct final *threadcounter =
111                        (struct final *)malloc(sizeof(struct final));
112
113        assert(*magic == 0xabcdef);
114
115        threadcounter->threads = 0;
116        threadcounter->packets = 0;
117        return threadcounter;
118}
119
120static void report_cb(libtrace_t *trace UNUSED,
121                libtrace_thread_t *sender UNUSED,
122                void *global, void *tls, libtrace_result_t *res) {
123
124        uint32_t *magic = (uint32_t *)global;
125        struct final *threadcounter = (struct final *)tls;
126
127        assert(*magic == 0xabcdef);
128        assert(res->key == 0);
129
130        threadcounter->threads ++;
131        threadcounter->packets += res->value.sint;
132        printf("%d\n", res->value.sint);
133}
134
135static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED,
136                void *global, void *tls) {
137
138        uint32_t *magic = (uint32_t *)global;
139        struct final *threadcounter = (struct final *)tls;
140
141        assert(*magic == 0xabcdef);
142        assert(threadcounter->threads == trace_get_perpkt_threads(trace));
143        assert(threadcounter->packets == 100);
144
145        free(threadcounter);
146}
147
148static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
149                libtrace_thread_t *t UNUSED,
150                void *global, void *tls, libtrace_packet_t *packet) {
151        struct TLS *storage = (struct TLS *)tls;
152        uint32_t *magic = (uint32_t *)global;
153        static __thread int count = 0;
154        int a,*b,c=0;
155
156        assert(storage != NULL);
157        assert(!storage->seen_stop_message);
158
159        if (storage->seen_pausing_message)
160                assert(storage->seen_resuming_message);
161
162        assert(*magic == 0xabcdef);
163
164        if (storage->count == 0)
165                usleep(100000);
166        storage->count ++;
167        count ++;
168
169        assert(count == storage->count);
170
171        if (count > 100) {
172                fprintf(stderr, "Too many packets -- someone should stop me!\n");
173                kill(getpid(), SIGTERM);
174        }
175
176        // Do some work to even out the load on cores
177        b = &c;
178        for (a = 0; a < 10000000; a++) {
179                c += a**b;
180        }
181
182        return packet;
183}
184
185static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
186                void *global) {
187
188        static __thread bool seen_start_message = false;
189        uint32_t *magic = (uint32_t *)global;
190        struct TLS *storage = NULL;
191        assert(*magic == 0xabcdef);
192
193        assert(!seen_start_message);
194        assert(trace);
195
196        storage = (struct TLS *)malloc(sizeof(struct TLS));
197        storage->seen_start_message = true;
198        storage->seen_stop_message = false;
199        storage->seen_resuming_message = false;
200        storage->seen_pausing_message = false;
201        storage->count = 0;
202
203        seen_start_message = true;
204
205        return storage;
206}
207
208static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
209                void *global, void *tls) {
210
211        static __thread bool seen_stop_message = false;
212        struct TLS *storage = (struct TLS *)tls;
213        uint32_t *magic = (uint32_t *)global;
214
215        assert(storage != NULL);
216        assert(!storage->seen_stop_message);
217        assert(!seen_stop_message);
218        assert(storage->seen_start_message);
219        assert(*magic == 0xabcdef);
220
221        seen_stop_message = true;
222        storage->seen_stop_message = true;
223
224        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER);
225        trace_post_reporter(trace);
226        free(storage);
227}
228
229static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
230                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
231
232        fprintf(stderr, "Not expecting a tick packet\n");
233        kill(getpid(), SIGTERM);
234}
235
236static void pause_processing(libtrace_t *trace UNUSED,
237                libtrace_thread_t *t UNUSED,
238                void *global, void *tls) {
239
240        static __thread bool seen_pause_message = false;
241        struct TLS *storage = (struct TLS *)tls;
242        uint32_t *magic = (uint32_t *)global;
243
244        assert(storage != NULL);
245        assert(!storage->seen_stop_message);
246        assert(storage->seen_start_message);
247        assert(*magic == 0xabcdef);
248
249        assert(seen_pause_message == storage->seen_pausing_message);
250
251        seen_pause_message = true;
252        storage->seen_pausing_message = true;
253}
254
255static void resume_processing(libtrace_t *trace UNUSED,
256                libtrace_thread_t *t UNUSED,
257                void *global, void *tls) {
258
259        static __thread bool seen_resume_message = false;
260        struct TLS *storage = (struct TLS *)tls;
261        uint32_t *magic = (uint32_t *)global;
262
263        assert(storage != NULL);
264        assert(!storage->seen_stop_message);
265        assert(storage->seen_start_message);
266        assert(*magic == 0xabcdef);
267
268        assert(seen_resume_message == storage->seen_resuming_message);
269
270        seen_resume_message = true;
271        storage->seen_resuming_message = true;
272}
273
274int main(int argc, char *argv[]) {
275        int error = 0;
276        const char *tracename;
277        libtrace_t *trace;
278        libtrace_callback_set_t *processing = NULL;
279        libtrace_callback_set_t *reporter = NULL;
280        uint32_t global = 0xabcdef;
281
282        if (argc<2) {
283                fprintf(stderr,"usage: %s type\n",argv[0]);
284                return 1;
285        }
286
287        tracename = lookup_uri(argv[1]);
288
289        trace = trace_create(tracename);
290        iferr(trace,tracename);
291
292        processing = trace_create_callback_set();
293        trace_set_starting_cb(processing, start_processing);
294        trace_set_stopping_cb(processing, stop_processing);
295        trace_set_packet_cb(processing, per_packet);
296        trace_set_pausing_cb(processing, pause_processing);
297        trace_set_resuming_cb(processing, resume_processing);
298        trace_set_tick_count_cb(processing, process_tick);
299        trace_set_tick_interval_cb(processing, process_tick);
300
301        reporter = trace_create_callback_set();
302        trace_set_starting_cb(reporter, report_start);
303        trace_set_stopping_cb(reporter, report_end);
304        trace_set_result_cb(reporter, report_cb);
305
306        /* Limit this to just one thread */
307        trace_set_perpkt_threads(trace, 1);
308
309        trace_pstart(trace, &global, processing, reporter);
310        iferr(trace,tracename);
311
312        /* Make sure traces survive a pause */
313        trace_ppause(trace);
314        iferr(trace,tracename);
315        trace_pstart(trace, NULL, NULL, NULL);
316        iferr(trace,tracename);
317
318        /* Wait for all threads to stop */
319        trace_join(trace);
320
321        global = 0xffffffff;
322
323        /* Now check we have all received all the packets */
324        if (error != 0) {
325                iferr(trace,tracename);
326        }
327
328        trace_destroy(trace);
329        trace_destroy_callback_set(processing);
330        trace_destroy_callback_set(reporter);
331        return error;
332}
Note: See TracBrowser for help on using the repository browser.