source: test/test-format-parallel.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 6.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
28 *
29 */
30#ifndef WIN32
31#  include <sys/time.h>
32#  include <netinet/in.h>
33#  include <netinet/in_systm.h>
34#  include <netinet/tcp.h>
35#  include <netinet/ip.h>
36#  include <netinet/ip_icmp.h>
37#  include <arpa/inet.h>
38#  include <sys/socket.h>
39#endif
40#include <stdio.h>
41#include <stdlib.h>
42#include <assert.h>
43#include <string.h>
44#include <sys/types.h>
45#include <time.h>
46#include <string.h>
47#include <signal.h>
48#include <unistd.h>
49
50#include "dagformat.h"
51#include "libtrace.h"
52#include "data-struct/vector.h"
53
54void iferr(libtrace_t *trace,const char *msg)
55{
56        libtrace_err_t err = trace_get_err(trace);
57        if (err.err_num==0)
58                return;
59        printf("Error: %s: %s\n", msg, err.problem);
60        exit(1);
61}
62
63const char *lookup_uri(const char *type) {
64        if (strchr(type,':'))
65                return type;
66        if (!strcmp(type,"erf"))
67                return "erf:traces/100_packets.erf";
68        if (!strcmp(type,"rawerf"))
69                return "rawerf:traces/100_packets.erf";
70        if (!strcmp(type,"pcap"))
71                return "pcap:traces/100_packets.pcap";
72        if (!strcmp(type,"wtf"))
73                return "wtf:traces/wed.wtf";
74        if (!strcmp(type,"rtclient"))
75                return "rtclient:chasm";
76        if (!strcmp(type,"pcapfile"))
77                return "pcapfile:traces/100_packets.pcap";
78        if (!strcmp(type,"pcapfilens"))
79                return "pcapfile:traces/100_packetsns.pcap";
80        if (!strcmp(type, "duck"))
81                return "duck:traces/100_packets.duck";
82        if (!strcmp(type, "legacyatm"))
83                return "legacyatm:traces/legacyatm.gz";
84        if (!strcmp(type, "legacypos"))
85                return "legacypos:traces/legacypos.gz";
86        if (!strcmp(type, "legacyeth"))
87                return "legacyeth:traces/legacyeth.gz";
88        if (!strcmp(type, "tsh"))
89                return "tsh:traces/10_packets.tsh.gz";
90        return type;
91}
92
93struct TLS {
94        bool seen_start_message;
95        bool seen_stop_message;
96        bool seen_resuming_message;
97        bool seen_pausing_message;
98        int count;
99};
100
101static int totalpkts = 0;
102static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
103        static int totalthreads = 0;
104        if (result) {
105                assert(libtrace_result_get_key(result) == 0);
106                printf("%d,", (int) libtrace_result_get_value(result));
107                totalthreads++;
108                totalpkts += (int) libtrace_result_get_value(result);
109        } else {
110                switch(mesg->code) {
111                        case MESSAGE_STARTING:
112                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
113                                break;
114                        case MESSAGE_STOPPING:
115                                printf(")\n");
116                                assert(totalthreads == libtrace_get_perpkt_count(trace));
117                                break;
118                }
119        }
120}
121
122int x;
123static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
124                                                libtrace_message_t *mesg,
125                                                libtrace_thread_t *t) {
126        struct TLS *tls;
127        void* ret;
128        // Test internal TLS against __thread
129        static __thread bool seen_start_message = false;
130        static __thread bool seen_stop_message = false;
131        static __thread bool seen_paused_message = false;
132        static __thread bool seen_pausing_message = false;
133        static __thread int count = 0;
134        tls = trace_get_tls(t);
135
136        if (pkt) {
137                int a,*b,c=0;
138                assert(tls != NULL);
139                assert(!seen_stop_message);
140                count++;
141                tls->count++;
142                if (count>100) {
143                        fprintf(stderr, "Too many packets someone should stop me!!\n");
144                        kill(getpid(), SIGTERM);
145                }
146                // Do some work to even out the load on cores
147                b = &c;
148                for (a = 0; a < 10000000; a++) {
149                        c += a**b;
150                }
151                x = c;
152        }
153        else switch (mesg->code) {
154                case MESSAGE_STARTING:
155                        assert(!seen_start_message || seen_paused_message);
156                        assert(tls == NULL);
157                        tls = calloc(sizeof(struct TLS), 1);
158                        ret = trace_set_tls(t, tls);
159                        assert(ret == NULL);
160                        seen_start_message = true;
161                        tls->seen_start_message = true;
162                        break;
163                case MESSAGE_STOPPING:
164                        assert(seen_start_message);
165                        assert(tls != NULL);
166                        assert(tls->seen_start_message);
167                        assert(tls->count == count);
168                        seen_stop_message = true;
169                        tls->seen_stop_message = true;
170                        free(tls);
171                        trace_set_tls(t, NULL);
172
173                        // All threads publish to verify the thread count
174                        trace_publish_result(trace, t, (uint64_t) 0, (void *) count, RESULT_NORMAL);
175                        trace_post_reporter(trace);
176                        break;
177                case MESSAGE_TICK:
178                        assert(seen_start_message);
179                        fprintf(stderr, "Not expecting a tick packet\n");
180                        kill(getpid(), SIGTERM);
181                        break;
182                case MESSAGE_PAUSING:
183                        assert(seen_start_message);
184                        seen_pausing_message = true;
185                        tls->seen_pausing_message = true;
186                        break;
187                case MESSAGE_RESUMING:
188                        assert(tls->seen_pausing_message  || tls->seen_start_message );
189                        seen_paused_message = true;
190                        tls->seen_resuming_message = true;
191                        break;
192        }
193        return pkt;
194}
195
196int main(int argc, char *argv[]) {
197        int error = 0;
198        int expected = 100;
199        const char *tracename;
200        libtrace_t *trace;
201
202        if (argc<2) {
203                fprintf(stderr,"usage: %s type\n",argv[0]);
204                return 1;
205        }
206
207        tracename = lookup_uri(argv[1]);
208
209        trace = trace_create(tracename);
210        iferr(trace,tracename);
211
212        if (strcmp(argv[1],"rtclient")==0) expected=101;
213
214        trace_pstart(trace, NULL, per_packet, report_result);
215        iferr(trace,tracename);
216
217        /* Make sure traces survive a pause */
218        trace_ppause(trace);
219        iferr(trace,tracename);
220        trace_pstart(trace, NULL, NULL, NULL);
221        iferr(trace,tracename);
222
223        /* Wait for all threads to stop */
224        trace_join(trace);
225
226        /* Now check we have all received all the packets */
227        if (error == 0) {
228                if (totalpkts == expected) {
229                        printf("success: %d packets read\n",expected);
230                } else {
231                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
232                        error = 1;
233                }
234        } else {
235                iferr(trace,tracename);
236        }
237
238    trace_destroy(trace);
239    return error;
240}
Note: See TracBrowser for help on using the repository browser.