source: examples/skeleton/parallel.c @ 0f5d4de

develop
Last change on this file since 0f5d4de was 0f5d4de, checked in by Shane Alcock <salcock@…>, 22 months ago

Fix bad global variable names in examples

The global names were clashing with names used for function
parameters.

  • Property mode set to 100644
File size: 9.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *          Richard Sanger
9 *
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: test-rtclient.c,v 1.2 2006/02/27 03:41:12 perry Exp $
30 *
31 */
32#ifndef WIN32
33#  include <sys/time.h>
34#  include <netinet/in.h>
35#  include <netinet/in_systm.h>
36#  include <netinet/tcp.h>
37#  include <netinet/ip.h>
38#  include <netinet/ip_icmp.h>
39#  include <arpa/inet.h>
40#  include <sys/socket.h>
41#endif
42#include <stdio.h>
43#include <stdlib.h>
44#include <assert.h>
45#include <string.h>
46#include <sys/types.h>
47#include <time.h>
48#include <string.h>
49#include <signal.h>
50#include <unistd.h>
51
52#include "libtrace_parallel.h"
53
54volatile int done = 0;
55libtrace_t *inptrace = NULL;
56
57static void cleanup_signal(int sig) {
58        (void)sig;      /* avoid warnings about unused parameter */
59        done = 1;
60        if (inptrace)
61                trace_pstop(inptrace);
62}
63
64
/*
 * Thread-local state for the reporter thread. An instance is allocated in
 * report_start() and released in report_end().
 */
struct rstorage {
        int replaceme;  /* placeholder: swap in the fields your reporter needs */
};
69
/*
 * Thread-local state for a processing thread. An instance is allocated in
 * start_processing() and released in stop_processing().
 */
struct pstorage {
        uint32_t replaceme;  /* placeholder: swap in your per-thread fields */
};
74
75static void *report_start(libtrace_t *trace,
76                libtrace_thread_t *t,
77                void *global) {
78
79        /* Create any local storage required by the reporter thread and
80         * return it. */
81        struct rstorage *rs = (struct rstorage *)malloc(sizeof(struct rstorage));
82        rs->replaceme = 0;
83
84        assert(trace);
85        assert(t);
86        assert(global);
87
88        return rs;
89}
90
91static void report_cb(libtrace_t *trace,
92                libtrace_thread_t *sender,
93                void *global, void *tls, libtrace_result_t *res) {
94
95        struct rstorage *rs = (struct rstorage *)tls;
96        assert(trace);
97        assert(sender);
98        assert(global);
99        assert(rs);
100        assert(res);
101
102        /* Process the result */
103
104        /* Make sure we free any packets included in the result */
105        if (res->type == RESULT_PACKET)
106                trace_free_packet(trace, res->value.pkt);
107}
108
109static void report_end(libtrace_t *trace, libtrace_thread_t *t,
110                void *global, void *tls) {
111
112        /* Free the local storage and print any final results */
113        struct rstorage *rs = (struct rstorage *)tls;
114        free(rs);
115        assert(trace);
116        assert(t);
117        assert(global);
118}
119
120static libtrace_packet_t *per_packet(libtrace_t *trace,
121                libtrace_thread_t *t,
122                void *global, void *tls, libtrace_packet_t *packet) {
123        struct pstorage *ps = (struct pstorage *)tls;
124        assert(trace);
125        assert(t);
126        assert(global);
127        assert(ps);
128        /* Do something with the packet */
129
130        /* In this example, we are just forwarding the packet to the reporter */
131        trace_publish_result(trace, t, 0, (libtrace_generic_t){.pkt = packet}, RESULT_PACKET);
132        return NULL;
133}
134
135static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
136                void *global) {
137
138        /* Create any local storage required by the reporter thread and
139         * return it. */
140        struct pstorage *ps = (struct pstorage *)malloc(sizeof(struct pstorage));
141        ps->replaceme = 0;
142
143        assert(trace);
144        assert(t);
145        assert(global);
146        return ps;
147}
148
149static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
150                void *global, void *tls) {
151
152        struct pstorage *ps = (struct pstorage *)tls;
153
154        /* May want to do a final publish here... */
155
156        assert(trace);
157        assert(t);
158        assert(global);
159        free(ps);
160}
161
162static void process_tick(libtrace_t *trace, libtrace_thread_t *t,
163                void *global, void *tls, uint64_t tick) {
164
165        struct pstorage *ps = (struct pstorage *)tls;
166
167        /* Publish or ignore the tick, as appropriate */
168        assert(trace);
169        assert(t);
170        assert(global);
171        assert(ps);
172
173        if (tick) return;
174
175}
176
177static void pause_processing(libtrace_t *trace,
178                libtrace_thread_t *t,
179                void *global, void *tls) {
180
181        struct pstorage *ps = (struct pstorage *)tls;
182        assert(trace);
183        assert(t);
184        assert(global);
185        assert(ps);
186
187}
188
189static void resume_processing(libtrace_t *trace,
190                libtrace_thread_t *t,
191                void *global, void *tls) {
192
193        struct pstorage *ps = (struct pstorage *)tls;
194        assert(trace);
195        assert(t);
196        assert(global);
197        assert(ps);
198
199}
200
201static void custom_msg(libtrace_t *trace, libtrace_thread_t *t, void *global,
202                void *tls, int mesg, libtrace_generic_t data,
203                libtrace_thread_t *sender) {
204
205        struct pstorage *ps = (struct pstorage *)tls;
206        assert(trace);
207        assert(t);
208        assert(global);
209        assert(ps);
210
211        assert(mesg >= MESSAGE_USER);
212        assert(sizeof(data) == 8);
213
214        assert(sender || sender == NULL);
215}
216
/*
 * Print the command-line synopsis to stderr and terminate the program.
 *
 * prog: the program name to show in the message (normally argv[0])
 *
 * Does not return. main() only calls this for an invalid command line,
 * so the process exits with a non-zero status.
 */
