source: examples/parallel/network_capture.c @ 624c2da

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 624c2da was 624c2da, checked in by Shane Alcock <salcock@…>, 5 years ago

Update network_capture example to match new API

  • Property mode set to 100644
File size: 7.4 KB
Line 
1/* Network Capture
2 *
3 * Creates a file per stream and writes the result to disk. These files can
4 * later be merged using tracemerge to create a single ordered trace file.
5 *
6 * An alternative approach if we want a single output file is to use the
7 * ordered combiner and publish the packets through to a reporter thread that
8 * does the writing to disk.
9 *
10 * Defaults to using 4 threads, but this can be changed using the -t option.
11 */
12/* Note we include libtrace_parallel.h rather then libtrace.h */
13#include "libtrace_parallel.h"
14#include <stdio.h>
15#include <assert.h>
16#include <pthread.h>
17#include <inttypes.h>
18#include <signal.h>
19#include <malloc.h>
20#include <getopt.h>
21#include <stdlib.h>
22#include <string.h>
23
24static char *output = NULL;
25
26static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
27static int count = 0;
28static libtrace_t *trace = NULL;
29
30static int compress_level=-1;
31static trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
32
33static void stop(int signal UNUSED)
34{
35        if (trace)
36                trace_pstop(trace);
37}
38
39static void usage(char *argv0)
40{
41        fprintf(stderr,"Usage:\n"
42        "%s [options] inputfile outputfile\n"
43        "-t --threads   The number of threads to use\n"
44        "-S --snaplen   The snap length\n"
45        "-H --libtrace-help     Print libtrace runtime documentation\n"
46        "-z --compress-level    Compress the output trace at the specified level\n"
47        "-Z --compress-type     Compress the output trace using the specified"
48        "                       compression algorithm\n"
49        ,argv0);
50        exit(1);
51}
52
53/* Creates an output trace and configures it according to our preferences */
54static libtrace_out_t *create_output(int my_id) {
55        libtrace_out_t *out = NULL;
56        char name[1024];
57        const char * file_index = NULL;
58        const char * first_extension = NULL;
59
60        file_index = strrchr(output, '/');
61        if (file_index)
62                first_extension = strchr(file_index, '.');
63        else
64                first_extension = strchr(name, '.');
65
66        if (first_extension) {
67                snprintf(name, sizeof(name), "%.*s-%d%s", (int) (first_extension - output), output, my_id, first_extension);
68        } else {
69                snprintf(name, sizeof(name), "%s-%d", output, my_id);
70        }
71
72        out = trace_create_output(name);
73        assert(out);
74
75        if (compress_level >= 0 && trace_config_output(out,
76                        TRACE_OPTION_OUTPUT_COMPRESS, &compress_level) == -1) {
77                trace_perror_output(out, "Configuring compression level");
78                trace_destroy_output(out);
79                exit(-1);
80        }
81
82        if (trace_config_output(out, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
83                                &compress_type) == -1) {
84                trace_perror_output(out, "Configuring compression type");
85                trace_destroy_output(out);
86                exit(-1);
87        }
88
89        if (trace_start_output(out)==-1) {
90                trace_perror_output(out,"trace_start_output");
91                trace_destroy_output(out);
92                exit(-1);
93        }
94        return out;
95}
96
97
98static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
99                libtrace_thread_t *t UNUSED,
100                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
101
102        /* Retrieve our output trace from the thread local storage */
103        libtrace_out_t *output = (libtrace_out_t *)tls;
104
105        /* Write the packet to disk */
106        trace_write_packet(output, packet);
107
108        /* Return the packet as we are finished with it */
109        return packet;
110
111}
112
113/* Creates an output file for this thread */
114static void *init_process(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
115                void *global UNUSED) {
116
117        int my_id = 0;
118        libtrace_out_t *out;
119
120        pthread_mutex_lock(&lock);
121        my_id = ++count;
122        pthread_mutex_unlock(&lock);
123        out = create_output(my_id);
124
125        return out;
126}
127
128/* Closes the output file for this thread */
129static void stop_process(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
130                void *global UNUSED, void *tls) {
131
132        /* Retrieve our output trace from the thread local storage */
133        libtrace_out_t *output = (libtrace_out_t *)tls;
134
135        trace_destroy_output(output);
136}
137
138int main(int argc, char *argv[])
139{
140        struct sigaction sigact;
141        libtrace_stat_t *stats;
142        int snaplen = -1;
143        int nb_threads = 4;     
144        char *compress_type_str=NULL;
145        libtrace_callback_set_t *processing = NULL;
146
147        sigact.sa_handler = stop;
148        sigemptyset(&sigact.sa_mask);
149        sigact.sa_flags = SA_RESTART;
150
151        sigaction(SIGINT, &sigact, NULL);
152
153
154        if (argc<3) {
155                usage(argv[0]);
156                return 1;
157        }
158
159        /* Parse command line options */
160        while(1) {
161                int option_index;
162                struct option long_options[] = {
163                        { "libtrace-help", 0, 0, 'H' },
164                        { "threads",       1, 0, 't' },
165                        { "snaplen",       1, 0, 'S' },
166                        { "compress-level", 1, 0, 'z' },
167                        { "compress-type", 1, 0, 'Z' },
168                        { NULL,            0, 0, 0   },
169                };
170
171                int c=getopt_long(argc, argv, "t:S:Hz:Z:",
172                                  long_options, &option_index);
173
174                if (c==-1)
175                        break;
176
177                switch (c) {
178                case  't': nb_threads = atoi(optarg);
179                        break;
180                case 'S': snaplen=atoi(optarg);
181                        break;
182                case 'H':
183                        trace_help();
184                        exit(1);
185                        break;
186                case 'z':
187                        compress_level=atoi(optarg);
188                        if (compress_level<0 || compress_level>9) {
189                                usage(argv[0]);
190                                exit(1);
191                        }
192                        break;
193                case 'Z':
194                        compress_type_str=optarg;
195                        break;
196                default:
197                        fprintf(stderr,"Unknown option: %c\n",c);
198                        usage(argv[0]);
199                        return 1;
200                }
201        }
202
203        if (compress_type_str == NULL && compress_level >= 0) {
204                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
205                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
206        }
207
208        else if (compress_type_str == NULL) {
209                /* If a level or type is not specified, use the "none"
210                 * compression module */
211                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
212        }
213
214        /* I decided to be fairly generous in what I accept for the
215         * compression type string */
216        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
217                 strncmp(compress_type_str, "zlib", 4) == 0) {
218                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
219        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
220                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
221        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
222                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
223        } else if (strncmp(compress_type_str, "no", 2) == 0) {
224                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
225        } else {
226                fprintf(stderr, "Unknown compression type: %s\n",
227                        compress_type_str);
228                return 1;
229        }
230
231        if (argc-optind != 2) {
232                usage(argv[0]);
233                return 1;
234        }
235        trace = trace_create(argv[optind]);
236        output = argv[optind+1];
237
238        if (trace_is_err(trace)) {
239                trace_perror(trace,"Opening trace file");
240                return 1;
241        }
242
243        /* Set up our callbacks */
244        processing = trace_create_callback_set();
245        trace_set_starting_cb(processing, init_process);
246        trace_set_stopping_cb(processing, stop_process);
247        trace_set_packet_cb(processing, per_packet);
248
249        if (snaplen != -1)
250                trace_set_snaplen(trace, snaplen);
251        if(nb_threads != -1)
252                trace_set_perpkt_threads(trace, nb_threads);
253
254        /* We use a new version of trace_start(), trace_pstart()
255         * The reporter function argument is optional and can be NULL */
256        if (trace_pstart(trace, NULL, processing, NULL)) {
257                trace_perror(trace,"Starting trace");
258                trace_destroy(trace);
259                trace_destroy_callback_set(processing);
260                return 1;
261        }
262
263        /* Wait for the trace to finish */
264        trace_join(trace);
265
266        if (trace_is_err(trace)) {
267                trace_perror(trace,"Reading packets");
268                trace_destroy(trace);
269                trace_destroy_callback_set(processing);
270                return 1;
271        }
272
273        /* Print stats before we destroy the trace */
274        stats = trace_get_statistics(trace, NULL);
275        fprintf(stderr, "Overall statistics\n");
276        trace_print_statistics(stats, stderr, "\t%s: %"PRIu64"\n");
277
278        trace_destroy_callback_set(processing);
279        trace_destroy(trace);
280        return 0;
281}
Note: See TracBrowser for help on using the repository browser.