source: test/test-format-parallel-hasher.c @ 2498008

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from the built-in ones, or a custom version can be defined; results are provided when ready.
Quickly hacked the parallel tests to work with this update; these are still a bit messy.

Also fixes some compile warnings.

  • Property mode set to 100644
File size: 6.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
28 *
29 */
#ifndef WIN32
#  include <sys/time.h>
#  include <netinet/in.h>
#  include <netinet/in_systm.h>
#  include <netinet/tcp.h>
#  include <netinet/ip.h>
#  include <netinet/ip_icmp.h>
#  include <arpa/inet.h>
#  include <sys/socket.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <string.h>
#include <signal.h>
#include <stdint.h>
#include <unistd.h>

#include "dagformat.h"
#include "libtrace.h"
#include "data-struct/vector.h"
53
54void iferr(libtrace_t *trace,const char *msg)
55{
56        libtrace_err_t err = trace_get_err(trace);
57        if (err.err_num==0)
58                return;
59        printf("Error: %s: %s\n", msg, err.problem);
60        exit(1);
61}
62
/**
 * Translate a short format name into the default URI used by the tests.
 *
 * A string that already contains ':' is treated as a complete URI and is
 * returned as-is. A name with no entry in the table is also returned
 * unchanged, so the caller always gets back something it can try to open.
 *
 * @param type  short format name (e.g. "erf") or a full URI
 * @return the matching default URI, or type itself when nothing matches
 */
const char *lookup_uri(const char *type) {
	static const struct {
		const char *name;
		const char *uri;
	} defaults[] = {
		{ "erf",        "erf:traces/100_packets.erf" },
		{ "rawerf",     "rawerf:traces/100_packets.erf" },
		{ "pcap",       "pcap:traces/100_packets.pcap" },
		{ "wtf",        "wtf:traces/wed.wtf" },
		{ "rtclient",   "rtclient:chasm" },
		{ "pcapfile",   "pcapfile:traces/100_packets.pcap" },
		{ "pcapfilens", "pcapfile:traces/100_packetsns.pcap" },
		{ "duck",       "duck:traces/100_packets.duck" },
		{ "legacyatm",  "legacyatm:traces/legacyatm.gz" },
		{ "legacypos",  "legacypos:traces/legacypos.gz" },
		{ "legacyeth",  "legacyeth:traces/legacyeth.gz" },
		{ "tsh",        "tsh:traces/10_packets.tsh.gz" },
	};
	size_t idx;

	/* Already a URI: hand it straight back */
	if (strchr(type, ':') != NULL)
		return type;

	for (idx = 0; idx < sizeof(defaults) / sizeof(defaults[0]); idx++) {
		if (strcmp(type, defaults[idx].name) == 0)
			return defaults[idx].uri;
	}

	return type;
}
92
93
/*
 * State kept for each per-packet thread. per_packet() allocates one of these
 * zero-filled when it handles MESSAGE_STARTING and frees it again when it
 * handles MESSAGE_STOPPING.
 */
