source: tools/traceanon/traceanon_parallel.c @ 6a6e6a8

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 6a6e6a8 was 6a6e6a8, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

More documentation, including some renaming and modifications to behaviour

  • Removes accessor functions for libtrace_result_t, instead directly access the structure
  • Documentation for most functions
  • Split tick into interval and count messages for the two modes of operation
  • Normalise interval and packet order to use the erf timestamp format
  • Rename trace_send_message_to_XXX to trace trace_message_XXX
  • Property mode set to 100644
File size: 12.7 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace_parallel.h"
3#include "data-struct/vector.h"
4#include "data-struct/message_queue.h"
5#include "combiners.h"
6#include <stdio.h>
7#include <unistd.h>
8#include <stdlib.h>
9#include <getopt.h>
10#include <stdbool.h>
11#include <stddef.h>
12#include <string.h>
13#include <time.h>
14#include <assert.h>
15#include "ipenc.h"
16#include <signal.h>
17
18bool enc_source = false;
19bool enc_dest   = false;
20enum enc_type_t enc_type = ENC_NONE;
21char *key = NULL;
22
23
24struct libtrace_t *trace = NULL;
25
26static void cleanup_signal(int signal)
27{
28        static int s = 0;
29        (void)signal;
30    //trace_interrupt();
31        // trace_pstop isn't really signal safe because its got lots of locks in it
32    trace_pstop(trace);
33    /*if (s == 0) {
34                if (trace_ppause(trace) == -1)
35                        trace_perror(trace, "Pause failed");
36        }
37        else {
38                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
39                        trace_perror(trace, "Start failed");
40    }*/
41        s = !s;
42}
43
44
45
46static void usage(char *argv0)
47{
48        fprintf(stderr,"Usage:\n"
49        "%s flags inputfile outputfile\n"
50        "-s --encrypt-source    Encrypt the source addresses\n"
51        "-d --encrypt-dest      Encrypt the destination addresses\n"
52        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
53        "                       prefix preserving\n"
54        "-f --keyfile=file      A file containing the cryptopan key\n"
55        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
56        "-H --libtrace-help     Print libtrace runtime documentation\n"
57        "-z --compress-level    Compress the output trace at the specified level\n"
58        "-Z --compress-type     Compress the output trace using the specified"
59        "                       compression algorithm\n"
60        ,argv0);
61        exit(1);
62}
63
64/* Incrementally update a checksum */
65static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
66{
67        uint32_t sum = (~htons(*csum) & 0xFFFF) 
68                     + (~htons(old) & 0xFFFF) 
69                     + htons(new);
70        sum = (sum & 0xFFFF) + (sum >> 16);
71        *csum = htons(~(sum + (sum >> 16)));
72}
73
74static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
75{
76        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
77        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
78}
79
80/* Ok this is remarkably complicated
81 *
82 * We want to change one, or the other IP address, while preserving
83 * the checksum.  TCP and UDP both include the faux header in their
84 * checksum calculations, so you have to update them too.  ICMP is
85 * even worse -- it can include the original IP packet that caused the
86 * error!  So anonymise that too, but remember that it's travelling in
87 * the opposite direction so we need to encrypt the destination and
88 * source instead of the source and destination!
89 */
90static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
91{
92        struct libtrace_tcp *tcp;
93        struct libtrace_udp *udp;
94        struct libtrace_icmp *icmp;
95
96        tcp=trace_get_tcp_from_ip(ip,NULL);
97        udp=trace_get_udp_from_ip(ip,NULL);
98        icmp=trace_get_icmp_from_ip(ip,NULL);
99
100        if (enc_source) {
101                uint32_t old_ip=ip->ip_src.s_addr;
102                uint32_t new_ip=htonl(enc_ip(
103                                        htonl(ip->ip_src.s_addr)
104                                        ));
105                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
106                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
107                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
108                ip->ip_src.s_addr = new_ip;
109        }
110
111        if (enc_dest) {
112                uint32_t old_ip=ip->ip_dst.s_addr;
113                uint32_t new_ip=htonl(enc_ip(
114                                        htonl(ip->ip_dst.s_addr)
115                                        ));
116                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
117                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
118                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
119                ip->ip_dst.s_addr = new_ip;
120        }
121
122        if (icmp) {
123                /* These are error codes that return the IP packet
124                 * internally
125                 */
126               
127                if (icmp->type == 3 
128                                || icmp->type == 5 
129                                || icmp->type == 11) {
130                        char *ptr = (char *)icmp;
131                        encrypt_ips(
132                                (struct libtrace_ip*)(ptr+
133                                        sizeof(struct libtrace_icmp)),
134                                enc_dest,
135                                enc_source);
136                }
137
138                if (enc_source || enc_dest)
139                        icmp->checksum = 0;
140        }
141}
142
143
144UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt)
145{
146        return 0;
147}
148
149
150UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt)
151{
152        return rand();
153}
154
155
156static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
157                        int mesg, libtrace_generic_t data,
158                        libtrace_thread_t *sender UNUSED)
159{
160        struct libtrace_ip *ipptr;
161        libtrace_udp_t *udp = NULL;
162        libtrace_tcp_t *tcp = NULL;
163        libtrace_stat_t *stats = NULL;
164        switch (mesg) {
165        case MESSAGE_PACKET:
166                ipptr = trace_get_ip(data.pkt);
167
168                if (ipptr && (enc_source || enc_dest)) {
169                        encrypt_ips(ipptr,enc_source,enc_dest);
170                        ipptr->ip_sum = 0;
171                }
172
173                /* Replace checksums so that IP encryption cannot be
174                 * reversed */
175
176                /* XXX replace with nice use of trace_get_transport() */
177
178                udp = trace_get_udp(data.pkt);
179                if (udp && (enc_source || enc_dest)) {
180                        udp->check = 0;
181                } 
182
183                tcp = trace_get_tcp(data.pkt);
184                if (tcp && (enc_source || enc_dest)) {
185                        tcp->check = 0;
186                }
187
188                /* TODO: Encrypt IP's in ARP packets */
189               
190                // Send our result keyed with the time
191                // Arg don't copy packets
192                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
193                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
194                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
195
196                trace_publish_result(trace, t, trace_packet_get_order(data.pkt), data, RESULT_PACKET);
197                break;
198        case MESSAGE_STARTING:
199                enc_init(enc_type,key);
200                break;
201        case MESSAGE_TICK_INTERVAL:
202                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK_INTERVAL);
203                break;
204        case MESSAGE_TICK_COUNT:
205                trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK_COUNT);
206                break;
207        case MESSAGE_STOPPING:
208                stats = trace_create_statistics();
209                trace_get_thread_statistics(trace, t, stats);
210                trace_print_statistics(stats, stderr, NULL);
211                free(stats);
212                stats = trace_get_statistics(trace, NULL);
213                trace_print_statistics(stats, stderr, NULL);
214                //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
215                break;
216        }
217        return NULL;
218}
219
220struct libtrace_out_t *writer = 0;
221
222static void write_out(libtrace_t *trace UNUSED, int mesg,
223                      libtrace_generic_t data,
224                      libtrace_thread_t *sender UNUSED) {
225        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
226
227        switch (mesg) {
228        case MESSAGE_RESULT:
229                if (data.res->type == RESULT_PACKET) {
230                        libtrace_packet_t *packet = (libtrace_packet_t*) data.res->value.pkt;
231                        assert(data.res->key >= packet_count);
232                        packet_count = data.res->key;
233                        if (trace_write_packet(writer,packet)==-1) {
234                                trace_perror_output(writer,"writer");
235                                trace_interrupt();
236                        }
237                        trace_free_packet(trace, packet);
238
239                } else {
240                        assert(data.res->type == RESULT_TICK_COUNT || data.res->type == RESULT_TICK_INTERVAL);
241                        // Ignore it
242                }
243        }
244}
245
246
247int main(int argc, char *argv[]) 
248{
249        //struct libtrace_t *trace = 0;
250        struct sigaction sigact;
251        char *output = 0;
252        int level = -1;
253        char *compress_type_str=NULL;
254        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
255        struct user_configuration uc;
256        ZERO_USER_CONFIG(uc);
257
258        if (argc<2)
259                usage(argv[0]);
260
261        while (1) {
262                int option_index;
263                struct option long_options[] = {
264                        { "encrypt-source",     0, 0, 's' },
265                        { "encrypt-dest",       0, 0, 'd' },
266                        { "cryptopan",          1, 0, 'c' },
267                        { "cryptopan-file",     1, 0, 'f' },
268                        { "prefix",             1, 0, 'p' },
269                        { "compress-level",     1, 0, 'z' },
270                        { "compress-type",      1, 0, 'Z' },
271                        { "libtrace-help",      0, 0, 'H' },
272                        { "config",             1, 0, 'u' },
273                    { "config-file",            1, 0, 'U' },
274                        { NULL,                 0, 0, 0   },
275                };
276
277                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:Hu:U:",
278                                long_options, &option_index);
279
280                if (c==-1)
281                        break;
282
283                switch (c) {
284                        case 'Z': compress_type_str=optarg; break;         
285                        case 'z': level = atoi(optarg); break;
286                        case 's': enc_source=true; break;
287                        case 'd': enc_dest  =true; break;
288                        case 'c':
289                                  if (key!=NULL) {
290                                          fprintf(stderr,"You can only have one encryption type and one key\n");
291                                          usage(argv[0]);
292                                  }
293                                  key=strdup(optarg);
294                                  enc_type = ENC_CRYPTOPAN;
295                                  break;
296                        case 'f':
297                                  if(key != NULL) {
298                                    fprintf(stderr,"You can only have one encryption type and one key\n");
299                                    usage(argv[0]);
300                                  }
301                                  FILE * infile = fopen(optarg,"rb");
302                                  if(infile == NULL) {
303                                    perror("Failed to open cryptopan keyfile");
304                                    return 1;
305                                  }
306                                  key = (char *) malloc(sizeof(char *) * 32);
307                                  if(fread(key,1,32,infile) != 32) {
308                                    if(ferror(infile)) {
309                                      perror("Failed while reading cryptopan keyfile");
310                                    }
311                                  }
312                                  fclose(infile);
313                                  enc_type = ENC_CRYPTOPAN;
314                                  break;
315                        case 'p':
316                                  if (key!=NULL) {
317                                          fprintf(stderr,"You can only have one encryption type and one key\n");
318                                          usage(argv[0]);
319                                  }
320                                  key=strdup(optarg);
321                                  enc_type = ENC_PREFIX_SUBSTITUTION;
322                                  break;
323                        case 'H':
324                                  trace_help(); 
325                                  exit(1); 
326                                  break;
327                        case 'u':
328                                  parse_user_config(&uc, optarg);
329                                  break;
330                        case 'U':;
331                                FILE * f = fopen(optarg, "r");
332                                if (f != NULL) {
333                                        parse_user_config_file(&uc, f);
334                                } else {
335                                        perror("Failed to open configuration file\n");
336                                        usage(argv[0]);
337                                }
338                                break;
339                        default:
340                                fprintf(stderr,"unknown option: %c\n",c);
341                                usage(argv[0]);
342
343                }
344
345        }
346
347        if (compress_type_str == NULL && level >= 0) {
348                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
349                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
350        }
351
352        else if (compress_type_str == NULL) {
353                /* If a level or type is not specified, use the "none"
354                 * compression module */
355                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
356        }
357
358        /* I decided to be fairly generous in what I accept for the
359         * compression type string */
360        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
361                        strncmp(compress_type_str, "zlib", 4) == 0) {
362                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
363        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
364                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
365        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
366                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
367        } else if (strncmp(compress_type_str, "no", 2) == 0) {
368                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
369        } else {
370                fprintf(stderr, "Unknown compression type: %s\n",
371                        compress_type_str);
372                return 1;
373        }
374       
375
376       
377
378        /* open input uri */
379        trace = trace_create(argv[optind]);
380        if (trace_is_err(trace)) {
381                trace_perror(trace,"trace_create");
382                trace_destroy(trace);
383                return 1;
384        }
385
386        if (optind +1>= argc) {
387                /* no output specified, output in same format to
388                 * stdout
389                 */
390                output = strdup("erf:-");
391                writer = trace_create_output(output);
392        } else {
393                writer = trace_create_output(argv[optind +1]);
394        }
395        if (trace_is_err_output(writer)) {
396                trace_perror_output(writer,"trace_create_output");
397                trace_destroy_output(writer);
398                trace_destroy(trace);
399                return 1;
400        }
401       
402        /* Hopefully this will deal nicely with people who want to crank the
403         * compression level up to 11 :) */
404        if (level > 9) {
405                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
406                level = 9;
407        }
408
409        if (level >= 0 && trace_config_output(writer, 
410                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
411                trace_perror_output(writer, "Configuring compression level");
412                trace_destroy_output(writer);
413                trace_destroy(trace);
414                return 1;
415        }
416
417        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
418                                &compress_type) == -1) {
419                trace_perror_output(writer, "Configuring compression type");
420                trace_destroy_output(writer);
421                trace_destroy(trace);
422                return 1;
423        }
424
425        if (trace_start_output(writer)==-1) {
426                trace_perror_output(writer,"trace_start_output");
427                trace_destroy_output(writer);
428                trace_destroy(trace);
429                return 1;
430        }
431
432        // OK parallel changes start here
433
434        /* Set a special mode flag that means the output is timestamped
435         * and ordered before its read into reduce. Seems like a good
436         * special case to have.
437         */
438         
439        int i = 1;
440        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
441        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
442
443        //trace_set_hasher(trace, HASHER_CUSTOM, rand_hash, NULL);
444       
445        if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
446                trace_perror(trace,"trace_start");
447                trace_destroy_output(writer);
448                trace_destroy(trace);
449                return 1;
450        }
451
452        sigact.sa_handler = cleanup_signal;
453        sigemptyset(&sigact.sa_mask);
454        sigact.sa_flags = SA_RESTART;
455
456        sigaction(SIGINT, &sigact, NULL);
457        sigaction(SIGTERM, &sigact, NULL);
458
459        // Wait for the trace to finish
460        trace_join(trace);
461       
462        //trace_destroy_packet(packet);
463        //print_contention_stats(trace);
464        trace_destroy(trace);
465        trace_destroy_output(writer);
466        return 0;
467}
Note: See TracBrowser for help on using the repository browser.