source: tools/traceanon/traceanon_parallel.c @ 7c95027

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 7c95027 was 7c95027, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Move combiners.h into libtrace_parallel.h and export libtrace_parallel.h
so that it is installed correctly.

  • Property mode set to 100644
File size: 12.7 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace_parallel.h"
3#include <stdio.h>
4#include <unistd.h>
5#include <stdlib.h>
6#include <getopt.h>
7#include <stdbool.h>
8#include <stddef.h>
9#include <string.h>
10#include <time.h>
11#include <assert.h>
12#include "ipenc.h"
13#include <signal.h>
14
15bool enc_source = false;
16bool enc_dest   = false;
17enum enc_type_t enc_type = ENC_NONE;
18char *key = NULL;
19
20
21struct libtrace_t *trace = NULL;
22
23static void cleanup_signal(int signal)
24{
25        static int s = 0;
26        (void)signal;
27    //trace_interrupt();
28        // trace_pstop isn't really signal safe because its got lots of locks in it
29    trace_pstop(trace);
30    /*if (s == 0) {
31                if (trace_ppause(trace) == -1)
32                        trace_perror(trace, "Pause failed");
33        }
34        else {
35                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
36                        trace_perror(trace, "Start failed");
37    }*/
38        s = !s;
39}
40
41
42
43static void usage(char *argv0)
44{
45        fprintf(stderr,"Usage:\n"
46        "%s flags inputfile outputfile\n"
47        "-s --encrypt-source    Encrypt the source addresses\n"
48        "-d --encrypt-dest      Encrypt the destination addresses\n"
49        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
50        "                       prefix preserving\n"
51        "-f --keyfile=file      A file containing the cryptopan key\n"
52        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
53        "-H --libtrace-help     Print libtrace runtime documentation\n"
54        "-z --compress-level    Compress the output trace at the specified level\n"
55        "-Z --compress-type     Compress the output trace using the specified"
56        "                       compression algorithm\n"
57        ,argv0);
58        exit(1);
59}
60
61/* Incrementally update a checksum */
62static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
63{
64        uint32_t sum = (~htons(*csum) & 0xFFFF) 
65                     + (~htons(old) & 0xFFFF) 
66                     + htons(new);
67        sum = (sum & 0xFFFF) + (sum >> 16);
68        *csum = htons(~(sum + (sum >> 16)));
69}
70
71static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
72{
73        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
74        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
75}
76
77/* Ok this is remarkably complicated
78 *
79 * We want to change one, or the other IP address, while preserving
80 * the checksum.  TCP and UDP both include the faux header in their
81 * checksum calculations, so you have to update them too.  ICMP is
82 * even worse -- it can include the original IP packet that caused the
83 * error!  So anonymise that too, but remember that it's travelling in
84 * the opposite direction so we need to encrypt the destination and
85 * source instead of the source and destination!
86 */
87static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
88{
89        struct libtrace_tcp *tcp;
90        struct libtrace_udp *udp;
91        struct libtrace_icmp *icmp;
92
93        tcp=trace_get_tcp_from_ip(ip,NULL);
94        udp=trace_get_udp_from_ip(ip,NULL);
95        icmp=trace_get_icmp_from_ip(ip,NULL);
96
97        if (enc_source) {
98                uint32_t old_ip=ip->ip_src.s_addr;
99                uint32_t new_ip=htonl(enc_ip(
100                                        htonl(ip->ip_src.s_addr)
101                                        ));
102                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
103                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
104                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
105                ip->ip_src.s_addr = new_ip;
106        }
107
108        if (enc_dest) {
109                uint32_t old_ip=ip->ip_dst.s_addr;
110                uint32_t new_ip=htonl(enc_ip(
111                                        htonl(ip->ip_dst.s_addr)
112                                        ));
113                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
114                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
115                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
116                ip->ip_dst.s_addr = new_ip;
117        }
118
119        if (icmp) {
120                /* These are error codes that return the IP packet
121                 * internally
122                 */
123               
124                if (icmp->type == 3 
125                                || icmp->type == 5 
126                                || icmp->type == 11) {
127                        char *ptr = (char *)icmp;
128                        encrypt_ips(
129                                (struct libtrace_ip*)(ptr+
130                                        sizeof(struct libtrace_icmp)),
131                                enc_dest,
132                                enc_source);
133                }
134
135                if (enc_source || enc_dest)
136                        icmp->checksum = 0;
137        }
138}
139
140
141UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt)
142{
143        return 0;
144}
145
146
147UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt)
148{
149        return rand();
150}
151
152
153static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
154                        int mesg, libtrace_generic_t data,
155                        libtrace_thread_t *sender UNUSED)
156{
157        struct libtrace_ip *ipptr;
158        libtrace_udp_t *udp = NULL;
159        libtrace_tcp_t *tcp = NULL;
160        libtrace_stat_t *stats = NULL;
161        switch (mesg) {
162        case MESSAGE_PACKET:
163                ipptr = trace_get_ip(data.pkt);
164
165                if (ipptr && (enc_source || enc_dest)) {
166                        encrypt_ips(ipptr,enc_source,enc_dest);
167                        ipptr->ip_sum = 0;
168                }
169
170                /* Replace checksums so that IP encryption cannot be
171                 * reversed */
172
173                /* XXX replace with nice use of trace_get_transport() */
174
175                udp = trace_get_udp(data.pkt);
176                if (udp && (enc_source || enc_dest)) {
177                        udp->check = 0;
178                } 
179
180                tcp = trace_get_tcp(data.pkt);
181                if (tcp && (enc_source || enc_dest)) {
182                        tcp->check = 0;
183                }
184
185                /* TODO: Encrypt IP's in ARP packets */
186               
187                // Send our result keyed with the time
188                // Arg don't copy packets
189                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
190                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
191                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
192
193                trace_publish_result(trace, t, trace_packet_get_order(data.pkt), data, RESULT_PACKET);
194                break;
195        case MESSAGE_STARTING:
196                enc_init(enc_type,key);
197                break;
198        case MESSAGE_TICK_INTERVAL:
199                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK_INTERVAL);
200                break;
201        case MESSAGE_TICK_COUNT:
202                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK_COUNT);
203                break;
204        case MESSAGE_STOPPING:
205                stats = trace_create_statistics();
206                trace_get_thread_statistics(trace, t, stats);
207                trace_print_statistics(stats, stderr, NULL);
208                free(stats);
209                stats = trace_get_statistics(trace, NULL);
210                trace_print_statistics(stats, stderr, NULL);
211                //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
212                break;
213        }
214        return NULL;
215}
216
217struct libtrace_out_t *writer = 0;
218
219static void write_out(libtrace_t *trace UNUSED, int mesg,
220                      libtrace_generic_t data,
221                      libtrace_thread_t *sender UNUSED) {
222        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
223
224        switch (mesg) {
225        case MESSAGE_RESULT:
226                if (data.res->type == RESULT_PACKET) {
227                        libtrace_packet_t *packet = (libtrace_packet_t*) data.res->value.pkt;
228                        assert(data.res->key >= packet_count);
229                        packet_count = data.res->key;
230                        if (trace_write_packet(writer,packet)==-1) {
231                                trace_perror_output(writer,"writer");
232                                trace_interrupt();
233                        }
234                        trace_free_packet(trace, packet);
235
236                } else {
237                        assert(data.res->type == RESULT_TICK_COUNT || data.res->type == RESULT_TICK_INTERVAL);
238                        // Ignore it
239                }
240        }
241}
242
243
244int main(int argc, char *argv[]) 
245{
246        //struct libtrace_t *trace = 0;
247        struct sigaction sigact;
248        char *output = 0;
249        int level = -1;
250        char *compress_type_str=NULL;
251        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
252        char *config = NULL;
253        char *config_file = NULL;
254
255        if (argc<2)
256                usage(argv[0]);
257
258        while (1) {
259                int option_index;
260                struct option long_options[] = {
261                        { "encrypt-source",     0, 0, 's' },
262                        { "encrypt-dest",       0, 0, 'd' },
263                        { "cryptopan",          1, 0, 'c' },
264                        { "cryptopan-file",     1, 0, 'f' },
265                        { "prefix",             1, 0, 'p' },
266                        { "compress-level",     1, 0, 'z' },
267                        { "compress-type",      1, 0, 'Z' },
268                        { "libtrace-help",      0, 0, 'H' },
269                        { "config",             1, 0, 'u' },
270                    { "config-file",            1, 0, 'U' },
271                        { NULL,                 0, 0, 0   },
272                };
273
274                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:Hu:U:",
275                                long_options, &option_index);
276
277                if (c==-1)
278                        break;
279
280                switch (c) {
281                        case 'Z': compress_type_str=optarg; break;         
282                        case 'z': level = atoi(optarg); break;
283                        case 's': enc_source=true; break;
284                        case 'd': enc_dest  =true; break;
285                        case 'c':
286                                  if (key!=NULL) {
287                                          fprintf(stderr,"You can only have one encryption type and one key\n");
288                                          usage(argv[0]);
289                                  }
290                                  key=strdup(optarg);
291                                  enc_type = ENC_CRYPTOPAN;
292                                  break;
293                        case 'f':
294                                  if(key != NULL) {
295                                    fprintf(stderr,"You can only have one encryption type and one key\n");
296                                    usage(argv[0]);
297                                  }
298                                  FILE * infile = fopen(optarg,"rb");
299                                  if(infile == NULL) {
300                                    perror("Failed to open cryptopan keyfile");
301                                    return 1;
302                                  }
303                                  key = (char *) malloc(sizeof(char *) * 32);
304                                  if(fread(key,1,32,infile) != 32) {
305                                    if(ferror(infile)) {
306                                      perror("Failed while reading cryptopan keyfile");
307                                    }
308                                  }
309                                  fclose(infile);
310                                  enc_type = ENC_CRYPTOPAN;
311                                  break;
312                        case 'p':
313                                  if (key!=NULL) {
314                                          fprintf(stderr,"You can only have one encryption type and one key\n");
315                                          usage(argv[0]);
316                                  }
317                                  key=strdup(optarg);
318                                  enc_type = ENC_PREFIX_SUBSTITUTION;
319                                  break;
320                        case 'H':
321                                  trace_help(); 
322                                  exit(1); 
323                                  break;
324                        case 'u':
325                                config = optarg;
326                                break;
327                        case 'U':
328                                config_file = optarg;
329                                break;
330                        default:
331                                fprintf(stderr,"unknown option: %c\n",c);
332                                usage(argv[0]);
333
334                }
335
336        }
337
338        if (compress_type_str == NULL && level >= 0) {
339                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
340                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
341        }
342
343        else if (compress_type_str == NULL) {
344                /* If a level or type is not specified, use the "none"
345                 * compression module */
346                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
347        }
348
349        /* I decided to be fairly generous in what I accept for the
350         * compression type string */
351        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
352                        strncmp(compress_type_str, "zlib", 4) == 0) {
353                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
354        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
355                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
356        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
357                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
358        } else if (strncmp(compress_type_str, "no", 2) == 0) {
359                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
360        } else {
361                fprintf(stderr, "Unknown compression type: %s\n",
362                        compress_type_str);
363                return 1;
364        }
365
366        /* open input uri */
367        trace = trace_create(argv[optind]);
368        if (trace_is_err(trace)) {
369                trace_perror(trace,"trace_create");
370                trace_destroy(trace);
371                return 1;
372        }
373
374        if (optind +1>= argc) {
375                /* no output specified, output in same format to
376                 * stdout
377                 */
378                output = strdup("erf:-");
379                writer = trace_create_output(output);
380        } else {
381                writer = trace_create_output(argv[optind +1]);
382        }
383        if (trace_is_err_output(writer)) {
384                trace_perror_output(writer,"trace_create_output");
385                trace_destroy_output(writer);
386                trace_destroy(trace);
387                return 1;
388        }
389       
390        /* Hopefully this will deal nicely with people who want to crank the
391         * compression level up to 11 :) */
392        if (level > 9) {
393                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
394                level = 9;
395        }
396
397        if (level >= 0 && trace_config_output(writer, 
398                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
399                trace_perror_output(writer, "Configuring compression level");
400                trace_destroy_output(writer);
401                trace_destroy(trace);
402                return 1;
403        }
404
405        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
406                                &compress_type) == -1) {
407                trace_perror_output(writer, "Configuring compression type");
408                trace_destroy_output(writer);
409                trace_destroy(trace);
410                return 1;
411        }
412
413        if (trace_start_output(writer)==-1) {
414                trace_perror_output(writer,"trace_start_output");
415                trace_destroy_output(writer);
416                trace_destroy(trace);
417                return 1;
418        }
419
420        // OK parallel changes start here
421
422        /* Set a special mode flag that means the output is timestamped
423         * and ordered before its read into reduce. Seems like a good
424         * special case to have.
425         */
426
427        /* Apply config */
428        if (config) {
429                trace_set_configuration(trace, config);
430        }
431
432        if (config_file) {
433                FILE * f = fopen(optarg, "r");
434                if (f != NULL) {
435                        trace_set_configuration_file(trace, f);
436                        fclose(f);
437                } else {
438                        perror("Failed to open configuration file\n");
439                        usage(argv[0]);
440                }
441        }
442
443        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
444
445        //trace_set_hasher(trace, HASHER_CUSTOM, rand_hash, NULL);
446       
447        if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
448                trace_perror(trace,"trace_start");
449                trace_destroy_output(writer);
450                trace_destroy(trace);
451                return 1;
452        }
453
454        sigact.sa_handler = cleanup_signal;
455        sigemptyset(&sigact.sa_mask);
456        sigact.sa_flags = SA_RESTART;
457
458        sigaction(SIGINT, &sigact, NULL);
459        sigaction(SIGTERM, &sigact, NULL);
460
461        // Wait for the trace to finish
462        trace_join(trace);
463       
464        //trace_destroy_packet(packet);
465        //print_contention_stats(trace);
466        trace_destroy(trace);
467        trace_destroy_output(writer);
468        return 0;
469}
Note: See TracBrowser for help on using the repository browser.