source: tools/traceanon/traceanon_parallel.c @ 17c5749

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 17c5749 was 9594cf9, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Remove sliding window option, this was very complicated to ensure thread safety. For now its best left out.

  • Property mode set to 100644
File size: 12.7 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace.h"
3#include <stdio.h>
4#include <unistd.h>
5#include <stdlib.h>
6#include <getopt.h>
7#include <stdbool.h>
8#include <stddef.h>
9#include <string.h>
10#include <time.h>
11#include <assert.h>
12#include "ipenc.h"
13#include <data-struct/vector.h>
14#include <data-struct/message_queue.h>
15#include <signal.h>
16
17bool enc_source = false;
18bool enc_dest   = false;
19enum enc_type_t enc_type = ENC_NONE;
20char *key = NULL;
21
22
23struct libtrace_t *trace = NULL;
24
25static void cleanup_signal(int signal)
26{
27        static int s = 0;
28        (void)signal;
29        // trace_interrupt();
30        // trace_pstop isn't really signal safe because its got lots of locks in it
31        // trace_pstop(trace);
32        if (s == 0) {
33                if (trace_ppause(trace) == -1)
34                        trace_perror(trace, "Pause failed");
35        }
36        else {
37                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
38                        trace_perror(trace, "Start failed");
39        }
40        s = !s;
41}
42
43
44
45static void usage(char *argv0)
46{
47        fprintf(stderr,"Usage:\n"
48        "%s flags inputfile outputfile\n"
49        "-s --encrypt-source    Encrypt the source addresses\n"
50        "-d --encrypt-dest      Encrypt the destination addresses\n"
51        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
52        "                       prefix preserving\n"
53        "-f --keyfile=file      A file containing the cryptopan key\n"
54        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
55        "-H --libtrace-help     Print libtrace runtime documentation\n"
56        "-z --compress-level    Compress the output trace at the specified level\n"
57        "-Z --compress-type     Compress the output trace using the specified"
58        "                       compression algorithm\n"
59        ,argv0);
60        exit(1);
61}
62
63/* Incrementally update a checksum */
64static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
65{
66        uint32_t sum = (~htons(*csum) & 0xFFFF) 
67                     + (~htons(old) & 0xFFFF) 
68                     + htons(new);
69        sum = (sum & 0xFFFF) + (sum >> 16);
70        *csum = htons(~(sum + (sum >> 16)));
71}
72
73static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
74{
75        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
76        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
77}
78
79/* Ok this is remarkably complicated
80 *
81 * We want to change one, or the other IP address, while preserving
82 * the checksum.  TCP and UDP both include the faux header in their
83 * checksum calculations, so you have to update them too.  ICMP is
84 * even worse -- it can include the original IP packet that caused the
85 * error!  So anonymise that too, but remember that it's travelling in
86 * the opposite direction so we need to encrypt the destination and
87 * source instead of the source and destination!
88 */
89static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
90{
91        struct libtrace_tcp *tcp;
92        struct libtrace_udp *udp;
93        struct libtrace_icmp *icmp;
94
95        tcp=trace_get_tcp_from_ip(ip,NULL);
96        udp=trace_get_udp_from_ip(ip,NULL);
97        icmp=trace_get_icmp_from_ip(ip,NULL);
98
99        if (enc_source) {
100                uint32_t old_ip=ip->ip_src.s_addr;
101                uint32_t new_ip=htonl(enc_ip(
102                                        htonl(ip->ip_src.s_addr)
103                                        ));
104                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
105                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
106                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
107                ip->ip_src.s_addr = new_ip;
108        }
109
110        if (enc_dest) {
111                uint32_t old_ip=ip->ip_dst.s_addr;
112                uint32_t new_ip=htonl(enc_ip(
113                                        htonl(ip->ip_dst.s_addr)
114                                        ));
115                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
116                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
117                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
118                ip->ip_dst.s_addr = new_ip;
119        }
120
121        if (icmp) {
122                /* These are error codes that return the IP packet
123                 * internally
124                 */
125               
126                if (icmp->type == 3 
127                                || icmp->type == 5 
128                                || icmp->type == 11) {
129                        char *ptr = (char *)icmp;
130                        encrypt_ips(
131                                (struct libtrace_ip*)(ptr+
132                                        sizeof(struct libtrace_icmp)),
133                                enc_dest,
134                                enc_source);
135                }
136
137                if (enc_source || enc_dest)
138                        icmp->checksum = 0;
139        }
140}
141
142
143static uint64_t bad_hash(libtrace_packet_t * pkt)
144{
145        return 0;
146}
147
148
149static uint64_t rand_hash(libtrace_packet_t * pkt)
150{
151        return rand();
152}
153
154
155static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
156{
157        int i;
158       
159        if (pkt) {
160                struct libtrace_ip *ipptr;
161                libtrace_udp_t *udp = NULL;
162                libtrace_tcp_t *tcp = NULL;
163
164                ipptr = trace_get_ip(pkt);
165
166                if (ipptr && (enc_source || enc_dest)) {
167                        encrypt_ips(ipptr,enc_source,enc_dest);
168                        ipptr->ip_sum = 0;
169                }
170
171                /* Replace checksums so that IP encryption cannot be
172                 * reversed */
173
174                /* XXX replace with nice use of trace_get_transport() */
175
176                udp = trace_get_udp(pkt);
177                if (udp && (enc_source || enc_dest)) {
178                        udp->check = 0;
179                } 
180
181                tcp = trace_get_tcp(pkt);
182                if (tcp && (enc_source || enc_dest)) {
183                        tcp->check = 0;
184                }
185
186                /* TODO: Encrypt IP's in ARP packets */
187               
188                // Send our result keyed with the time
189                // Arg don't copy packets
190                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
191                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
192                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
193                trace_publish_packet(trace, pkt);
194                //return ;
195        }
196        if (mesg) {
197                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
198                switch (mesg->code) {
199                        case MESSAGE_STARTED:
200                                enc_init(enc_type,key);
201                }
202        }
203        return NULL;
204}
205
206int main(int argc, char *argv[]) 
207{
208        //struct libtrace_t *trace = 0;
209        struct libtrace_packet_t *packet/* = trace_create_packet()*/;
210        struct libtrace_out_t *writer = 0;
211        struct sigaction sigact;
212        char *output = 0;
213        int level = -1;
214        char *compress_type_str=NULL;
215        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
216
217
218        if (argc<2)
219                usage(argv[0]);
220
221        while (1) {
222                int option_index;
223                struct option long_options[] = {
224                        { "encrypt-source",     0, 0, 's' },
225                        { "encrypt-dest",       0, 0, 'd' },
226                        { "cryptopan",          1, 0, 'c' },
227                        { "cryptopan-file",     1, 0, 'f' },
228                        { "prefix",             1, 0, 'p' },
229                        { "compress-level",     1, 0, 'z' },
230                        { "compress-type",      1, 0, 'Z' },
231                        { "libtrace-help",      0, 0, 'H' },
232                        { NULL,                 0, 0, 0   },
233                };
234
235                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:H",
236                                long_options, &option_index);
237
238                if (c==-1)
239                        break;
240
241                switch (c) {
242                        case 'Z': compress_type_str=optarg; break;         
243                        case 'z': level = atoi(optarg); break;
244                        case 's': enc_source=true; break;
245                        case 'd': enc_dest  =true; break;
246                        case 'c':
247                                  if (key!=NULL) {
248                                          fprintf(stderr,"You can only have one encryption type and one key\n");
249                                          usage(argv[0]);
250                                  }
251                                  key=strdup(optarg);
252                                  enc_type = ENC_CRYPTOPAN;
253                                  break;
254                        case 'f':
255                                  if(key != NULL) {
256                                    fprintf(stderr,"You can only have one encryption type and one key\n");
257                                    usage(argv[0]);
258                                  }
259                                  FILE * infile = fopen(optarg,"rb");
260                                  if(infile == NULL) {
261                                    perror("Failed to open cryptopan keyfile");
262                                    return 1;
263                                  }
264                                  key = (char *) malloc(sizeof(char *) * 32);
265                                  if(fread(key,1,32,infile) != 32) {
266                                    if(ferror(infile)) {
267                                      perror("Failed while reading cryptopan keyfile");
268                                    }
269                                  }
270                                  fclose(infile);
271                                  enc_type = ENC_CRYPTOPAN;
272                                  break;
273                        case 'p':
274                                  if (key!=NULL) {
275                                          fprintf(stderr,"You can only have one encryption type and one key\n");
276                                          usage(argv[0]);
277                                  }
278                                  key=strdup(optarg);
279                                  enc_type = ENC_PREFIX_SUBSTITUTION;
280                                  break;
281                        case 'H':
282                                  trace_help(); 
283                                  exit(1); 
284                                  break;
285                        default:
286                                fprintf(stderr,"unknown option: %c\n",c);
287                                usage(argv[0]);
288
289                }
290
291        }
292
293        if (compress_type_str == NULL && level >= 0) {
294                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
295                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
296        }
297
298        else if (compress_type_str == NULL) {
299                /* If a level or type is not specified, use the "none"
300                 * compression module */
301                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
302        }
303
304        /* I decided to be fairly generous in what I accept for the
305         * compression type string */
306        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
307                        strncmp(compress_type_str, "zlib", 4) == 0) {
308                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
309        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
310                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
311        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
312                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
313        } else if (strncmp(compress_type_str, "no", 2) == 0) {
314                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
315        } else {
316                fprintf(stderr, "Unknown compression type: %s\n",
317                        compress_type_str);
318                return 1;
319        }
320       
321
322       
323
324        /* open input uri */
325        trace = trace_create(argv[optind]);
326        if (trace_is_err(trace)) {
327                trace_perror(trace,"trace_create");
328                trace_destroy(trace);
329                return 1;
330        }
331
332        if (optind +1>= argc) {
333                /* no output specified, output in same format to
334                 * stdout
335                 */
336                output = strdup("erf:-");
337                writer = trace_create_output(output);
338        } else {
339                writer = trace_create_output(argv[optind +1]);
340        }
341        if (trace_is_err_output(writer)) {
342                trace_perror_output(writer,"trace_create_output");
343                trace_destroy_output(writer);
344                trace_destroy(trace);
345                return 1;
346        }
347       
348        /* Hopefully this will deal nicely with people who want to crank the
349         * compression level up to 11 :) */
350        if (level > 9) {
351                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
352                level = 9;
353        }
354
355        if (level >= 0 && trace_config_output(writer, 
356                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
357                trace_perror_output(writer, "Configuring compression level");
358                trace_destroy_output(writer);
359                trace_destroy(trace);
360                return 1;
361        }
362
363        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
364                                &compress_type) == -1) {
365                trace_perror_output(writer, "Configuring compression type");
366                trace_destroy_output(writer);
367                trace_destroy(trace);
368                return 1;
369        }
370
371        // OK parallel changes start here
372
373        /* Set a special mode flag that means the output is timestamped
374         * and ordered before its read into reduce. Seems like a good
375         * special case to have.
376         */
377         
378        int i = 1;
379        trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
380        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
381        i = 2;
382        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
383       
384        if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
385                trace_perror(trace,"trace_start");
386                trace_destroy_output(writer);
387                trace_destroy(trace);
388                return 1;
389        }
390       
391        if (trace_start_output(writer)==-1) {
392                trace_perror_output(writer,"trace_start_output");
393                trace_destroy_output(writer);
394                trace_destroy(trace);
395                return 1;
396        }
397
398        sigact.sa_handler = cleanup_signal;
399        sigemptyset(&sigact.sa_mask);
400        sigact.sa_flags = SA_RESTART;
401
402        sigaction(SIGINT, &sigact, NULL);
403        sigaction(SIGTERM, &sigact, NULL);
404       
405        // Read in the resulting packets and then free them when done
406        libtrace_vector_t res;
407        int res_size = 0;
408        libtrace_vector_init(&res, sizeof(libtrace_result_t));
409        uint64_t packet_count = 0;
410        while (!trace_finished(trace)) {
411                // Read messages
412                libtrace_message_t message;
413               
414                // We just release and do work currently, maybe if something
415                // interesting comes through we'd deal with that
416                libtrace_thread_get_message(trace, &message);
417               
418                while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
419               
420                if ((res_size = trace_get_results(trace, &res)) == 0)
421                        ;/*sched_yield();*/
422               
423                for (i = 0 ; i < res_size ; i++) {
424                        libtrace_result_t result;
425                        assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
426                        packet = libtrace_result_get_value(&result);
427                        assert(libtrace_result_get_key(&result) == packet_count);
428                        packet_count++;
429                        if (trace_write_packet(writer,packet)==-1) {
430                                trace_perror_output(writer,"writer");
431                                trace_interrupt();
432                                break;
433                        }
434                        //trace_destroy_packet(packet);
435                        trace_free_result_packet(trace, packet);
436                }
437        }
438        trace_join(trace);
439       
440        // Grab everything that's left here
441        res_size = trace_get_results(trace, &res);
442       
443        for (i = 0 ; i < res_size ; i++) {
444                libtrace_result_t result;
445                assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
446                packet = libtrace_result_get_value(&result);
447                if (libtrace_result_get_key(&result) != packet_count)
448                        printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);
449                assert(libtrace_result_get_key(&result) == packet_count);
450               
451                packet_count++;
452                if (trace_write_packet(writer,packet)==-1) {
453                        trace_perror_output(writer,"writer");
454                        trace_interrupt();
455                        break;
456                }
457                trace_destroy_packet(packet);
458        }
459        libtrace_vector_destroy(&res);
460       
461        //trace_destroy_packet(packet);
462        //print_contention_stats(trace);
463        trace_destroy(trace);
464        trace_destroy_output(writer);
465        return 0;
466}
Note: See TracBrowser for help on using the repository browser.