static void usage(char *prog) {
        fprintf(stderr, "Usage for %s\n\n", prog);
        fprintf(stderr, "%s [options] inputURI [inputURI ...]\n\n", prog);
        fprintf(stderr, "Options:\n");
        fprintf(stderr, "\t-t threads Set the number of processing threads\n");
        fprintf(stderr, "\t-f expr    Discard all packets that do not match the BPF expression\n");

        /* Matches the "return 1" main() uses for the same condition. */
        exit(1);
}
226
227int main(int argc, char *argv[]) {
228        libtrace_callback_set_t *processing = NULL;
229        libtrace_callback_set_t *reporter = NULL;
230        libtrace_filter_t *filter = NULL;
231        char *filterstring = NULL;
232        int i, opt, retcode=0;
233        struct sigaction sigact;
234        int threads = 4;
235
236        /* TODO replace this with whatever global data your threads are
237         * likely to need. */
238        uint32_t global = 0;
239
240        if (argc<2) {
241                usage(argv[0]);
242        }
243
244        while ((opt = getopt(argc, argv, "f:t:")) != EOF) {
245                switch (opt) {
246                        case 'f':
247                                filterstring = optarg;
248                                break;
249                        case 't':
250                                threads = atoi(optarg);
251                                break;
252                        default:
253                                usage(argv[0]);
254                }
255        }
256
257        if (optind + 1 > argc) {
258                usage(argv[0]);
259                return 1;
260        }
261
262        if (filterstring) {
263                filter = trace_create_filter(filterstring);
264        }
265
266        sigact.sa_handler = cleanup_signal;
267        sigemptyset(&sigact.sa_mask);
268        sigact.sa_flags = SA_RESTART;
269
270        sigaction(SIGINT, &sigact, NULL);
271        sigaction(SIGTERM, &sigact, NULL);
272
273        processing = trace_create_callback_set();
274        trace_set_starting_cb(processing, start_processing);
275        trace_set_stopping_cb(processing, stop_processing);
276        trace_set_packet_cb(processing, per_packet);
277        trace_set_pausing_cb(processing, pause_processing);
278        trace_set_resuming_cb(processing, resume_processing);
279        trace_set_tick_count_cb(processing, process_tick);
280        trace_set_tick_interval_cb(processing, process_tick);
281        trace_set_user_message_cb(processing, custom_msg);
282
283        reporter = trace_create_callback_set();
284        trace_set_starting_cb(reporter, report_start);
285        trace_set_stopping_cb(reporter, report_end);
286        trace_set_result_cb(reporter, report_cb);
287
288        for (i = optind; i < argc; i++) {
289
290                inptrace = trace_create(argv[i]);
291
292                if (trace_is_err(inptrace)) {
293                        trace_perror(inptrace, "Opening trace file");
294                        retcode = -1;
295                        break;
296                }
297
298                if (filter && trace_config(inptrace, TRACE_OPTION_FILTER, filter) == -1) {
299                        trace_perror(inptrace, "trace_config(filter)");
300                        retcode = -1;
301                        break;
302                }
303
304                trace_set_perpkt_threads(inptrace, threads);
305                trace_set_combiner(inptrace, &combiner_ordered,
306                                (libtrace_generic_t) {0});
307                trace_set_hasher(inptrace, HASHER_BIDIRECTIONAL, NULL, NULL);
308
309                if (trace_pstart(inptrace, &global, processing, reporter)) {
310                        trace_perror(inptrace, "Starting trace");
311                        break;
312                }
313
314                /* Wait for all threads to stop */
315                trace_join(inptrace);
316
317                if (trace_is_err(inptrace)) {
318                        trace_perror(inptrace, "Processing packets");
319                        retcode = -1;
320                        break;
321                }
322
323                if (done)
324                        break;
325        }
326
327        if (filter)
328                trace_destroy_filter(filter);
329        trace_destroy(inptrace);
330        trace_destroy_callback_set(processing);
331        trace_destroy_callback_set(reporter);
332        return retcode;
333}
Note: See TracBrowser for help on using the repository browser.