source: test/test-format-parallel-singlethreaded.c @ 2498008

Branches/tags: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in, or a custom version can be defined; results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also fixes some compile warnings.

  • Property mode set to 100644
File size: 6.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
28 *
29 */
30#ifndef WIN32
31#  include <sys/time.h>
32#  include <netinet/in.h>
33#  include <netinet/in_systm.h>
34#  include <netinet/tcp.h>
35#  include <netinet/ip.h>
36#  include <netinet/ip_icmp.h>
37#  include <arpa/inet.h>
38#  include <sys/socket.h>
39#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
49
50#include "dagformat.h"
51#include "libtrace.h"
52#include "data-struct/vector.h"
53
54void iferr(libtrace_t *trace,const char *msg)
55{
56        libtrace_err_t err = trace_get_err(trace);
57        if (err.err_num==0)
58                return;
59        printf("Error: %s: %s\n", msg, err.problem);
60        exit(1);
61}
62
/**
 * Map a short format name given on the command line to the URI of the
 * matching sample trace.
 *
 * @param type  Either a full URI (anything containing ':'), one of the short
 *              names in the table below, or any other string. May be NULL.
 * @return      The URI from the table for a known short name. Full URIs,
 *              unknown names and NULL are returned unchanged.
 */
const char *lookup_uri(const char *type) {
        static const struct {
                const char *name;
                const char *uri;
        } known[] = {
                { "erf",        "erf:traces/100_packets.erf" },
                { "rawerf",     "rawerf:traces/100_packets.erf" },
                { "pcap",       "pcap:traces/100_packets.pcap" },
                { "wtf",        "wtf:traces/wed.wtf" },
                { "rtclient",   "rtclient:chasm" },
                { "pcapfile",   "pcapfile:traces/100_packets.pcap" },
                { "pcapfilens", "pcapfile:traces/100_packetsns.pcap" },
                { "duck",       "duck:traces/100_packets.duck" },
                { "legacyatm",  "legacyatm:traces/legacyatm.gz" },
                { "legacypos",  "legacypos:traces/legacypos.gz" },
                { "legacyeth",  "legacyeth:traces/legacyeth.gz" },
                { "tsh",        "tsh:traces/10_packets.tsh.gz" },
        };
        size_t i;

        /* NULL must not reach strchr()/strcmp(); anything with a ':' is
         * already a complete URI and is passed through untouched. */
        if (type == NULL || strchr(type, ':') != NULL)
                return type;

        for (i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
                if (strcmp(type, known[i].name) == 0)
                        return known[i].uri;
        }

        /* Unknown short name: hand it back as-is. */
        return type;
}
92
93
/*
 * Per-thread state that per_packet() allocates on MESSAGE_STARTING and
 * attaches to the thread with trace_set_tls().
 */
