source: tools/traceanon/traceanon_parallel.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 12.1 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace.h"
3#include <stdio.h>
4#include <unistd.h>
5#include <stdlib.h>
6#include <getopt.h>
7#include <stdbool.h>
8#include <stddef.h>
9#include <string.h>
10#include <time.h>
11#include <assert.h>
12#include "ipenc.h"
13#include <data-struct/vector.h>
14#include <data-struct/message_queue.h>
15#include <signal.h>
16
17bool enc_source = false;
18bool enc_dest   = false;
19enum enc_type_t enc_type = ENC_NONE;
20char *key = NULL;
21
22
23struct libtrace_t *trace = NULL;
24
25static void cleanup_signal(int signal)
26{
27        static int s = 0;
28        (void)signal;
29    //trace_interrupt();
30        // trace_pstop isn't really signal safe because its got lots of locks in it
31    trace_pstop(trace);
32    /*if (s == 0) {
33                if (trace_ppause(trace) == -1)
34                        trace_perror(trace, "Pause failed");
35        }
36        else {
37                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
38                        trace_perror(trace, "Start failed");
39    }*/
40        s = !s;
41}
42
43
44
45static void usage(char *argv0)
46{
47        fprintf(stderr,"Usage:\n"
48        "%s flags inputfile outputfile\n"
49        "-s --encrypt-source    Encrypt the source addresses\n"
50        "-d --encrypt-dest      Encrypt the destination addresses\n"
51        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
52        "                       prefix preserving\n"
53        "-f --keyfile=file      A file containing the cryptopan key\n"
54        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
55        "-H --libtrace-help     Print libtrace runtime documentation\n"
56        "-z --compress-level    Compress the output trace at the specified level\n"
57        "-Z --compress-type     Compress the output trace using the specified"
58        "                       compression algorithm\n"
59        ,argv0);
60        exit(1);
61}
62
63/* Incrementally update a checksum */
64static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
65{
66        uint32_t sum = (~htons(*csum) & 0xFFFF) 
67                     + (~htons(old) & 0xFFFF) 
68                     + htons(new);
69        sum = (sum & 0xFFFF) + (sum >> 16);
70        *csum = htons(~(sum + (sum >> 16)));
71}
72
73static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
74{
75        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
76        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
77}
78
79/* Ok this is remarkably complicated
80 *
81 * We want to change one, or the other IP address, while preserving
82 * the checksum.  TCP and UDP both include the faux header in their
83 * checksum calculations, so you have to update them too.  ICMP is
84 * even worse -- it can include the original IP packet that caused the
85 * error!  So anonymise that too, but remember that it's travelling in
86 * the opposite direction so we need to encrypt the destination and
87 * source instead of the source and destination!
88 */
89static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
90{
91        struct libtrace_tcp *tcp;
92        struct libtrace_udp *udp;
93        struct libtrace_icmp *icmp;
94
95        tcp=trace_get_tcp_from_ip(ip,NULL);
96        udp=trace_get_udp_from_ip(ip,NULL);
97        icmp=trace_get_icmp_from_ip(ip,NULL);
98
99        if (enc_source) {
100                uint32_t old_ip=ip->ip_src.s_addr;
101                uint32_t new_ip=htonl(enc_ip(
102                                        htonl(ip->ip_src.s_addr)
103                                        ));
104                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
105                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
106                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
107                ip->ip_src.s_addr = new_ip;
108        }
109
110        if (enc_dest) {
111                uint32_t old_ip=ip->ip_dst.s_addr;
112                uint32_t new_ip=htonl(enc_ip(
113                                        htonl(ip->ip_dst.s_addr)
114                                        ));
115                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
116                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
117                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
118                ip->ip_dst.s_addr = new_ip;
119        }
120
121        if (icmp) {
122                /* These are error codes that return the IP packet
123                 * internally
124                 */
125               
126                if (icmp->type == 3 
127                                || icmp->type == 5 
128                                || icmp->type == 11) {
129                        char *ptr = (char *)icmp;
130                        encrypt_ips(
131                                (struct libtrace_ip*)(ptr+
132                                        sizeof(struct libtrace_icmp)),
133                                enc_dest,
134                                enc_source);
135                }
136
137                if (enc_source || enc_dest)
138                        icmp->checksum = 0;
139        }
140}
141
142
143UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt)
144{
145        return 0;
146}
147
148
149UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt)
150{
151        return rand();
152}
153
154
155static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t)
156{
157       
158        if (pkt) {
159                struct libtrace_ip *ipptr;
160                libtrace_udp_t *udp = NULL;
161                libtrace_tcp_t *tcp = NULL;
162
163                ipptr = trace_get_ip(pkt);
164
165                if (ipptr && (enc_source || enc_dest)) {
166                        encrypt_ips(ipptr,enc_source,enc_dest);
167                        ipptr->ip_sum = 0;
168                }
169
170                /* Replace checksums so that IP encryption cannot be
171                 * reversed */
172
173                /* XXX replace with nice use of trace_get_transport() */
174
175                udp = trace_get_udp(pkt);
176                if (udp && (enc_source || enc_dest)) {
177                        udp->check = 0;
178                } 
179
180                tcp = trace_get_tcp(pkt);
181                if (tcp && (enc_source || enc_dest)) {
182                        tcp->check = 0;
183                }
184
185                /* TODO: Encrypt IP's in ARP packets */
186               
187                // Send our result keyed with the time
188                // Arg don't copy packets
189                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
190                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
191                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
192                trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET);
193                //return ;
194        }
195        if (mesg) {
196                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
197                switch (mesg->code) {
198                        case MESSAGE_STARTING:
199                                enc_init(enc_type,key);
200                        break;
201                        case MESSAGE_TICK:
202                                trace_publish_result(trace, t, mesg->additional.uint64, NULL, RESULT_TICK);
203                }
204        }
205        return NULL;
206}
207
208struct libtrace_out_t *writer = 0;
209
210static void write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
211        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
212
213        if (result) {
214                if (result->type == RESULT_PACKET) {
215                        libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
216                        assert(libtrace_result_get_key(result) == packet_count++);
217                        if (trace_write_packet(writer,packet)==-1) {
218                                trace_perror_output(writer,"writer");
219                                trace_interrupt();
220                        }
221                        trace_free_result_packet(trace, packet);
222
223                } else {
224                        assert(result->type == RESULT_TICK);
225                        // Ignore it
226                }
227        }
228}
229
230
231int main(int argc, char *argv[]) 
232{
233        //struct libtrace_t *trace = 0;
234        struct sigaction sigact;
235        char *output = 0;
236        int level = -1;
237        char *compress_type_str=NULL;
238        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
239        struct user_configuration uc;
240        ZERO_USER_CONFIG(uc);
241
242        if (argc<2)
243                usage(argv[0]);
244
245        while (1) {
246                int option_index;
247                struct option long_options[] = {
248                        { "encrypt-source",     0, 0, 's' },
249                        { "encrypt-dest",       0, 0, 'd' },
250                        { "cryptopan",          1, 0, 'c' },
251                        { "cryptopan-file",     1, 0, 'f' },
252                        { "prefix",             1, 0, 'p' },
253                        { "compress-level",     1, 0, 'z' },
254                        { "compress-type",      1, 0, 'Z' },
255                        { "libtrace-help",      0, 0, 'H' },
256                        { "config",             1, 0, 'u' },
257                    { "config-file",            1, 0, 'U' },
258                        { NULL,                 0, 0, 0   },
259                };
260
261                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:Hu:U:",
262                                long_options, &option_index);
263
264                if (c==-1)
265                        break;
266
267                switch (c) {
268                        case 'Z': compress_type_str=optarg; break;         
269                        case 'z': level = atoi(optarg); break;
270                        case 's': enc_source=true; break;
271                        case 'd': enc_dest  =true; break;
272                        case 'c':
273                                  if (key!=NULL) {
274                                          fprintf(stderr,"You can only have one encryption type and one key\n");
275                                          usage(argv[0]);
276                                  }
277                                  key=strdup(optarg);
278                                  enc_type = ENC_CRYPTOPAN;
279                                  break;
280                        case 'f':
281                                  if(key != NULL) {
282                                    fprintf(stderr,"You can only have one encryption type and one key\n");
283                                    usage(argv[0]);
284                                  }
285                                  FILE * infile = fopen(optarg,"rb");
286                                  if(infile == NULL) {
287                                    perror("Failed to open cryptopan keyfile");
288                                    return 1;
289                                  }
290                                  key = (char *) malloc(sizeof(char *) * 32);
291                                  if(fread(key,1,32,infile) != 32) {
292                                    if(ferror(infile)) {
293                                      perror("Failed while reading cryptopan keyfile");
294                                    }
295                                  }
296                                  fclose(infile);
297                                  enc_type = ENC_CRYPTOPAN;
298                                  break;
299                        case 'p':
300                                  if (key!=NULL) {
301                                          fprintf(stderr,"You can only have one encryption type and one key\n");
302                                          usage(argv[0]);
303                                  }
304                                  key=strdup(optarg);
305                                  enc_type = ENC_PREFIX_SUBSTITUTION;
306                                  break;
307                        case 'H':
308                                  trace_help(); 
309                                  exit(1); 
310                                  break;
311                        case 'u':
312                                  parse_user_config(&uc, optarg);
313                                  break;
314                        case 'U':;
315                                FILE * f = fopen(optarg, "r");
316                                if (f != NULL) {
317                                        parse_user_config_file(&uc, f);
318                                } else {
319                                        perror("Failed to open configuration file\n");
320                                        usage(argv[0]);
321                                }
322                                break;
323                        default:
324                                fprintf(stderr,"unknown option: %c\n",c);
325                                usage(argv[0]);
326
327                }
328
329        }
330
331        if (compress_type_str == NULL && level >= 0) {
332                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
333                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
334        }
335
336        else if (compress_type_str == NULL) {
337                /* If a level or type is not specified, use the "none"
338                 * compression module */
339                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
340        }
341
342        /* I decided to be fairly generous in what I accept for the
343         * compression type string */
344        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
345                        strncmp(compress_type_str, "zlib", 4) == 0) {
346                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
347        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
348                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
349        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
350                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
351        } else if (strncmp(compress_type_str, "no", 2) == 0) {
352                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
353        } else {
354                fprintf(stderr, "Unknown compression type: %s\n",
355                        compress_type_str);
356                return 1;
357        }
358       
359
360       
361
362        /* open input uri */
363        trace = trace_create(argv[optind]);
364        if (trace_is_err(trace)) {
365                trace_perror(trace,"trace_create");
366                trace_destroy(trace);
367                return 1;
368        }
369
370        if (optind +1>= argc) {
371                /* no output specified, output in same format to
372                 * stdout
373                 */
374                output = strdup("erf:-");
375                writer = trace_create_output(output);
376        } else {
377                writer = trace_create_output(argv[optind +1]);
378        }
379        if (trace_is_err_output(writer)) {
380                trace_perror_output(writer,"trace_create_output");
381                trace_destroy_output(writer);
382                trace_destroy(trace);
383                return 1;
384        }
385       
386        /* Hopefully this will deal nicely with people who want to crank the
387         * compression level up to 11 :) */
388        if (level > 9) {
389                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
390                level = 9;
391        }
392
393        if (level >= 0 && trace_config_output(writer, 
394                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
395                trace_perror_output(writer, "Configuring compression level");
396                trace_destroy_output(writer);
397                trace_destroy(trace);
398                return 1;
399        }
400
401        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
402                                &compress_type) == -1) {
403                trace_perror_output(writer, "Configuring compression type");
404                trace_destroy_output(writer);
405                trace_destroy(trace);
406                return 1;
407        }
408
409        if (trace_start_output(writer)==-1) {
410                trace_perror_output(writer,"trace_start_output");
411                trace_destroy_output(writer);
412                trace_destroy(trace);
413                return 1;
414        }
415
416        // OK parallel changes start here
417
418        /* Set a special mode flag that means the output is timestamped
419         * and ordered before its read into reduce. Seems like a good
420         * special case to have.
421         */
422         
423        int i = 1;
424        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
425        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
426        //trace_set_hasher(trace, HASHER_CUSTOM, rand_hash, NULL);
427       
428        if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
429                trace_perror(trace,"trace_start");
430                trace_destroy_output(writer);
431                trace_destroy(trace);
432                return 1;
433        }
434
435        sigact.sa_handler = cleanup_signal;
436        sigemptyset(&sigact.sa_mask);
437        sigact.sa_flags = SA_RESTART;
438
439        sigaction(SIGINT, &sigact, NULL);
440        sigaction(SIGTERM, &sigact, NULL);
441
442        // Wait for the trace to finish
443        trace_join(trace);
444       
445        //trace_destroy_packet(packet);
446        //print_contention_stats(trace);
447        trace_destroy(trace);
448        trace_destroy_output(writer);
449        return 0;
450}
Note: See TracBrowser for help on using the repository browser.