source: tools/traceanon/traceanon_parallel.c @ 5b4d121

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5b4d121 was 5b4d121, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Adds a configuration parser to make it easy to change the parallel configuration.
Adds more configuration options (Tidies some verbose debugging output).
Implements tick packets for the hasher thread case.
Some other minor bug fixes

  • Property mode set to 100644
File size: 12.2 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace.h"
3#include <stdio.h>
4#include <unistd.h>
5#include <stdlib.h>
6#include <getopt.h>
7#include <stdbool.h>
8#include <stddef.h>
9#include <string.h>
10#include <time.h>
11#include <assert.h>
12#include "ipenc.h"
13#include <data-struct/vector.h>
14#include <data-struct/message_queue.h>
15#include <signal.h>
16
17bool enc_source = false;
18bool enc_dest   = false;
19enum enc_type_t enc_type = ENC_NONE;
20char *key = NULL;
21
22
23struct libtrace_t *trace = NULL;
24
25static void cleanup_signal(int signal)
26{
27        static int s = 0;
28        (void)signal;
29    //trace_interrupt();
30        // trace_pstop isn't really signal safe because its got lots of locks in it
31    trace_pstop(trace);
32    /*if (s == 0) {
33                if (trace_ppause(trace) == -1)
34                        trace_perror(trace, "Pause failed");
35        }
36        else {
37                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
38                        trace_perror(trace, "Start failed");
39    }*/
40        s = !s;
41}
42
43
44
45static void usage(char *argv0)
46{
47        fprintf(stderr,"Usage:\n"
48        "%s flags inputfile outputfile\n"
49        "-s --encrypt-source    Encrypt the source addresses\n"
50        "-d --encrypt-dest      Encrypt the destination addresses\n"
51        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
52        "                       prefix preserving\n"
53        "-f --keyfile=file      A file containing the cryptopan key\n"
54        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
55        "-H --libtrace-help     Print libtrace runtime documentation\n"
56        "-z --compress-level    Compress the output trace at the specified level\n"
57        "-Z --compress-type     Compress the output trace using the specified"
58        "                       compression algorithm\n"
59        ,argv0);
60        exit(1);
61}
62
63/* Incrementally update a checksum */
64static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
65{
66        uint32_t sum = (~htons(*csum) & 0xFFFF) 
67                     + (~htons(old) & 0xFFFF) 
68                     + htons(new);
69        sum = (sum & 0xFFFF) + (sum >> 16);
70        *csum = htons(~(sum + (sum >> 16)));
71}
72
73static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
74{
75        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
76        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
77}
78
79/* Ok this is remarkably complicated
80 *
81 * We want to change one, or the other IP address, while preserving
82 * the checksum.  TCP and UDP both include the faux header in their
83 * checksum calculations, so you have to update them too.  ICMP is
84 * even worse -- it can include the original IP packet that caused the
85 * error!  So anonymise that too, but remember that it's travelling in
86 * the opposite direction so we need to encrypt the destination and
87 * source instead of the source and destination!
88 */
89static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
90{
91        struct libtrace_tcp *tcp;
92        struct libtrace_udp *udp;
93        struct libtrace_icmp *icmp;
94
95        tcp=trace_get_tcp_from_ip(ip,NULL);
96        udp=trace_get_udp_from_ip(ip,NULL);
97        icmp=trace_get_icmp_from_ip(ip,NULL);
98
99        if (enc_source) {
100                uint32_t old_ip=ip->ip_src.s_addr;
101                uint32_t new_ip=htonl(enc_ip(
102                                        htonl(ip->ip_src.s_addr)
103                                        ));
104                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
105                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
106                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
107                ip->ip_src.s_addr = new_ip;
108        }
109
110        if (enc_dest) {
111                uint32_t old_ip=ip->ip_dst.s_addr;
112                uint32_t new_ip=htonl(enc_ip(
113                                        htonl(ip->ip_dst.s_addr)
114                                        ));
115                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
116                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
117                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
118                ip->ip_dst.s_addr = new_ip;
119        }
120
121        if (icmp) {
122                /* These are error codes that return the IP packet
123                 * internally
124                 */
125               
126                if (icmp->type == 3 
127                                || icmp->type == 5 
128                                || icmp->type == 11) {
129                        char *ptr = (char *)icmp;
130                        encrypt_ips(
131                                (struct libtrace_ip*)(ptr+
132                                        sizeof(struct libtrace_icmp)),
133                                enc_dest,
134                                enc_source);
135                }
136
137                if (enc_source || enc_dest)
138                        icmp->checksum = 0;
139        }
140}
141
142
143UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt)
144{
145        return 0;
146}
147
148
149UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt)
150{
151        return rand();
152}
153
154
155static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t)
156{
157       
158        if (pkt) {
159                struct libtrace_ip *ipptr;
160                libtrace_udp_t *udp = NULL;
161                libtrace_tcp_t *tcp = NULL;
162
163                ipptr = trace_get_ip(pkt);
164
165                if (ipptr && (enc_source || enc_dest)) {
166                        encrypt_ips(ipptr,enc_source,enc_dest);
167                        ipptr->ip_sum = 0;
168                }
169
170                /* Replace checksums so that IP encryption cannot be
171                 * reversed */
172
173                /* XXX replace with nice use of trace_get_transport() */
174
175                udp = trace_get_udp(pkt);
176                if (udp && (enc_source || enc_dest)) {
177                        udp->check = 0;
178                } 
179
180                tcp = trace_get_tcp(pkt);
181                if (tcp && (enc_source || enc_dest)) {
182                        tcp->check = 0;
183                }
184
185                /* TODO: Encrypt IP's in ARP packets */
186               
187                // Send our result keyed with the time
188                // Arg don't copy packets
189                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
190                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
191                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
192                trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET);
193                //return ;
194        }
195        if (mesg) {
196                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
197                switch (mesg->code) {
198                        case MESSAGE_STARTING:
199                                enc_init(enc_type,key);
200                        break;
201                        case MESSAGE_TICK:
202                                trace_publish_result(trace, t, mesg->additional.uint64, NULL, RESULT_TICK);
203                }
204        }
205        return NULL;
206}
207
208struct libtrace_out_t *writer = 0;
209
210static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
211        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
212
213        if (result) {
214                if (result->type == RESULT_PACKET) {
215                        libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
216                        assert(libtrace_result_get_key(result) == packet_count++);
217                        if (trace_write_packet(writer,packet)==-1) {
218                                trace_perror_output(writer,"writer");
219                                trace_interrupt();
220                        }
221                        trace_free_result_packet(trace, packet);
222
223                } else {
224                        assert(result->type == RESULT_TICK);
225                        // Ignore it
226                }
227        }
228        return NULL;
229}
230
231
232int main(int argc, char *argv[]) 
233{
234        //struct libtrace_t *trace = 0;
235        struct sigaction sigact;
236        char *output = 0;
237        int level = -1;
238        char *compress_type_str=NULL;
239        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
240        struct user_configuration uc;
241        ZERO_USER_CONFIG(uc);
242
243        if (argc<2)
244                usage(argv[0]);
245
246        while (1) {
247                int option_index;
248                struct option long_options[] = {
249                        { "encrypt-source",     0, 0, 's' },
250                        { "encrypt-dest",       0, 0, 'd' },
251                        { "cryptopan",          1, 0, 'c' },
252                        { "cryptopan-file",     1, 0, 'f' },
253                        { "prefix",             1, 0, 'p' },
254                        { "compress-level",     1, 0, 'z' },
255                        { "compress-type",      1, 0, 'Z' },
256                        { "libtrace-help",      0, 0, 'H' },
257                        { "config",             1, 0, 'u' },
258                    { "config-file",            1, 0, 'U' },
259                        { NULL,                 0, 0, 0   },
260                };
261
262                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:Hu:U:",
263                                long_options, &option_index);
264
265                if (c==-1)
266                        break;
267
268                switch (c) {
269                        case 'Z': compress_type_str=optarg; break;         
270                        case 'z': level = atoi(optarg); break;
271                        case 's': enc_source=true; break;
272                        case 'd': enc_dest  =true; break;
273                        case 'c':
274                                  if (key!=NULL) {
275                                          fprintf(stderr,"You can only have one encryption type and one key\n");
276                                          usage(argv[0]);
277                                  }
278                                  key=strdup(optarg);
279                                  enc_type = ENC_CRYPTOPAN;
280                                  break;
281                        case 'f':
282                                  if(key != NULL) {
283                                    fprintf(stderr,"You can only have one encryption type and one key\n");
284                                    usage(argv[0]);
285                                  }
286                                  FILE * infile = fopen(optarg,"rb");
287                                  if(infile == NULL) {
288                                    perror("Failed to open cryptopan keyfile");
289                                    return 1;
290                                  }
291                                  key = (char *) malloc(sizeof(char *) * 32);
292                                  if(fread(key,1,32,infile) != 32) {
293                                    if(ferror(infile)) {
294                                      perror("Failed while reading cryptopan keyfile");
295                                    }
296                                  }
297                                  fclose(infile);
298                                  enc_type = ENC_CRYPTOPAN;
299                                  break;
300                        case 'p':
301                                  if (key!=NULL) {
302                                          fprintf(stderr,"You can only have one encryption type and one key\n");
303                                          usage(argv[0]);
304                                  }
305                                  key=strdup(optarg);
306                                  enc_type = ENC_PREFIX_SUBSTITUTION;
307                                  break;
308                        case 'H':
309                                  trace_help(); 
310                                  exit(1); 
311                                  break;
312                        case 'u':
313                                  parse_user_config(&uc, optarg);
314                                  break;
315                        case 'U':;
316                                FILE * f = fopen(optarg, "r");
317                                if (f != NULL) {
318                                        parse_user_config_file(&uc, f);
319                                } else {
320                                        perror("Failed to open configuration file\n");
321                                        usage(argv[0]);
322                                }
323                                break;
324                        default:
325                                fprintf(stderr,"unknown option: %c\n",c);
326                                usage(argv[0]);
327
328                }
329
330        }
331
332        if (compress_type_str == NULL && level >= 0) {
333                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
334                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
335        }
336
337        else if (compress_type_str == NULL) {
338                /* If a level or type is not specified, use the "none"
339                 * compression module */
340                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
341        }
342
343        /* I decided to be fairly generous in what I accept for the
344         * compression type string */
345        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
346                        strncmp(compress_type_str, "zlib", 4) == 0) {
347                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
348        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
349                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
350        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
351                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
352        } else if (strncmp(compress_type_str, "no", 2) == 0) {
353                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
354        } else {
355                fprintf(stderr, "Unknown compression type: %s\n",
356                        compress_type_str);
357                return 1;
358        }
359       
360
361       
362
363        /* open input uri */
364        trace = trace_create(argv[optind]);
365        if (trace_is_err(trace)) {
366                trace_perror(trace,"trace_create");
367                trace_destroy(trace);
368                return 1;
369        }
370
371        if (optind +1>= argc) {
372                /* no output specified, output in same format to
373                 * stdout
374                 */
375                output = strdup("erf:-");
376                writer = trace_create_output(output);
377        } else {
378                writer = trace_create_output(argv[optind +1]);
379        }
380        if (trace_is_err_output(writer)) {
381                trace_perror_output(writer,"trace_create_output");
382                trace_destroy_output(writer);
383                trace_destroy(trace);
384                return 1;
385        }
386       
387        /* Hopefully this will deal nicely with people who want to crank the
388         * compression level up to 11 :) */
389        if (level > 9) {
390                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
391                level = 9;
392        }
393
394        if (level >= 0 && trace_config_output(writer, 
395                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
396                trace_perror_output(writer, "Configuring compression level");
397                trace_destroy_output(writer);
398                trace_destroy(trace);
399                return 1;
400        }
401
402        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
403                                &compress_type) == -1) {
404                trace_perror_output(writer, "Configuring compression type");
405                trace_destroy_output(writer);
406                trace_destroy(trace);
407                return 1;
408        }
409
410        // OK parallel changes start here
411
412        /* Set a special mode flag that means the output is timestamped
413         * and ordered before its read into reduce. Seems like a good
414         * special case to have.
415         */
416         
417        int i = 1;
418        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
419        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
420        i = 6;
421    trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
422        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
423        trace_set_hasher(trace, HASHER_CUSTOM, bad_hash, NULL);
424       
425        if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
426                trace_perror(trace,"trace_start");
427                trace_destroy_output(writer);
428                trace_destroy(trace);
429                return 1;
430        }
431       
432        if (trace_start_output(writer)==-1) {
433                trace_perror_output(writer,"trace_start_output");
434                trace_destroy_output(writer);
435                trace_destroy(trace);
436                return 1;
437        }
438
439        sigact.sa_handler = cleanup_signal;
440        sigemptyset(&sigact.sa_mask);
441        sigact.sa_flags = SA_RESTART;
442
443        sigaction(SIGINT, &sigact, NULL);
444        sigaction(SIGTERM, &sigact, NULL);
445
446        // Wait for the trace to finish
447        trace_join(trace);
448       
449        //trace_destroy_packet(packet);
450        //print_contention_stats(trace);
451        trace_destroy(trace);
452        trace_destroy_output(writer);
453        return 0;
454}
Note: See TracBrowser for help on using the repository browser.