source: test/test-format-parallel-singlethreaded-hasher.c @ 135e3f6

Branches/tags: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 135e3f6 was 135e3f6, checked in by Shane Alcock <salcock@…>, 6 years ago

Add usleep for first packet in all parallel tests

This ensures that the processing and reporting threads will still be
running when our test tries to pause them. Without this, our tests will
sometimes fail because we call pause on a trace that is finished (i.e. not
started).

Started noticing this with gcc 5.1.1, so might be due to better compiler
optimisations?

  • Property mode set to 100644
File size: 10.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
28 *
29 */
30#ifndef WIN32
31#  include <sys/time.h>
32#  include <netinet/in.h>
33#  include <netinet/in_systm.h>
34#  include <netinet/tcp.h>
35#  include <netinet/ip.h>
36#  include <netinet/ip_icmp.h>
37#  include <arpa/inet.h>
38#  include <sys/socket.h>
39#endif
40#include <stdio.h>
41#include <stdlib.h>
42#include <assert.h>
43#include <string.h>
44#include <sys/types.h>
45#include <time.h>
46#include <string.h>
47#include <signal.h>
48#include <unistd.h>
49
50#include "dagformat.h"
51#include "libtrace_parallel.h"
52#include "data-struct/vector.h"
53
54void iferr(libtrace_t *trace,const char *msg)
55{
56        libtrace_err_t err = trace_get_err(trace);
57        if (err.err_num==0)
58                return;
59        printf("Error: %s: %s\n", msg, err.problem);
60        exit(1);
61}
62
/*
 * Translate a short format name given on the command line into the URI of
 * the matching sample trace.
 *
 * type: either a complete URI (anything containing ':') or one of the short
 *       names listed in the table below.
 *
 * Returns the mapped URI for a known short name. Anything else -- a full
 * URI or an unrecognised name -- is handed back untouched.
 */
const char *lookup_uri(const char *type) {
        static const struct {
                const char *name;
                const char *uri;
        } known_traces[] = {
                { "erf",        "erf:traces/100_packets.erf" },
                { "rawerf",     "rawerf:traces/100_packets.erf" },
                { "pcap",       "pcap:traces/100_packets.pcap" },
                { "wtf",        "wtf:traces/wed.wtf" },
                { "rtclient",   "rtclient:chasm" },
                { "pcapfile",   "pcapfile:traces/100_packets.pcap" },
                { "pcapfilens", "pcapfile:traces/100_packetsns.pcap" },
                { "duck",       "duck:traces/100_packets.duck" },
                { "legacyatm",  "legacyatm:traces/legacyatm.gz" },
                { "legacypos",  "legacypos:traces/legacypos.gz" },
                { "legacyeth",  "legacyeth:traces/legacyeth.gz" },
                { "tsh",        "tsh:traces/10_packets.tsh.gz" },
        };
        size_t idx;

        /* Already a full URI, so there is nothing to look up */
        if (strchr(type, ':') != NULL)
                return type;

        for (idx = 0; idx < sizeof(known_traces) / sizeof(known_traces[0]);
                        idx++) {
                if (strcmp(type, known_traces[idx].name) == 0)
                        return known_traces[idx].uri;
        }

        return type;
}
92
/*
 * State kept for a packet processing thread. It is allocated by
 * start_processing(), handed to every later processing callback as its
 * 'tls' argument and released by stop_processing().
 */
struct TLS {
        /* Set to true by start_processing() */
        bool seen_start_message;
        /* Set to true by stop_processing() */
        bool seen_stop_message;
        /* Set to true by resume_processing() */
        bool seen_resuming_message;
        /* Set to true by pause_processing() */
        bool seen_pausing_message;
        /* Number of packets passed to per_packet() so far */
        int count;
};
100
/*
 * Totals gathered by the reporter callbacks: allocated in report_start(),
 * updated by report_cb() and checked then freed in report_end().
 */
