source: tools/traceanon/traceanon_parallel.c @ 17a3dff

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 17a3dff was 17a3dff, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Rename from google map/reduce framework names to something more meaningful.
Rename mapper to perpkt since this is what it actually is in libtrace.

  • Property mode set to 100644
File size: 12.1 KB
Line 
1#define _GNU_SOURCE
2#include "libtrace.h"
3#include <stdio.h>
4#include <unistd.h>
5#include <stdlib.h>
6#include <getopt.h>
7#include <stdbool.h>
8#include <stddef.h>
9#include <string.h>
10#include <time.h>
11#include <assert.h>
12#include "ipenc.h"
13#include <data-struct/vector.h>
14#include <data-struct/message_queue.h>
15
16bool enc_source = false;
17bool enc_dest   = false;
18enum enc_type_t enc_type = ENC_NONE;
19char *key = NULL;
20
21static void usage(char *argv0)
22{
23        fprintf(stderr,"Usage:\n"
24        "%s flags inputfile outputfile\n"
25        "-s --encrypt-source    Encrypt the source addresses\n"
26        "-d --encrypt-dest      Encrypt the destination addresses\n"
27        "-c --cryptopan=key     Encrypt the addresses with the cryptopan\n"
28        "                       prefix preserving\n"
29        "-f --keyfile=file      A file containing the cryptopan key\n"
30        "-p --prefix=C.I.D.R/bits Substitute the prefix of the address\n"
31        "-H --libtrace-help     Print libtrace runtime documentation\n"
32        "-z --compress-level    Compress the output trace at the specified level\n"
33        "-Z --compress-type     Compress the output trace using the specified"
34        "                       compression algorithm\n"
35        ,argv0);
36        exit(1);
37}
38
39/* Incrementally update a checksum */
40static void update_in_cksum(uint16_t *csum, uint16_t old, uint16_t new)
41{
42        uint32_t sum = (~htons(*csum) & 0xFFFF) 
43                     + (~htons(old) & 0xFFFF) 
44                     + htons(new);
45        sum = (sum & 0xFFFF) + (sum >> 16);
46        *csum = htons(~(sum + (sum >> 16)));
47}
48
49static void update_in_cksum32(uint16_t *csum, uint32_t old, uint32_t new)
50{
51        update_in_cksum(csum,(uint16_t)(old>>16),(uint16_t)(new>>16));
52        update_in_cksum(csum,(uint16_t)(old&0xFFFF),(uint16_t)(new&0xFFFF));
53}
54
55/* Ok this is remarkably complicated
56 *
57 * We want to change one, or the other IP address, while preserving
58 * the checksum.  TCP and UDP both include the faux header in their
59 * checksum calculations, so you have to update them too.  ICMP is
60 * even worse -- it can include the original IP packet that caused the
61 * error!  So anonymise that too, but remember that it's travelling in
62 * the opposite direction so we need to encrypt the destination and
63 * source instead of the source and destination!
64 */
65static void encrypt_ips(struct libtrace_ip *ip,bool enc_source,bool enc_dest)
66{
67        struct libtrace_tcp *tcp;
68        struct libtrace_udp *udp;
69        struct libtrace_icmp *icmp;
70
71        tcp=trace_get_tcp_from_ip(ip,NULL);
72        udp=trace_get_udp_from_ip(ip,NULL);
73        icmp=trace_get_icmp_from_ip(ip,NULL);
74
75        if (enc_source) {
76                uint32_t old_ip=ip->ip_src.s_addr;
77                uint32_t new_ip=htonl(enc_ip(
78                                        htonl(ip->ip_src.s_addr)
79                                        ));
80                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
81                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
82                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
83                ip->ip_src.s_addr = new_ip;
84        }
85
86        if (enc_dest) {
87                uint32_t old_ip=ip->ip_dst.s_addr;
88                uint32_t new_ip=htonl(enc_ip(
89                                        htonl(ip->ip_dst.s_addr)
90                                        ));
91                update_in_cksum32(&ip->ip_sum,old_ip,new_ip);
92                if (tcp) update_in_cksum32(&tcp->check,old_ip,new_ip);
93                if (udp) update_in_cksum32(&udp->check,old_ip,new_ip);
94                ip->ip_dst.s_addr = new_ip;
95        }
96
97        if (icmp) {
98                /* These are error codes that return the IP packet
99                 * internally
100                 */
101               
102                if (icmp->type == 3 
103                                || icmp->type == 5 
104                                || icmp->type == 11) {
105                        char *ptr = (char *)icmp;
106                        encrypt_ips(
107                                (struct libtrace_ip*)(ptr+
108                                        sizeof(struct libtrace_icmp)),
109                                enc_dest,
110                                enc_source);
111                }
112
113                if (enc_source || enc_dest)
114                        icmp->checksum = 0;
115        }
116}
117
118
119static uint64_t bad_hash(libtrace_packet_t * pkt)
120{
121        return 0;
122}
123
124
125static uint64_t rand_hash(libtrace_packet_t * pkt)
126{
127        return rand();
128}
129
130
131static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
132{
133        int i;
134       
135        if (pkt) {
136                struct libtrace_ip *ipptr;
137                libtrace_udp_t *udp = NULL;
138                libtrace_tcp_t *tcp = NULL;
139
140                ipptr = trace_get_ip(pkt);
141
142                if (ipptr && (enc_source || enc_dest)) {
143                        encrypt_ips(ipptr,enc_source,enc_dest);
144                        ipptr->ip_sum = 0;
145                }
146
147                /* Replace checksums so that IP encryption cannot be
148                 * reversed */
149
150                /* XXX replace with nice use of trace_get_transport() */
151
152                udp = trace_get_udp(pkt);
153                if (udp && (enc_source || enc_dest)) {
154                        udp->check = 0;
155                } 
156
157                tcp = trace_get_tcp(pkt);
158                if (tcp && (enc_source || enc_dest)) {
159                        tcp->check = 0;
160                }
161
162                /* TODO: Encrypt IP's in ARP packets */
163               
164                // Send our result keyed with the time
165                // Arg don't copy packets
166                //libtrace_packet_t * packet_copy = trace_copy_packet(packet);
167                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
168                trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
169                //return ;
170        }
171        if (mesg) {
172                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
173                switch (mesg->code) {
174                        case MESSAGE_STARTED:
175                                enc_init(enc_type,key);
176                }
177        }
178        return NULL;
179}
180
181int main(int argc, char *argv[]) 
182{
183        struct libtrace_t *trace = 0;
184        struct libtrace_packet_t *packet/* = trace_create_packet()*/;
185        struct libtrace_out_t *writer = 0;
186        char *output = 0;
187        int level = -1;
188        char *compress_type_str=NULL;
189        trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
190
191
192        if (argc<2)
193                usage(argv[0]);
194
195        while (1) {
196                int option_index;
197                struct option long_options[] = {
198                        { "encrypt-source",     0, 0, 's' },
199                        { "encrypt-dest",       0, 0, 'd' },
200                        { "cryptopan",          1, 0, 'c' },
201                        { "cryptopan-file",     1, 0, 'f' },
202                        { "prefix",             1, 0, 'p' },
203                        { "compress-level",     1, 0, 'z' },
204                        { "compress-type",      1, 0, 'Z' },
205                        { "libtrace-help",      0, 0, 'H' },
206                        { NULL,                 0, 0, 0   },
207                };
208
209                int c=getopt_long(argc, argv, "Z:z:sc:f:dp:H",
210                                long_options, &option_index);
211
212                if (c==-1)
213                        break;
214
215                switch (c) {
216                        case 'Z': compress_type_str=optarg; break;         
217                        case 'z': level = atoi(optarg); break;
218                        case 's': enc_source=true; break;
219                        case 'd': enc_dest  =true; break;
220                        case 'c':
221                                  if (key!=NULL) {
222                                          fprintf(stderr,"You can only have one encryption type and one key\n");
223                                          usage(argv[0]);
224                                  }
225                                  key=strdup(optarg);
226                                  enc_type = ENC_CRYPTOPAN;
227                                  break;
228                        case 'f':
229                                  if(key != NULL) {
230                                    fprintf(stderr,"You can only have one encryption type and one key\n");
231                                    usage(argv[0]);
232                                  }
233                                  FILE * infile = fopen(optarg,"rb");
234                                  if(infile == NULL) {
235                                    perror("Failed to open cryptopan keyfile");
236                                    return 1;
237                                  }
238                                  key = (char *) malloc(sizeof(char *) * 32);
239                                  if(fread(key,1,32,infile) != 32) {
240                                    if(ferror(infile)) {
241                                      perror("Failed while reading cryptopan keyfile");
242                                    }
243                                  }
244                                  fclose(infile);
245                                  enc_type = ENC_CRYPTOPAN;
246                                  break;
247                        case 'p':
248                                  if (key!=NULL) {
249                                          fprintf(stderr,"You can only have one encryption type and one key\n");
250                                          usage(argv[0]);
251                                  }
252                                  key=strdup(optarg);
253                                  enc_type = ENC_PREFIX_SUBSTITUTION;
254                                  break;
255                        case 'H':
256                                  trace_help(); 
257                                  exit(1); 
258                                  break;
259                        default:
260                                fprintf(stderr,"unknown option: %c\n",c);
261                                usage(argv[0]);
262
263                }
264
265        }
266
267        if (compress_type_str == NULL && level >= 0) {
268                fprintf(stderr, "Compression level set, but no compression type was defined, setting to gzip\n");
269                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
270        }
271
272        else if (compress_type_str == NULL) {
273                /* If a level or type is not specified, use the "none"
274                 * compression module */
275                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
276        }
277
278        /* I decided to be fairly generous in what I accept for the
279         * compression type string */
280        else if (strncmp(compress_type_str, "gz", 2) == 0 ||
281                        strncmp(compress_type_str, "zlib", 4) == 0) {
282                compress_type = TRACE_OPTION_COMPRESSTYPE_ZLIB;
283        } else if (strncmp(compress_type_str, "bz", 2) == 0) {
284                compress_type = TRACE_OPTION_COMPRESSTYPE_BZ2;
285        } else if (strncmp(compress_type_str, "lzo", 3) == 0) {
286                compress_type = TRACE_OPTION_COMPRESSTYPE_LZO;
287        } else if (strncmp(compress_type_str, "no", 2) == 0) {
288                compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
289        } else {
290                fprintf(stderr, "Unknown compression type: %s\n",
291                        compress_type_str);
292                return 1;
293        }
294       
295
296       
297
298        /* open input uri */
299        trace = trace_create(argv[optind]);
300        if (trace_is_err(trace)) {
301                trace_perror(trace,"trace_create");
302                trace_destroy(trace);
303                return 1;
304        }
305
306        if (optind +1>= argc) {
307                /* no output specified, output in same format to
308                 * stdout
309                 */
310                output = strdup("erf:-");
311                writer = trace_create_output(output);
312        } else {
313                writer = trace_create_output(argv[optind +1]);
314        }
315        if (trace_is_err_output(writer)) {
316                trace_perror_output(writer,"trace_create_output");
317                trace_destroy_output(writer);
318                trace_destroy(trace);
319                return 1;
320        }
321       
322        /* Hopefully this will deal nicely with people who want to crank the
323         * compression level up to 11 :) */
324        if (level > 9) {
325                fprintf(stderr, "WARNING: Compression level > 9 specified, setting to 9 instead\n");
326                level = 9;
327        }
328
329        if (level >= 0 && trace_config_output(writer, 
330                        TRACE_OPTION_OUTPUT_COMPRESS, &level) == -1) {
331                trace_perror_output(writer, "Configuring compression level");
332                trace_destroy_output(writer);
333                trace_destroy(trace);
334                return 1;
335        }
336
337        if (trace_config_output(writer, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
338                                &compress_type) == -1) {
339                trace_perror_output(writer, "Configuring compression type");
340                trace_destroy_output(writer);
341                trace_destroy(trace);
342                return 1;
343        }
344
345        // OK parallel changes start here
346
347        /* Set a special mode flag that means the output is timestamped
348         * and ordered before its read into reduce. Seems like a good
349         * special case to have.
350         */
351         
352        int i = 1;
353        trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
354        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
355        //trace_parallel_config(trace, TRACE_OPTION_USE_DEDICATED_HASHER, &i);
356        //trace_parallel_config(trace, TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER, &i);
357        i = 2;
358        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
359       
360        if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
361                trace_perror(trace,"trace_start");
362                trace_destroy_output(writer);
363                trace_destroy(trace);
364                return 1;
365        }
366       
367        if (trace_start_output(writer)==-1) {
368                trace_perror_output(writer,"trace_start_output");
369                trace_destroy_output(writer);
370                trace_destroy(trace);
371                return 1;
372        }
373       
374        // Read in the resulting packets and then free them when done
375        libtrace_vector_t res;
376        int res_size = 0;
377        libtrace_vector_init(&res, sizeof(libtrace_result_t));
378        uint64_t packet_count = 0;
379        while (!trace_finished(trace)) {
380                // Read messages
381                libtrace_message_t message;
382               
383                // We just release and do work currently, maybe if something
384                // interesting comes through we'd deal with that
385                libtrace_thread_get_message(trace, &message);
386               
387                while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
388               
389                if ((res_size = trace_get_results(trace, &res)) == 0)
390                        ;/*sched_yield();*/
391               
392                for (i = 0 ; i < res_size ; i++) {
393                        libtrace_result_t result;
394                        assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
395                        packet = libtrace_result_get_value(&result);
396                        assert(libtrace_result_get_key(&result) == packet_count);
397                        packet_count++;
398                        if (trace_write_packet(writer,packet)==-1) {
399                                trace_perror_output(writer,"writer");
400                                trace_interrupt();
401                                break;
402                        }
403                        //trace_destroy_packet(packet);
404                        trace_free_result_packet(trace, packet);
405                }
406        }
407        trace_join(trace);
408       
409        // Grab everything that's left here
410        res_size = trace_get_results(trace, &res);
411       
412        for (i = 0 ; i < res_size ; i++) {
413                libtrace_result_t result;
414                assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
415                packet = libtrace_result_get_value(&result);
416                if (libtrace_result_get_key(&result) != packet_count)
417                        printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);
418                assert(libtrace_result_get_key(&result) == packet_count);
419               
420                packet_count++;
421                if (trace_write_packet(writer,packet)==-1) {
422                        trace_perror_output(writer,"writer");
423                        trace_interrupt();
424                        break;
425                }
426                trace_destroy_packet(packet);
427        }
428        libtrace_vector_destroy(&res);
429       
430        //trace_destroy_packet(packet);
431        //print_contention_stats(trace);
432        trace_destroy(trace);
433        trace_destroy_output(writer);
434        return 0;
435}
Note: See TracBrowser for help on using the repository browser.