source: tools/traceanon/traceanon_parallel.c @ fac8c46

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since fac8c46 was fac8c46, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Tidies up the pausing so that it now works as expected and a trace can easily be paused and restarted.
Ensures that packets will not be lost if pause is called on a file, any queued packets will be read (a message is sent allowing the user to drop these packets if they are unwanted).
Differentiates packets from other results in the queues to the reducer/reporter and makes a copy of the packets in result queues when pausing

  • this is needed to ensure that bad memory isn't referenced if a zero-copy trace is paused by closing sockets/associated data like in the case of ring:.

Fixed up the re-starting of traces which hadn't been finished to account for different configurations.
Adds a 'state' to libtrace to handle the state of parallel traces, rather than hacking around the existing 'started' boolean. Also provides two levels of checks for consistency if the trace is using existing that are checking started.

Various other bug fixes and tidy ups.

  • Property mode set to 100644
File size: 12.8 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace.h"
3#include <stdio.h>
4#include <unistd.h>
5#include <stdlib.h>
6#include <getopt.h>
7#include <stdbool.h>
8#include <stddef.h>
9#include <string.h>
10#include <time.h>
11#include <assert.h>
12#include "ipenc.h"
13#include <data-struct/vector.h>
14#include <data-struct/message_queue.h>
15#include <signal.h>
16
17bool enc_source = false;
18bool enc_dest   = false;
19enum enc_type_t enc_type = ENC_NONE;
20char *key = NULL;
21
22
23struct libtrace_t *trace = NULL;
24
25static void cleanup_signal(int signal)
26{
27        static int s = 0;
28        (void)signal;
29        // trace_interrupt();
30        // trace_pstop isn't really signal safe because its got lots of locks in it
31        // trace_pstop(trace);
32        if (s == 0) {
33                if (trace_ppause(trace) == -1)
34                        trace_perror(trace, "Pause failed");
35        }
36        else {
37                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
38                        trace_perror(trace, "Start failed");
39        }
40        s = !s;
41}
42
43
44
45static void usage(char *argv0)
46{
47        fprintf(stderr,"Usage:\n"
48        "%s flags inputfile outputfile\n"
49        "-s --encrypt-source    Encrypt the source addresses\n"
50        "-d --encrypt-dest      Encrypt the destination addresses\n"
51        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
52        "                       prefix preserving\n"
53        "-f --keyfile=file      A file containing the cryptopan key\n"
54        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
55        "-H --libtrace-help     Print libtrace runtime documentation\n"
56        "-z --compress-level    Compress the output trace at the specified level\n"
57        "-Z --compress-type     Compress the output trace using the specified"
58        "                       compression algorithm\n"
59        ,argv0);
60        exit(1);
61}
62
63/* Incrementally update a checksum */
64static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
65{
66        uint32_t sum = (~htons(*csum) & 0xFFFF) 
67                     + (~htons(old) & 0xFFFF) 
68                     + htons(new);
69        sum = (sum & 0xFFFF) + (sum >> 16);
70        *csum = htons(~(sum + (sum >> 16)));
71}
72
73static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
74{
75        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
76        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
77}
78
79/* Ok this is remarkably complicated
80 *
81 * We want to change one, or the other IP address, while preserving
82 * the checksum.  TCP and UDP both include the faux header in their
83 * checksum calculations, so you have to update them too.  ICMP is
84 * even worse -- it can include the original IP packet that caused the
85 * error!  So anonymise that too, but remember that it's travelling in
86 * the opposite direction so we need to encrypt the destination and
87 * source instead of the source and destination!
88 */
89static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
90{
91        struct libtrace_tcp *tcp;
92        struct libtrace_udp *udp;
93        struct libtrace_icmp *icmp;
94
95        tcp=trace_get_tcp_from_ip(ip,NULL);
96        udp=trace_get_udp_from_ip(ip,NULL);
97        icmp=trace_get_icmp_from_ip(ip,NULL);
98
99        if (enc_source) {
100                uint32_t old_ip=ip->ip_src.s_addr;
101                uint32_t new_ip=htonl(enc_ip(
102                                        htonl(ip->ip_src.s_addr)
103                                        ));
104                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
105                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
106                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
107                ip->ip_src.s_addr = new_ip;
108        }
109
110        if (enc_dest) {
111                uint32_t old_ip=ip->ip_dst.s_addr;
112                uint32_t new_ip=htonl(enc_ip(
113                                        htonl(ip->ip_dst.s_addr)
114                                        ));
115                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
116                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
117                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
118                ip->ip_dst.s_addr = new_ip;
119        }
120
121        if (icmp) {
122                /* These are error codes that return the IP packet
123                 * internally
124                 */
125               
126                if (icmp->type == 3 
127                                || icmp->type == 5 
128                                || icmp->type == 11) {
129                        char *ptr = (char *)icmp;
130                        encrypt_ips(
131                                (struct libtrace_ip*)(ptr+
132                                        sizeof(struct libtrace_icmp)),
133                                enc_dest,
134                                enc_source);
135                }
136
137                if (enc_source || enc_dest)
138                        icmp->checksum = 0;
139        }
140}
141
142
143static uint64_t bad_hash(libtrace_packet_t * pkt)
144{
145        return 0;
146}
147
148
149static uint64_t rand_hash(libtrace_packet_t * pkt)
150{
151        return rand();
152}
153
154
155static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
156{
157        int i;
158       
159        if (pkt) {
160                struct libtrace_ip *ipptr;
161                libtrace_udp_t *udp = NULL;
162                libtrace_tcp_t *tcp = NULL;
163
164                ipptr = trace_get_ip(pkt);
165
166                if (ipptr && (enc_source || enc_dest)) {
167                        encrypt_ips(ipptr,enc_source,enc_dest);
168                        ipptr->ip_sum = 0;
169                }
170
171                /* Replace checksums so that IP encryption cannot be
172                 * reversed */
173
174                /* XXX replace with nice use of trace_get_transport() */
175
176                udp = trace_get_udp(pkt);
177                if (udp && (enc_source || enc_dest)) {
178                        udp->check = 0;
179                } 
180
181                tcp = trace_get_tcp(pkt);
182                if (tcp && (enc_source || enc_dest)) {
183                        tcp->check = 0;
184                }
185
186                /* TODO: Encrypt IP's in ARP packets */
187               
188                // Send our result keyed with the time
189                // Arg don't copy packets
190                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
191                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
192                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
193                trace_publish_packet(trace, pkt);
194                //return ;
195        }
196        if (mesg) {
197                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
198                switch (mesg->code) {
199                        case MESSAGE_STARTED:
200                                enc_init(enc_type,key);
201                }
202        }
203        return NULL;
204}
205
206int main(int argc, char *argv[]) 
207{
208        //struct libtrace_t *trace = 0;
209        struct libtrace_packet_t *packet/* = trace_create_packet()*/;
210        struct libtrace_out_t *writer = 0;
211        struct sigaction sigact;
212        char *output = 0;
213        int level = -1;
214        char *compress_type_str=NULL;
215        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
216
217
218        if (argc<2)
219                usage(argv[0]);
220
221        while (1) {
222                int option_index;
223                struct option long_options[] = {
224                        { "encrypt-source",     0, 0, 's' },
225                        { "encrypt-dest",       0, 0, 'd' },
226                        { "cryptopan",          1, 0, 'c' },
227                        { "cryptopan-file",     1, 0, 'f' },
228                        { "prefix",             1, 0, 'p' },
229                        { "compress-level",     1, 0, 'z' },
230                        { "compress-type",      1, 0, 'Z' },
231                        { "libtrace-help",      0, 0, 'H' },
232                        { NULL,                 0, 0, 0   },
233                };
234
235                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:H",
236                                long_options, &option_index);
237
238                if (c==-1)
239                        break;
240
241                switch (c) {
242                        case 'Z': compress_type_str=optarg; break;         
243                        case 'z': level = atoi(optarg); break;
244                        case 's': enc_source=true; break;
245                        case 'd': enc_dest  =true; break;
246                        case 'c':
247                                  if (key!=NULL) {
248                                          fprintf(stderr,"You can only have one encryption type and one key\n");
249                                          usage(argv[0]);
250                                  }
251                                  key=strdup(optarg);
252                                  enc_type = ENC_CRYPTOPAN;
253                                  break;
254                        case 'f':
255                                  if(key != NULL) {
256                                    fprintf(stderr,"You can only have one encryption type and one key\n");
257                                    usage(argv[0]);
258                                  }
259                                  FILE * infile = fopen(optarg,"rb");
260                                  if(infile == NULL) {
261                                    perror("Failed to open cryptopan keyfile");
262                                    return 1;
263                                  }
264                                  key = (char *) malloc(sizeof(char *) * 32);
265                                  if(fread(key,1,32,infile) != 32) {
266                                    if(ferror(infile)) {
267                                      perror("Failed while reading cryptopan keyfile");
268                                    }
269                                  }
270                                  fclose(infile);
271                                  enc_type = ENC_CRYPTOPAN;
272                                  break;
273                        case 'p':
274                                  if (key!=NULL) {
275                                          fprintf(stderr,"You can only have one encryption type and one key\n");
276                                          usage(argv[0]);
277                                  }
278                                  key=strdup(optarg);
279                                  enc_type = ENC_PREFIX_SUBSTITUTION;
280                                  break;
281                        case 'H':
282                                  trace_help(); 
283                                  exit(1); 
284                                  break;
285                        default:
286                                fprintf(stderr,"unknown option: %c\n",c);
287                                usage(argv[0]);
288
289                }
290
291        }
292
293        if (compress_type_str == NULL && level >= 0) {
294                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
295                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
296        }
297
298        else if (compress_type_str == NULL) {
299                /* If a level or type is not specified, use the "none"
300                 * compression module */
301                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
302        }
303
304        /* I decided to be fairly generous in what I accept for the
305         * compression type string */
306        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
307                        strncmp(compress_type_str, "zlib", 4) == 0) {
308                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
309        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
310                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
311        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
312                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
313        } else if (strncmp(compress_type_str, "no", 2) == 0) {
314                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
315        } else {
316                fprintf(stderr, "Unknown compression type: %s\n",
317                        compress_type_str);
318                return 1;
319        }
320       
321
322       
323
324        /* open input uri */
325        trace = trace_create(argv[optind]);
326        if (trace_is_err(trace)) {
327                trace_perror(trace,"trace_create");
328                trace_destroy(trace);
329                return 1;
330        }
331
332        if (optind +1>= argc) {
333                /* no output specified, output in same format to
334                 * stdout
335                 */
336                output = strdup("erf:-");
337                writer = trace_create_output(output);
338        } else {
339                writer = trace_create_output(argv[optind +1]);
340        }
341        if (trace_is_err_output(writer)) {
342                trace_perror_output(writer,"trace_create_output");
343                trace_destroy_output(writer);
344                trace_destroy(trace);
345                return 1;
346        }
347       
348        /* Hopefully this will deal nicely with people who want to crank the
349         * compression level up to 11 :) */
350        if (level > 9) {
351                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
352                level = 9;
353        }
354
355        if (level >= 0 && trace_config_output(writer, 
356                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
357                trace_perror_output(writer, "Configuring compression level");
358                trace_destroy_output(writer);
359                trace_destroy(trace);
360                return 1;
361        }
362
363        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
364                                &compress_type) == -1) {
365                trace_perror_output(writer, "Configuring compression type");
366                trace_destroy_output(writer);
367                trace_destroy(trace);
368                return 1;
369        }
370
371        // OK parallel changes start here
372
373        /* Set a special mode flag that means the output is timestamped
374         * and ordered before its read into reduce. Seems like a good
375         * special case to have.
376         */
377         
378        int i = 1;
379        trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
380        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
381        //trace_parallel_config(trace, TRACE_OPTION_USE_DEDICATED_HASHER, &i);
382        //trace_parallel_config(trace, TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER, &i);
383        i = 2;
384        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
385       
386        if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
387                trace_perror(trace,"trace_start");
388                trace_destroy_output(writer);
389                trace_destroy(trace);
390                return 1;
391        }
392       
393        if (trace_start_output(writer)==-1) {
394                trace_perror_output(writer,"trace_start_output");
395                trace_destroy_output(writer);
396                trace_destroy(trace);
397                return 1;
398        }
399
400        sigact.sa_handler = cleanup_signal;
401        sigemptyset(&sigact.sa_mask);
402        sigact.sa_flags = SA_RESTART;
403
404        sigaction(SIGINT, &sigact, NULL);
405        sigaction(SIGTERM, &sigact, NULL);
406       
407        // Read in the resulting packets and then free them when done
408        libtrace_vector_t res;
409        int res_size = 0;
410        libtrace_vector_init(&res, sizeof(libtrace_result_t));
411        uint64_t packet_count = 0;
412        while (!trace_finished(trace)) {
413                // Read messages
414                libtrace_message_t message;
415               
416                // We just release and do work currently, maybe if something
417                // interesting comes through we'd deal with that
418                libtrace_thread_get_message(trace, &message);
419               
420                while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
421               
422                if ((res_size = trace_get_results(trace, &res)) == 0)
423                        ;/*sched_yield();*/
424               
425                for (i = 0 ; i < res_size ; i++) {
426                        libtrace_result_t result;
427                        assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
428                        packet = libtrace_result_get_value(&result);
429                        assert(libtrace_result_get_key(&result) == packet_count);
430                        packet_count++;
431                        if (trace_write_packet(writer,packet)==-1) {
432                                trace_perror_output(writer,"writer");
433                                trace_interrupt();
434                                break;
435                        }
436                        //trace_destroy_packet(packet);
437                        trace_free_result_packet(trace, packet);
438                }
439        }
440        trace_join(trace);
441       
442        // Grab everything that's left here
443        res_size = trace_get_results(trace, &res);
444       
445        for (i = 0 ; i < res_size ; i++) {
446                libtrace_result_t result;
447                assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
448                packet = libtrace_result_get_value(&result);
449                if (libtrace_result_get_key(&result) != packet_count)
450                        printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);
451                assert(libtrace_result_get_key(&result) == packet_count);
452               
453                packet_count++;
454                if (trace_write_packet(writer,packet)==-1) {
455                        trace_perror_output(writer,"writer");
456                        trace_interrupt();
457                        break;
458                }
459                trace_destroy_packet(packet);
460        }
461        libtrace_vector_destroy(&res);
462       
463        //trace_destroy_packet(packet);
464        //print_contention_stats(trace);
465        trace_destroy(trace);
466        trace_destroy_output(writer);
467        return 0;
468}
Note: See TracBrowser for help on using the repository browser.