struct TLS {
        bool seen_start_message;    /* set when MESSAGE_STARTING is handled */
        bool seen_stop_message;     /* set when MESSAGE_STOPPING is handled */
        bool seen_resuming_message; /* set when MESSAGE_RESUMING is handled */
        bool seen_pausing_message;  /* set when MESSAGE_PAUSING is handled */
        int count;                  /* packets this thread has been handed */
};
101
102static int totalpkts = 0;
103static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
104        static int totalthreads = 0;
105        if (result) {
106                assert(libtrace_result_get_key(result) == 0);
107                printf("%d,", (int) libtrace_result_get_value(result));
108                totalthreads++;
109                totalpkts += (int) libtrace_result_get_value(result);
110        } else {
111                switch(mesg->code) {
112                        case MESSAGE_STARTING:
113                                // Should have a single thread here
114                                assert(libtrace_get_perpkt_count(trace) == 1);
115                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
116                                break;
117                        case MESSAGE_STOPPING:
118                                printf(")\n");
119                                assert(totalthreads == libtrace_get_perpkt_count(trace));
120                                break;
121                }
122        }
123}
124
125static int x;
126static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, 
127                                                libtrace_message_t *mesg,
128                                                libtrace_thread_t *t) {
129        struct TLS *tls;
130        void* ret;
131        tls = trace_get_tls(t);
132
133        if (pkt) {
134                int a,*b,c=0;
135                assert(tls != NULL);
136                assert(!(tls->seen_stop_message));
137                tls->count++;
138                if (tls->count>100) {
139                        fprintf(stderr, "Too many packets someone should stop me!!\n");
140                        kill(getpid(), SIGTERM);
141                }
142                // Do some work to even out the load on cores
143                b = &c;
144                for (a = 0; a < 10000000; a++) {
145                        c += a**b;
146                }
147                x = c;
148        }
149        else switch (mesg->code) {
150                case MESSAGE_STARTING:
151                        assert(tls == NULL);
152                        tls = calloc(sizeof(struct TLS), 1);
153                        ret = trace_set_tls(t, tls);
154                        assert(ret == NULL);
155                        tls->seen_start_message = true;
156                        break;
157                case MESSAGE_STOPPING:
158                        assert(tls->seen_start_message);
159                        assert(tls != NULL);
160                        tls->seen_stop_message = true;
161                        trace_set_tls(t, NULL);
162
163                        // All threads publish to verify the thread count
164                        trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
165                        trace_post_reporter(trace);
166                        free(tls);
167                        break;
168                case MESSAGE_TICK:
169                        assert(tls->seen_start_message );
170                        fprintf(stderr, "Not expecting a tick packet\n");
171                        kill(getpid(), SIGTERM);
172                        break;
173                case MESSAGE_PAUSING:
174                        assert(tls->seen_start_message);
175                        tls->seen_pausing_message = true;
176                        break;
177                case MESSAGE_RESUMING:
178                        assert(tls->seen_pausing_message || tls->seen_start_message);
179                        tls->seen_resuming_message = true;
180                        break;
181        }
182        return pkt;
183}
184
185
186/**
187 * Test that the single threaded fallback works
188 */
189int test_single_threaded(const char *tracename, int expected) {
190        libtrace_t *trace;
191        int error = 0;
192        int i;
193        printf("Testing single threaded\n");
194
195        // Create the trace
196        trace = trace_create(tracename);
197        iferr(trace,tracename);
198
199        // Enable the single threaded fallback codepath
200        i = 1;
201        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
202
203        // Start it
204        trace_pstart(trace, NULL, per_packet, report_result);
205        iferr(trace,tracename);
206
207        /* Make sure traces survive a pause and restart */
208        trace_ppause(trace);
209        iferr(trace,tracename);
210        trace_pstart(trace, NULL, NULL, NULL);
211        iferr(trace,tracename);
212
213        /* Wait for all threads to stop */
214        trace_join(trace);
215
216        /* Now check we have all received all the packets */
217        if (error == 0) {
218                if (totalpkts == expected) {
219                        printf("success: %d packets read\n",expected);
220                } else {
221                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
222                        error = 1;
223                }
224        } else {
225                iferr(trace,tracename);
226        }
227    trace_destroy(trace);
228    return error;
229}
230
/**
 * Entry point: run the single threaded test against the trace named by the
 * first command line argument.
 *
 * The argument is resolved to a URI with lookup_uri(). 100 packets are
 * expected, except for the literal argument "rtclient" where 101 are.
 *
 * @return 1 when no argument is given, otherwise the result of
 *         test_single_threaded().
 */
int main(int argc, char *argv[]) {
        const char *tracename;
        int expected;

        if (argc < 2) {
                fprintf(stderr,"usage: %s type\n",argv[0]);
                return 1;
        }

        tracename = lookup_uri(argv[1]);
        expected = (strcmp(argv[1], "rtclient") == 0) ? 101 : 100;

        return test_single_threaded(tracename, expected);
}
Note: See TracBrowser for help on using the repository browser.