source: examples/parallel/network_capture.c @ bf3c54e

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since bf3c54e was bf3c54e, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Add example capturing multiple streams of packets to disk

  • Property mode set to 100644
File size: 6.5 KB
Line 
1/* Network Capture
2 *
3 * Creates a file per stream and writes the result to disk
4 */
5/* Note we include libtrace_parallel.h rather then libtrace.h */
6#include "libtrace_parallel.h"
7#include <stdio.h>
8#include <assert.h>
9#include <pthread.h>
10#include <inttypes.h>
11#include <signal.h>
12#include <malloc.h>
13#include <getopt.h>
14#include <stdlib.h>
15#include <string.h>
16
17static char *output = NULL;
18
19static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
20static int count = 0;
21static libtrace_t *trace = NULL;
22
23static int compress_level=-1;
24static trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
25
26static void stop(int signal UNUSED)
27{
28        if (trace)
29                trace_pstop(trace);
30}
31
32static void usage(char *argv0)
33{
34        fprintf(stderr,"Usage:\n"
35        "%s [options] inputfile outputfile\n"
36        "-t --threads   The number of threads to use\n"
37        "-S --snaplen   The snap length\n"
38        "-H --libtrace-help     Print libtrace runtime documentation\n"
39        "-z --compress-level    Compress the output trace at the specified level\n"
40        "-Z --compress-type     Compress the output trace using the specified"
41        "                       compression algorithm\n"
42        ,argv0);
43        exit(1);
44}
45
46
47static libtrace_out_t *create_output(int my_id) {
48        libtrace_out_t *out = NULL;
49        char name[1024];
50        const char * file_index = NULL;
51        const char * first_extension = NULL;
52
53        file_index = strrchr(output, '/');
54        if (file_index)
55                first_extension = strchr(file_index, '.');
56        else
57                first_extension = strchr(name, '.');
58
59        if (first_extension) {
60                snprintf(name, sizeof(name), "%.*s-%d%s", (int) (first_extension - output), output, my_id, first_extension);
61        } else {
62                snprintf(name, sizeof(name), "%s-%d", output, my_id);
63        }
64
65        out = trace_create_output(name);
66        assert(out);
67
68        if (compress_level >= 0 && trace_config_output(out,
69                        TRACE_OPTION_OUTPUT_COMPRESS, &compress_level) == -1) {
70                trace_perror_output(out, "Configuring compression level");
71                trace_destroy_output(out);
72                exit(-1);
73        }
74
75        if (trace_config_output(out, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
76                                &compress_type) == -1) {
77                trace_perror_output(out, "Configuring compression type");
78                trace_destroy_output(out);
79                exit(-1);
80        }
81
82        if (trace_start_output(out)==-1) {
83                trace_perror_output(out,"trace_start_output");
84                trace_destroy_output(out);
85                exit(-1);
86        }
87        return out;
88}
89
90/* Every time a packet becomes ready this function will be called. It will also
91 * be called when messages from the library are received. This function
92 * is run in parallel.
93 */
94static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
95                        int mesg, libtrace_generic_t data,
96                        libtrace_thread_t *sender UNUSED)
97{
98        static __thread libtrace_out_t * out;
99        static __thread int my_id;
100        libtrace_stat_t *stats;
101
102        switch (mesg) {
103        case MESSAGE_PACKET:
104                trace_write_packet(out, data.pkt);
105                /* If we have finished processing this packet return it */
106                return data.pkt;
107        case MESSAGE_STARTING:
108                pthread_mutex_lock(&lock);
109                my_id = ++count;
110                pthread_mutex_unlock(&lock);
111                out = create_output(my_id);
112                break;
113        case MESSAGE_STOPPING:
114                stats = trace_create_statistics();
115                trace_get_thread_statistics(trace, t, stats);
116
117                pthread_mutex_lock(&lock);
118                fprintf(stderr, "Thread #%d statistics\n", my_id);
119                trace_print_statistics(stats, stderr, "\t%s: %"PRIu64"\n");
120                pthread_mutex_unlock(&lock);
121
122                free(stats);
123                trace_destroy_output(out);
124                break;
125        default:
126                return NULL;
127        }
128        return NULL;
129}
130
131int main(int argc, char *argv[])
132{
133        struct sigaction sigact;
134        libtrace_stat_t *stats;
135        int snaplen = -1;
136        int nb_threads = -1;
137        char *compress_type_str=NULL;
138
139        sigact.sa_handler = stop;
140        sigemptyset(&sigact.sa_mask);
141        sigact.sa_flags = SA_RESTART;
142
143        sigaction(SIGINT, &sigact, NULL);
144
145
146        if (argc<3) {
147                usage(argv[0]);
148                return 1;
149        }
150
151        /* Parse command line options */
152        while(1) {
153                int option_index;
154                struct option long_options[] = {
155                        { "libtrace-help", 0, 0, 'H' },
156                        { "threads",       1, 0, 't' },
157                        { "snaplen",       1, 0, 'S' },
158                        { "compress-level", 1, 0, 'z' },
159                        { "compress-type", 1, 0, 'Z' },
160                        { NULL,            0, 0, 0   },
161                };
162
163                int c=getopt_long(argc, argv, "t:S:Hz:Z:",
164                                  long_options, &option_index);
165
166                if (c==-1)
167                        break;
168
169                switch (c) {
170                case  't': nb_threads = atoi(optarg);
171                        break;
172                case 'S': snaplen=atoi(optarg);
173                        break;
174                case 'H':
175                        trace_help();
176                        exit(1);
177                        break;
178                case 'z':
179                        compress_level=atoi(optarg);
180                        if (compress_level<0 || compress_level>9) {
181                                usage(argv[0]);
182                                exit(1);
183                        }
184                        break;
185                case 'Z':
186                        compress_type_str=optarg;
187                        break;
188                default:
189                        fprintf(stderr,"Unknown option: %c\n",c);
190                        usage(argv[0]);
191                        return 1;
192                }
193        }
194
195        if (compress_type_str == NULL && compress_level >= 0) {
196                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
197                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
198        }
199
200        else if (compress_type_str == NULL) {
201                /* If a level or type is not specified, use the "none"
202                 * compression module */
203                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
204        }
205
206        /* I decided to be fairly generous in what I accept for the
207         * compression type string */
208        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
209                 strncmp(compress_type_str, "zlib", 4) == 0) {
210                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
211        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
212                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
213        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
214                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
215        } else if (strncmp(compress_type_str, "no", 2) == 0) {
216                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
217        } else {
218                fprintf(stderr, "Unknown compression type: %s\n",
219                        compress_type_str);
220                return 1;
221        }
222
223        if (argc-optind != 2) {
224                usage(argv[0]);
225                return 1;
226        }
227        trace = trace_create(argv[optind]);
228        output = argv[optind+1];
229
230        if (trace_is_err(trace)) {
231                trace_perror(trace,"Opening trace file");
232                return 1;
233        }
234
235        if (snaplen != -1)
236                trace_set_snaplen(trace, snaplen);
237        if(nb_threads != -1)
238                trace_set_tick_count(trace, (size_t) nb_threads);
239
240        /* We use a new version of trace_start(), trace_pstart()
241         * The reporter function argument is optional and can be NULL */
242        if (trace_pstart(trace, NULL, per_packet, NULL)) {
243                trace_perror(trace,"Starting trace");
244                trace_destroy(trace);
245                return 1;
246        }
247
248        /* Wait for the trace to finish */
249        trace_join(trace);
250
251        if (trace_is_err(trace)) {
252                trace_perror(trace,"Reading packets");
253                trace_destroy(trace);
254                return 1;
255        }
256
257        /* Print stats before we destroy the trace */
258        stats = trace_get_statistics(trace, NULL);
259        fprintf(stderr, "Overall statistics\n");
260        trace_print_statistics(stats, stderr, "\t%s: %"PRIu64"\n");
261
262        trace_destroy(trace);
263        return 0;
264}
Note: See TracBrowser for help on using the repository browser.