struct TLS {
	bool seen_start_message;	/* set while handling MESSAGE_STARTING */
	bool seen_stop_message;		/* set while handling MESSAGE_STOPPING */
	bool seen_resumed_message;	/* set while handling MESSAGE_RESUMING */
	bool seen_pausing_message;	/* set while handling MESSAGE_PAUSING */
	int count;			/* packets this thread has been handed */
};
101
102static int totalpkts = 0;
103static int expected;
104static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
105        static int totalthreads = 0;
106        if (result) {
107                assert(libtrace_result_get_key(result) == 0);
108                printf("%d,", (int) libtrace_result_get_value(result));
109                totalthreads++;
110                totalpkts += (int) libtrace_result_get_value(result);
111                assert(libtrace_result_get_value(result) == 25 ||
112                        libtrace_result_get_value(result) == expected - 25);
113        } else {
114                switch(mesg->code) {
115                        case MESSAGE_STARTING:
116                                // Should have two threads here
117                                assert(libtrace_get_perpkt_count(trace) == 2);
118                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
119                                break;
120                        case MESSAGE_STOPPING:
121                                printf(")\n");
122                                assert(totalthreads == libtrace_get_perpkt_count(trace));
123                                break;
124                }
125        }
126}
127
128static int x;
129static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, 
130                                                libtrace_message_t *mesg,
131                                                libtrace_thread_t *t) {
132        struct TLS *tls;
133        void* ret;
134        tls = trace_get_tls(t);
135
136        if (pkt) {
137                int a,*b,c=0;
138                assert(tls != NULL);
139                assert(!(tls->seen_stop_message));
140                tls->count++;
141                if (tls->count>100) {
142                        fprintf(stderr, "Too many packets someone should stop me!!\n");
143                        kill(getpid(), SIGTERM);
144                }
145                // Do some work to even out the load on cores
146                b = &c;
147                for (a = 0; a < 10000000; a++) {
148                        c += a**b;
149                }
150                x = c;
151        }
152        else switch (mesg->code) {
153                case MESSAGE_STARTING:
154                        assert(tls == NULL);
155                        tls = calloc(sizeof(struct TLS), 1);
156                        ret = trace_set_tls(t, tls);
157                        assert(ret == NULL);
158                        tls->seen_start_message = true;
159                        break;
160                case MESSAGE_STOPPING:
161                        assert(tls->seen_start_message);
162                        assert(tls != NULL);
163                        tls->seen_stop_message = true;
164                        trace_set_tls(t, NULL);
165
166                        // All threads publish to verify the thread count
167                        trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
168                        trace_post_reporter(trace);
169                        free(tls);
170                        break;
171                case MESSAGE_TICK:
172                        assert(tls->seen_start_message );
173                        fprintf(stderr, "Not expecting a tick packet\n");
174                        kill(getpid(), SIGTERM);
175                        break;
176                case MESSAGE_PAUSING:
177                        assert(tls->seen_start_message);
178                        tls->seen_pausing_message = true;
179                        break;
180                case MESSAGE_RESUMING:
181                        assert(tls->seen_pausing_message  || tls->seen_start_message);
182                        tls->seen_resumed_message = true;
183                        break;
184        }
185        return pkt;
186}
187
188
189/**
190 * Sends the first 25 packets to thread 0, the next 75 to thread 1
191 * This is based on a few internal workings assumptions, which
192 * might change and still be valid even if this test fails!!.
193 */
194uint64_t hash25_75(const libtrace_packet_t* packet, void *data) {
195        int *count = (int *) data;
196        *count += 1;
197        if (*count <= 25)
198                return 0;
199        return 1;
200}
201
202/**
203 * Test that the hasher function works
204 */
205int test_hasher(const char *tracename) {
206        libtrace_t *trace;
207        int error = 0;
208        int i;
209        int hashercount = 0;
210        printf("Testing hasher function\n");
211
212        // Create the trace
213        trace = trace_create(tracename);
214        iferr(trace,tracename);
215
216        // Always use 2 threads for simplicity
217        i = 2;
218        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
219        trace_set_hasher(trace, HASHER_CUSTOM, &hash25_75, &hashercount);
220
221        // Start it
222        trace_pstart(trace, NULL, per_packet, report_result);
223        iferr(trace,tracename);
224        /* Make sure traces survive a pause and restart */
225        trace_ppause(trace);
226        iferr(trace,tracename);
227        trace_pstart(trace, NULL, NULL, NULL);
228        iferr(trace,tracename);
229
230        /* Wait for all threads to stop */
231        trace_join(trace);
232
233        /* Now check we have all received all the packets */
234        if (error == 0) {
235                if (totalpkts == expected) {
236                        printf("success: %d packets read\n",expected);
237                } else {
238                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
239                        error = 1;
240                }
241        } else {
242                iferr(trace,tracename);
243        }
244    trace_destroy(trace);
245    return error;
246}
247
248
249
250int main(int argc, char *argv[]) {
251        int error = 0;
252        const char *tracename;
253        expected = 100;
254
255        if (argc<2) {
256                fprintf(stderr,"usage: %s type\n",argv[0]);
257                return 1;
258        }
259
260        tracename = lookup_uri(argv[1]);
261
262        if (strcmp(argv[1],"rtclient")==0) expected=101;
263
264        error = test_hasher(tracename);
265
266    return error;
267}
Note: See TracBrowser for help on using the repository browser.