source: test/test-format-parallel-singlethreaded-hasher.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 6.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
28 *
29 */
30#ifndef WIN32
31#  include <sys/time.h>
32#  include <netinet/in.h>
33#  include <netinet/in_systm.h>
34#  include <netinet/tcp.h>
35#  include <netinet/ip.h>
36#  include <netinet/ip_icmp.h>
37#  include <arpa/inet.h>
38#  include <sys/socket.h>
39#endif
40#include <stdio.h>
41#include <stdlib.h>
42#include <assert.h>
43#include <string.h>
44#include <sys/types.h>
45#include <time.h>
46#include <string.h>
47#include <signal.h>
48#include <unistd.h>
49
50#include "dagformat.h"
51#include "libtrace.h"
52#include "data-struct/vector.h"
53
54void iferr(libtrace_t *trace,const char *msg)
55{
56        libtrace_err_t err = trace_get_err(trace);
57        if (err.err_num==0)
58                return;
59        printf("Error: %s: %s\n", msg, err.problem);
60        exit(1);
61}
62
63const char *lookup_uri(const char *type) {
64        if (strchr(type,':'))
65                return type;
66        if (!strcmp(type,"erf"))
67                return "erf:traces/100_packets.erf";
68        if (!strcmp(type,"rawerf"))
69                return "rawerf:traces/100_packets.erf";
70        if (!strcmp(type,"pcap"))
71                return "pcap:traces/100_packets.pcap";
72        if (!strcmp(type,"wtf"))
73                return "wtf:traces/wed.wtf";
74        if (!strcmp(type,"rtclient"))
75                return "rtclient:chasm";
76        if (!strcmp(type,"pcapfile"))
77                return "pcapfile:traces/100_packets.pcap";
78        if (!strcmp(type,"pcapfilens"))
79                return "pcapfile:traces/100_packetsns.pcap";
80        if (!strcmp(type, "duck"))
81                return "duck:traces/100_packets.duck";
82        if (!strcmp(type, "legacyatm"))
83                return "legacyatm:traces/legacyatm.gz";
84        if (!strcmp(type, "legacypos"))
85                return "legacypos:traces/legacypos.gz";
86        if (!strcmp(type, "legacyeth"))
87                return "legacyeth:traces/legacyeth.gz";
88        if (!strcmp(type, "tsh"))
89                return "tsh:traces/10_packets.tsh.gz";
90        return type;
91}
92
93
94struct TLS {
95        bool seen_start_message;
96        bool seen_stop_message;
97        bool seen_resuming_message;
98        bool seen_pausing_message;
99        int count;
100};
101
102static int totalpkts = 0;
103static int expected;
104static void report_result(libtrace_t *trace, libtrace_result_t *result, libtrace_message_t *mesg) {
105        static int totalthreads = 0;
106        if (result) {
107                assert(libtrace_result_get_key(result) == 0);
108                printf("%d,", (int) libtrace_result_get_value(result));
109                totalthreads++;
110                totalpkts += (int) libtrace_result_get_value(result);
111        } else {
112                switch(mesg->code) {
113                        case MESSAGE_STARTING:
114                                // Should have a single thread here
115                                assert(libtrace_get_perpkt_count(trace) == 1);
116                                printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
117                                break;
118                        case MESSAGE_STOPPING:
119                                printf(")\n");
120                                assert(totalthreads == libtrace_get_perpkt_count(trace));
121                                break;
122                }
123        }
124}
125
126static int x;
127static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, 
128                                                libtrace_message_t *mesg,
129                                                libtrace_thread_t *t) {
130        struct TLS *tls;
131        void* ret;
132        tls = trace_get_tls(t);
133
134        if (pkt) {
135                int a,*b,c=0;
136                assert(tls != NULL);
137                assert(!(tls->seen_stop_message));
138                tls->count++;
139                if (tls->count>100) {
140                        fprintf(stderr, "Too many packets someone should stop me!!\n");
141                        kill(getpid(), SIGTERM);
142                }
143                // Do some work to even out the load on cores
144                b = &c;
145                for (a = 0; a < 10000000; a++) {
146                        c += a**b;
147                }
148                x = c;
149        }
150        else switch (mesg->code) {
151                case MESSAGE_STARTING:
152                        assert(tls == NULL);
153                        tls = calloc(sizeof(struct TLS), 1);
154                        ret = trace_set_tls(t, tls);
155                        assert(ret == NULL);
156                        tls->seen_start_message = true;
157                        break;
158                case MESSAGE_STOPPING:
159                        assert(tls->seen_start_message);
160                        assert(tls != NULL);
161                        tls->seen_stop_message = true;
162                        trace_set_tls(t, NULL);
163
164                        // All threads publish to verify the thread count
165                        trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
166                        trace_post_reporter(trace);
167                        free(tls);
168                        break;
169                case MESSAGE_TICK:
170                        assert(tls->seen_start_message );
171                        fprintf(stderr, "Not expecting a tick packet\n");
172                        kill(getpid(), SIGTERM);
173                        break;
174                case MESSAGE_PAUSING:
175                        assert(tls->seen_start_message);
176                        tls->seen_pausing_message = true;
177                        break;
178                case MESSAGE_RESUMING:
179                        assert(tls->seen_pausing_message || tls->seen_start_message);
180                        tls->seen_resuming_message = true;
181                        break;
182        }
183        return pkt;
184}
185
186/**
187 * Sends the first 25 packets to thread 0, the next 75 to thread 1
188 * This is based on a few internal workings assumptions, which
189 * might change and still be valid even if this test fails!!.
190 */
191uint64_t hash25_75(const libtrace_packet_t* packet, void *data) {
192        int *count = (int *) data;
193        *count += 1;
194        if (*count <= 25)
195                return 0;
196        return 1;
197}
198
199/**
200 * Test that the hasher function works in single threaded mode
201 * It might not be called but this ensures consistency
202 */
203int test_hasher_singlethreaded(const char *tracename) {
204        libtrace_t *trace;
205        int error = 0;
206        int i;
207        int hashercount = 0;
208        printf("Testing hasher singlethreaded function\n");
209
210        // Create the trace
211        trace = trace_create(tracename);
212        iferr(trace,tracename);
213
214        // Use 1 thread with a hasher
215        i = 1;
216        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
217        trace_set_hasher(trace, HASHER_CUSTOM, &hash25_75, &hashercount);
218
219        // Start it
220        trace_pstart(trace, NULL, per_packet, report_result);
221        iferr(trace,tracename);
222
223        /* Make sure traces survive a pause and restart */
224        trace_ppause(trace);
225        iferr(trace,tracename);
226        trace_pstart(trace, NULL, NULL, NULL);
227        iferr(trace,tracename);
228
229        /* Wait for all threads to stop */
230        trace_join(trace);
231
232        /* Now check we have all received all the packets */
233        if (error == 0) {
234                if (totalpkts == expected) {
235                        printf("success: %d packets read\n",expected);
236                } else {
237                        printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
238                        error = 1;
239                }
240        } else {
241                iferr(trace,tracename);
242        }
243    trace_destroy(trace);
244    return error;
245}
246
247
248int main(int argc, char *argv[]) {
249        int error = 0;
250        const char *tracename;
251        expected = 100;
252
253        if (argc<2) {
254                fprintf(stderr,"usage: %s type\n",argv[0]);
255                return 1;
256        }
257
258        tracename = lookup_uri(argv[1]);
259
260        if (strcmp(argv[1],"rtclient")==0) expected=101;
261
262        error = test_hasher_singlethreaded(tracename);
263    return error;
264}
Note: See TracBrowser for help on using the repository browser.