source: tools/traceanon/traceanon_parallel.c @ 29bbef0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 29bbef0 was 29bbef0, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

My work from over summer, with a few things tidied up and updated to include recent commits/patches to bring this up to date. Still very much work in progress.

  • Property mode set to 100644
File size: 12.1 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace.h"
3#include <stdio.h>
4#include <unistd.h>
5#include <stdlib.h>
6#include <getopt.h>
7#include <stdbool.h>
8#include <stddef.h>
9#include <string.h>
10#include <time.h>
11#include "ipenc.h"
12#include <trace_vector.h>
13
14bool enc_source = false;
15bool enc_dest   = false;
16enum enc_type_t enc_type = ENC_NONE;
17char *key = NULL;
18
19static void usage(char *argv0)
20{
21        fprintf(stderr,"Usage:\n"
22        "%s flags inputfile outputfile\n"
23        "-s --encrypt-source    Encrypt the source addresses\n"
24        "-d --encrypt-dest      Encrypt the destination addresses\n"
25        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
26        "                       prefix preserving\n"
27        "-f --keyfile=file      A file containing the cryptopan key\n"
28        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
29        "-H --libtrace-help     Print libtrace runtime documentation\n"
30        "-z --compress-level    Compress the output trace at the specified level\n"
31        "-Z --compress-type     Compress the output trace using the specified"
32        "                       compression algorithm\n"
33        ,argv0);
34        exit(1);
35}
36
37/* Incrementally update a checksum */
38static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
39{
40        uint32_t sum = (~htons(*csum) & 0xFFFF) 
41                     + (~htons(old) & 0xFFFF) 
42                     + htons(new);
43        sum = (sum & 0xFFFF) + (sum >> 16);
44        *csum = htons(~(sum + (sum >> 16)));
45}
46
47static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
48{
49        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
50        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
51}
52
53/* Ok this is remarkably complicated
54 *
55 * We want to change one, or the other IP address, while preserving
56 * the checksum.  TCP and UDP both include the faux header in their
57 * checksum calculations, so you have to update them too.  ICMP is
58 * even worse -- it can include the original IP packet that caused the
59 * error!  So anonymise that too, but remember that it's travelling in
60 * the opposite direction so we need to encrypt the destination and
61 * source instead of the source and destination!
62 */
63static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
64{
65        struct libtrace_tcp *tcp;
66        struct libtrace_udp *udp;
67        struct libtrace_icmp *icmp;
68
69        tcp=trace_get_tcp_from_ip(ip,NULL);
70        udp=trace_get_udp_from_ip(ip,NULL);
71        icmp=trace_get_icmp_from_ip(ip,NULL);
72
73        if (enc_source) {
74                uint32_t old_ip=ip->ip_src.s_addr;
75                uint32_t new_ip=htonl(enc_ip(
76                                        htonl(ip->ip_src.s_addr)
77                                        ));
78                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
79                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
80                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
81                ip->ip_src.s_addr = new_ip;
82        }
83
84        if (enc_dest) {
85                uint32_t old_ip=ip->ip_dst.s_addr;
86                uint32_t new_ip=htonl(enc_ip(
87                                        htonl(ip->ip_dst.s_addr)
88                                        ));
89                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
90                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
91                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
92                ip->ip_dst.s_addr = new_ip;
93        }
94
95        if (icmp) {
96                /* These are error codes that return the IP packet
97                 * internally
98                 */
99               
100                if (icmp->type == 3 
101                                || icmp->type == 5 
102                                || icmp->type == 11) {
103                        char *ptr = (char *)icmp;
104                        encrypt_ips(
105                                (struct libtrace_ip*)(ptr+
106                                        sizeof(struct libtrace_icmp)),
107                                enc_dest,
108                                enc_source);
109                }
110
111                if (enc_source || enc_dest)
112                        icmp->checksum = 0;
113        }
114}
115
116
117static uint64_t bad_hash(libtrace_packet_t * pkt)
118{
119        return 0;
120}
121
122
123static uint64_t rand_hash(libtrace_packet_t * pkt)
124{
125        return rand();
126}
127
128
129static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
130{
131        int i;
132       
133        if (pkt) {
134                struct libtrace_ip *ipptr;
135                libtrace_udp_t *udp = NULL;
136                libtrace_tcp_t *tcp = NULL;
137
138                ipptr = trace_get_ip(pkt);
139
140                if (ipptr && (enc_source || enc_dest)) {
141                        encrypt_ips(ipptr,enc_source,enc_dest);
142                        ipptr->ip_sum = 0;
143                }
144
145                /* Replace checksums so that IP encryption cannot be
146                 * reversed */
147
148                /* XXX replace with nice use of trace_get_transport() */
149
150                udp = trace_get_udp(pkt);
151                if (udp && (enc_source || enc_dest)) {
152                        udp->check = 0;
153                } 
154
155                tcp = trace_get_tcp(pkt);
156                if (tcp && (enc_source || enc_dest)) {
157                        tcp->check = 0;
158                }
159
160                /* TODO: Encrypt IP's in ARP packets */
161               
162                // Send our result keyed with the time
163                // Arg don't copy packets
164                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
165                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
166                trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
167                //return ;
168        }
169        if (mesg) {
170                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
171                switch (mesg->code) {
172                        case MESSAGE_STARTED:
173                                enc_init(enc_type,key);
174                }
175        }
176        return NULL;
177}
178
179int main(int argc, char *argv[]) 
180{
181        struct libtrace_t *trace = 0;
182        struct libtrace_packet_t *packet/* = trace_create_packet()*/;
183        struct libtrace_out_t *writer = 0;
184        char *output = 0;
185        int level = -1;
186        char *compress_type_str=NULL;
187        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
188
189
190        if (argc<2)
191                usage(argv[0]);
192
193        while (1) {
194                int option_index;
195                struct option long_options[] = {
196                        { "encrypt-source",     0, 0, 's' },
197                        { "encrypt-dest",       0, 0, 'd' },
198                        { "cryptopan",          1, 0, 'c' },
199                        { "cryptopan-file",     1, 0, 'f' },
200                        { "prefix",             1, 0, 'p' },
201                        { "compress-level",     1, 0, 'z' },
202                        { "compress-type",      1, 0, 'Z' },
203                        { "libtrace-help",      0, 0, 'H' },
204                        { NULL,                 0, 0, 0   },
205                };
206
207                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:H",
208                                long_options, &option_index);
209
210                if (c==-1)
211                        break;
212
213                switch (c) {
214                        case 'Z': compress_type_str=optarg; break;         
215                        case 'z': level = atoi(optarg); break;
216                        case 's': enc_source=true; break;
217                        case 'd': enc_dest  =true; break;
218                        case 'c':
219                                  if (key!=NULL) {
220                                          fprintf(stderr,"You can only have one encryption type and one key\n");
221                                          usage(argv[0]);
222                                  }
223                                  key=strdup(optarg);
224                                  enc_type = ENC_CRYPTOPAN;
225                                  break;
226                        case 'f':
227                                  if(key != NULL) {
228                                    fprintf(stderr,"You can only have one encryption type and one key\n");
229                                    usage(argv[0]);
230                                  }
231                                  FILE * infile = fopen(optarg,"rb");
232                                  if(infile == NULL) {
233                                    perror("Failed to open cryptopan keyfile");
234                                    return 1;
235                                  }
236                                  key = (char *) malloc(sizeof(char *) * 32);
237                                  if(fread(key,1,32,infile) != 32) {
238                                    if(ferror(infile)) {
239                                      perror("Failed while reading cryptopan keyfile");
240                                    }
241                                  }
242                                  fclose(infile);
243                                  enc_type = ENC_CRYPTOPAN;
244                                  break;
245                        case 'p':
246                                  if (key!=NULL) {
247                                          fprintf(stderr,"You can only have one encryption type and one key\n");
248                                          usage(argv[0]);
249                                  }
250                                  key=strdup(optarg);
251                                  enc_type = ENC_PREFIX_SUBSTITUTION;
252                                  break;
253                        case 'H':
254                                  trace_help(); 
255                                  exit(1); 
256                                  break;
257                        default:
258                                fprintf(stderr,"unknown option: %c\n",c);
259                                usage(argv[0]);
260
261                }
262
263        }
264
265        if (compress_type_str == NULL && level >= 0) {
266                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
267                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
268        }
269
270        else if (compress_type_str == NULL) {
271                /* If a level or type is not specified, use the "none"
272                 * compression module */
273                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
274        }
275
276        /* I decided to be fairly generous in what I accept for the
277         * compression type string */
278        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
279                        strncmp(compress_type_str, "zlib", 4) == 0) {
280                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
281        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
282                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
283        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
284                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
285        } else if (strncmp(compress_type_str, "no", 2) == 0) {
286                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
287        } else {
288                fprintf(stderr, "Unknown compression type: %s\n",
289                        compress_type_str);
290                return 1;
291        }
292       
293
294       
295
296        /* open input uri */
297        trace = trace_create(argv[optind]);
298        if (trace_is_err(trace)) {
299                trace_perror(trace,"trace_create");
300                trace_destroy(trace);
301                return 1;
302        }
303
304        if (optind +1>= argc) {
305                /* no output specified, output in same format to
306                 * stdout
307                 */
308                output = strdup("erf:-");
309                writer = trace_create_output(output);
310        } else {
311                writer = trace_create_output(argv[optind +1]);
312        }
313        if (trace_is_err_output(writer)) {
314                trace_perror_output(writer,"trace_create_output");
315                trace_destroy_output(writer);
316                trace_destroy(trace);
317                return 1;
318        }
319       
320        /* Hopefully this will deal nicely with people who want to crank the
321         * compression level up to 11 :) */
322        if (level > 9) {
323                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
324                level = 9;
325        }
326
327        if (level >= 0 && trace_config_output(writer, 
328                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
329                trace_perror_output(writer, "Configuring compression level");
330                trace_destroy_output(writer);
331                trace_destroy(trace);
332                return 1;
333        }
334
335        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
336                                &compress_type) == -1) {
337                trace_perror_output(writer, "Configuring compression type");
338                trace_destroy_output(writer);
339                trace_destroy(trace);
340                return 1;
341        }
342
343        // OK parallel changes start here
344
345        /* Set a special mode flag that means the output is timestamped
346         * and ordered before its read into reduce. Seems like a good
347         * special case to have.
348         */
349         
350        int i = 1;
351        trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
352        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &i);
353        //trace_parallel_config(trace, TRACE_OPTION_USE_DEDICATED_HASHER, &i);
354        //trace_parallel_config(trace, TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER, &i);
355        i = 2;
356        trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_THREAD_COUNT, &i);
357       
358        if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
359                trace_perror(trace,"trace_start");
360                trace_destroy_output(writer);
361                trace_destroy(trace);
362                return 1;
363        }
364       
365        if (trace_start_output(writer)==-1) {
366                trace_perror_output(writer,"trace_start_output");
367                trace_destroy_output(writer);
368                trace_destroy(trace);
369                return 1;
370        }
371       
372        // Read in the resulting packets and then free them when done
373        libtrace_vector_t res;
374        int res_size = 0;
375        libtrace_vector_init(&res, sizeof(libtrace_result_t));
376        uint64_t packet_count = 0;
377        while (!trace_finished(trace)) {
378                // Read messages
379                libtrace_message_t message;
380               
381                // We just release and do work currently, maybe if something
382                // interesting comes through we'd deal with that
383                libtrace_thread_get_message(trace, &message);
384               
385                while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
386               
387                if ((res_size = trace_get_results(trace, &res)) == 0)
388                        ;/*sched_yield();*/
389               
390                for (i = 0 ; i < res_size ; i++) {
391                        libtrace_result_t result;
392                        assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
393                        packet = libtrace_result_get_value(&result);
394                        assert(libtrace_result_get_key(&result) == packet_count);
395                        packet_count++;
396                        if (trace_write_packet(writer,packet)==-1) {
397                                trace_perror_output(writer,"writer");
398                                trace_interrupt();
399                                break;
400                        }
401                        //trace_destroy_packet(packet);
402                        trace_free_result_packet(trace, packet);
403                }
404        }
405        trace_join(trace);
406       
407        // Grab everything that's left here
408        res_size = trace_get_results(trace, &res);
409       
410        for (i = 0 ; i < res_size ; i++) {
411                libtrace_result_t result;
412                assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
413                packet = libtrace_result_get_value(&result);
414                if (libtrace_result_get_key(&result) != packet_count)
415                        printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);
416                assert(libtrace_result_get_key(&result) == packet_count);
417               
418                packet_count++;
419                if (trace_write_packet(writer,packet)==-1) {
420                        trace_perror_output(writer,"writer");
421                        trace_interrupt();
422                        break;
423                }
424                trace_destroy_packet(packet);
425        }
426        libtrace_vector_destroy(&res);
427       
428        //trace_destroy_packet(packet);
429        //print_contention_stats(trace);
430        trace_destroy(trace);
431        trace_destroy_output(writer);
432        return 0;
433}
Note: See TracBrowser for help on using the repository browser.