struct final {
        /* Number of results received by report_cb() */
        int threads;
        /* Sum of the packet counts carried by those results */
        int packets;
};
105
106static void *report_start(libtrace_t *trace UNUSED,
107                libtrace_thread_t *t UNUSED,
108                void *global) {
109        uint32_t *magic = (uint32_t *)global;
110        struct final *threadcounter =
111                        (struct final *)malloc(sizeof(struct final));
112
113        assert(*magic == 0xabcdef);
114
115        threadcounter->threads = 0;
116        threadcounter->packets = 0;
117        return threadcounter;
118}
119
120static void report_cb(libtrace_t *trace UNUSED,
121                libtrace_thread_t *sender UNUSED,
122                void *global, void *tls, libtrace_result_t *res) {
123
124        uint32_t *magic = (uint32_t *)global;
125        struct final *threadcounter = (struct final *)tls;
126
127        assert(*magic == 0xabcdef);
128        assert(res->key == 0);
129
130        threadcounter->threads ++;
131        threadcounter->packets += res->value.sint;
132
133        assert(res->value.sint == 100);
134        printf("%d\n", res->value.sint);
135}
136
137static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED,
138                void *global, void *tls) {
139
140        uint32_t *magic = (uint32_t *)global;
141        struct final *threadcounter = (struct final *)tls;
142
143        assert(*magic == 0xabcdef);
144        assert(threadcounter->threads == trace_get_perpkt_threads(trace));
145        assert(threadcounter->packets == 100);
146
147        free(threadcounter);
148}
149
150static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
151                libtrace_thread_t *t UNUSED,
152                void *global, void *tls, libtrace_packet_t *packet) {
153        struct TLS *storage = (struct TLS *)tls;
154        uint32_t *magic = (uint32_t *)global;
155        static __thread int count = 0;
156        int a,*b,c=10;
157
158        assert(storage != NULL);
159        assert(!storage->seen_stop_message);
160
161        if (storage->seen_pausing_message)
162                assert(storage->seen_resuming_message);
163
164        assert(*magic == 0xabcdef);
165
166        if (storage->count == 0)
167                usleep(100000);
168        storage->count ++;
169        count ++;
170
171        assert(count == storage->count);
172
173        if (count > 100) {
174                fprintf(stderr, "Too many packets -- someone should stop me!\n");
175                kill(getpid(), SIGTERM);
176        }
177
178        // Do some work to even out the load on cores
179        b = &c;
180        for (a = 0; a < 50000000; a++) {
181                c += a**b;
182        }
183
184        return packet;
185}
186
187static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
188                void *global) {
189
190        static __thread bool seen_start_message = false;
191        uint32_t *magic = (uint32_t *)global;
192        struct TLS *storage = NULL;
193        assert(*magic == 0xabcdef);
194
195        assert(!seen_start_message);
196        assert(trace);
197
198        storage = (struct TLS *)malloc(sizeof(struct TLS));
199        storage->seen_start_message = true;
200        storage->seen_stop_message = false;
201        storage->seen_resuming_message = false;
202        storage->seen_pausing_message = false;
203        storage->count = 0;
204
205        seen_start_message = true;
206
207        return storage;
208}
209
210static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
211                void *global, void *tls) {
212
213        static __thread bool seen_stop_message = false;
214        struct TLS *storage = (struct TLS *)tls;
215        uint32_t *magic = (uint32_t *)global;
216
217        assert(storage != NULL);
218        assert(!storage->seen_stop_message);
219        assert(!seen_stop_message);
220        assert(storage->seen_start_message);
221        assert(*magic == 0xabcdef);
222
223        seen_stop_message = true;
224        storage->seen_stop_message = true;
225
226        assert(storage->count == 100);
227
228        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER);
229        trace_post_reporter(trace);
230        free(storage);
231}
232
233static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
234                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
235
236        fprintf(stderr, "Not expecting a tick packet\n");
237        kill(getpid(), SIGTERM);
238}
239
240static void pause_processing(libtrace_t *trace UNUSED,
241                libtrace_thread_t *t UNUSED,
242                void *global, void *tls) {
243
244        static __thread bool seen_pause_message = false;
245        struct TLS *storage = (struct TLS *)tls;
246        uint32_t *magic = (uint32_t *)global;
247
248        assert(storage != NULL);
249        assert(!storage->seen_stop_message);
250        assert(storage->seen_start_message);
251        assert(*magic == 0xabcdef);
252
253        assert(seen_pause_message == storage->seen_pausing_message);
254
255        seen_pause_message = true;
256        storage->seen_pausing_message = true;
257}
258
259static void resume_processing(libtrace_t *trace UNUSED,
260                libtrace_thread_t *t UNUSED,
261                void *global, void *tls) {
262
263        static __thread bool seen_resume_message = false;
264        struct TLS *storage = (struct TLS *)tls;
265        uint32_t *magic = (uint32_t *)global;
266
267        assert(storage != NULL);
268        assert(!storage->seen_stop_message);
269        assert(storage->seen_start_message);
270        assert(*magic == 0xabcdef);
271
272        assert(seen_resume_message == storage->seen_resuming_message);
273
274        seen_resume_message = true;
275        storage->seen_resuming_message = true;
276}
277
278uint64_t custom_hash(const libtrace_packet_t *packet UNUSED, void *data) {
279        int *count = (int *)data;
280        *count += 1;
281
282        /* Just throw the first 25 packets to thread 0 and the rest to thread
283         * 1.
284         */
285        if (*count <= 25)
286                return 0;
287        return 1;
288}
289
290int main(int argc, char *argv[]) {
291        int error = 0;
292        const char *tracename;
293        libtrace_t *trace;
294        libtrace_callback_set_t *processing = NULL;
295        libtrace_callback_set_t *reporter = NULL;
296        uint32_t global = 0xabcdef;
297        int hashercount = 0;
298
299        if (argc<2) {
300                fprintf(stderr,"usage: %s type\n",argv[0]);
301                return 1;
302        }
303
304        tracename = lookup_uri(argv[1]);
305
306        trace = trace_create(tracename);
307        iferr(trace,tracename);
308
309        processing = trace_create_callback_set();
310        trace_set_starting_cb(processing, start_processing);
311        trace_set_stopping_cb(processing, stop_processing);
312        trace_set_packet_cb(processing, per_packet);
313        trace_set_pausing_cb(processing, pause_processing);
314        trace_set_resuming_cb(processing, resume_processing);
315        trace_set_tick_count_cb(processing, process_tick);
316        trace_set_tick_interval_cb(processing, process_tick);
317
318        reporter = trace_create_callback_set();
319        trace_set_starting_cb(reporter, report_start);
320        trace_set_stopping_cb(reporter, report_end);
321        trace_set_result_cb(reporter, report_cb);
322
323
324        /* Set up our hasher and our single thread */
325        trace_set_perpkt_threads(trace, 1);
326        trace_set_hasher(trace, HASHER_CUSTOM, &custom_hash, &hashercount);
327
328        trace_pstart(trace, &global, processing, reporter);
329        iferr(trace,tracename);
330
331        /* Make sure traces survive a pause */
332        trace_ppause(trace);
333        iferr(trace,tracename);
334        trace_pstart(trace, NULL, NULL, NULL);
335        iferr(trace,tracename);
336
337        /* Wait for all threads to stop */
338        trace_join(trace);
339
340        global = 0xffffffff;
341
342        /* Now check we have all received all the packets */
343        if (error != 0) {
344                iferr(trace,tracename);
345        }
346
347        trace_destroy(trace);
348        trace_destroy_callback_set(processing);
349        trace_destroy_callback_set(reporter);
350        return error;
351}
Note: See TracBrowser for help on using the repository browser.