source: tools/traceends/traceends.cc @ 99ee456

develop
Last change on this file since 99ee456 was 99ee456, checked in by Shane Alcock <salcock@…>, 3 years ago

Convert traceends to use parallel API.

  • Property mode set to 100644
File size: 20.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define __STDC_FORMAT_MACROS
29
30#include <libtrace.h>
31#include <stdio.h>
32#include <stdlib.h>
33#include <inttypes.h>
34#include <lt_inttypes.h>
35#include <getopt.h>
36#include <string.h>
37#include <assert.h>
38#include <signal.h>
39#include <time.h>
40#include <sys/socket.h>
41#include <netinet/in.h>
42#include <arpa/inet.h>
43#include <time.h>
44
45#include <map>
46
47#include "libtrace_parallel.h"
48
49typedef struct end_counter {
50        uint64_t src_bytes;
51        uint64_t src_pbytes;
52        uint64_t src_pkts;
53        uint64_t dst_pkts;
54        uint64_t dst_bytes;
55        uint64_t dst_pbytes;
56
57        double last_active;
58
59} end_counter_t;
60
61typedef struct mac_addr {
62        uint8_t addr[6];
63} mac_addr_t;
64
65struct v6comp {
66        bool operator() (const struct in6_addr &a, const struct in6_addr &b) const {
67                if (memcmp(&a, &b, sizeof(struct in6_addr)) < 0)
68                        return true;
69                return false;
70        }
71};
72       
73struct maccomp {
74        bool operator() (const mac_addr_t &a, const mac_addr_t &b) const {
75                if (memcmp(&a, &b, sizeof(mac_addr_t)) < 0)
76                        return true;
77                return false;
78        }
79};
80
81typedef std::map<uint32_t, end_counter_t *> IP4EndMap;
82typedef std::map<struct in6_addr, end_counter_t *, v6comp> IP6EndMap;
83typedef std::map<mac_addr_t, end_counter_t *, maccomp> MacEndMap;
84
85enum {
86        MODE_MAC,
87        MODE_IPV4,
88        MODE_IPV6
89};
90
91
92typedef struct traceend_global {
93        int mode;
94        int threads;
95        int track_source;
96        int track_dest;
97} global_t;
98
99typedef struct traceend_local {
100        union {
101                IP4EndMap *ipv4;
102                IP6EndMap *ipv6;
103                MacEndMap *mac;
104        } map;
105} local_t;
106
107typedef struct traceend_result_local {
108        union {
109                IP4EndMap *ipv4;
110                IP6EndMap *ipv6;
111                MacEndMap *mac;
112        } map;
113        int threads_reported;
114} result_t;
115
116libtrace_t *currenttrace = NULL;
117
118static int usage(char *argv0)
119{
120        printf("Usage:\n"
121        "%s flags inputuri [inputuri ... ] \n"
122        "-f --filter=bpf        Only output packets that match filter\n"
123        "-H --help              Print this message\n"
124        "-A --address=addr      Specifies which address type to match (mac, v4, v6)\n"
125        ,argv0);
126        exit(1);
127}
128
129static void cleanup_signal(int sig)
130{
131        (void)sig;
132        if (currenttrace) {
133                trace_pstop(currenttrace);
134        }
135}
136
137static void *cb_starting(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
138                void *global) {
139
140        global_t *glob = (global_t *)global;
141        local_t *local = (local_t *)malloc(sizeof(local_t));
142
143        switch(glob->mode) {
144                case MODE_IPV4:
145                        local->map.ipv4 = new IP4EndMap();
146                        break;
147                case MODE_IPV6:
148                        local->map.ipv6 = new IP6EndMap();
149                        break;
150                case MODE_MAC:
151                        local->map.mac = new MacEndMap();
152                        break;
153        }
154        return local;
155
156}
157
158static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
159                void *global UNUSED, void *tls) {
160
161        local_t *local = (local_t *)tls;
162        libtrace_generic_t gen;
163
164        gen.ptr = local;
165        trace_publish_result(trace, t, 0, gen, RESULT_USER);
166}
167
168static inline end_counter_t *create_counter() {
169        end_counter_t *c = (end_counter_t *)malloc(sizeof(end_counter_t));
170
171        c->src_bytes = 0;
172        c->src_pbytes = 0;
173        c->src_pkts = 0;
174        c->dst_pkts = 0;
175        c->dst_bytes = 0;
176        c->dst_pbytes = 0;
177
178        c->last_active = 0.0;
179        return c;
180}
181
182static inline char *mac_string(mac_addr_t m, char *str) {
183        snprintf(str, 80, "%02x:%02x:%02x:%02x:%02x:%02x", 
184                m.addr[0], m.addr[1], m.addr[2], m.addr[3], m.addr[4],
185                m.addr[5]);
186        return str;
187}
188
189static inline void combine_counters(end_counter_t *c, end_counter_t *c2) {
190
191        c->src_pkts += c2->src_pkts;
192        c->src_bytes += c2->src_bytes;
193        c->src_pbytes += c2->src_pbytes;
194        c->dst_pkts += c2->dst_pkts;
195        c->dst_bytes += c2->dst_bytes;
196        c->dst_pbytes += c2->dst_pbytes;
197
198}
199
200static void combine_mac_maps(MacEndMap *dst, MacEndMap *src) {
201
202        MacEndMap::iterator it;
203        MacEndMap::iterator found;
204
205        for (it = src->begin(); it != src->end(); it++) {
206                found = dst->find(it->first);
207
208                if (found == dst->end()) {
209                        (*dst)[it->first] = it->second;
210                        continue;
211                }
212
213                combine_counters(found->second, it->second);
214                free(it->second);
215        }
216
217}
218
219static void combine_ipv4_maps(IP4EndMap *dst, IP4EndMap *src) {
220
221        IP4EndMap::iterator it;
222        IP4EndMap::iterator found;
223
224        for (it = src->begin(); it != src->end(); it++) {
225                found = dst->find(it->first);
226
227                if (found == dst->end()) {
228                        (*dst)[it->first] = it->second;
229                        continue;
230                }
231
232                combine_counters(found->second, it->second);
233                free(it->second);
234        }
235
236}
237
238static void combine_ipv6_maps(IP6EndMap *dst, IP6EndMap *src) {
239
240        IP6EndMap::iterator it;
241        IP6EndMap::iterator found;
242
243        for (it = src->begin(); it != src->end(); it++) {
244                found = dst->find(it->first);
245
246                if (found == dst->end()) {
247                        (*dst)[it->first] = it->second;
248                        continue;
249                }
250
251                combine_counters(found->second, it->second);
252                free(it->second);
253        }
254
255}
256
257static void dump_ipv4_map(IP4EndMap *ipv4, bool destroy) {
258        IP4EndMap::iterator it;
259        struct in_addr in;
260        char timestr[80];
261        struct tm *tm;
262        time_t t;
263        for (it = ipv4->begin(); it != ipv4->end(); it++) {
264                in.s_addr = it->first;
265                t = (time_t)(it->second->last_active);
266                tm = localtime(&t);
267                strftime(timestr, 80, "%d/%m,%H:%M:%S", tm);
268                printf("%16s %16s %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 "\n", 
269                                inet_ntoa(in),
270                                timestr,
271                                it->second->src_pkts,
272                                it->second->src_bytes,
273                                it->second->src_pbytes,
274                                it->second->dst_pkts,
275                                it->second->dst_bytes,
276                                it->second->dst_pbytes);
277                if (destroy) {
278                       free(it->second);
279                }
280        }
281}
282
283static void dump_ipv6_map(IP6EndMap *ipv6, bool destroy) {
284        IP6EndMap::iterator it;
285        struct in6_addr in;
286        char ip6_addr[128];
287        char timestr[80];
288        struct tm *tm;
289        time_t t;
290
291        for (it = ipv6->begin(); it != ipv6->end(); it++) {
292                in = it->first;
293                t = (time_t)(it->second->last_active);
294                tm = localtime(&t);
295                strftime(timestr, 80, "%d/%m,%H:%M:%S", tm);
296                printf("%40s %16s %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 "\n", 
297                                inet_ntop(AF_INET6, &in, ip6_addr, 128),
298                                timestr,
299                                it->second->src_pkts,
300                                it->second->src_bytes,
301                                it->second->src_pbytes,
302                                it->second->dst_pkts,
303                                it->second->dst_bytes,
304                                it->second->dst_pbytes);
305                if (destroy) {
306                       free(it->second);
307                }
308        }
309}
310
311static void dump_mac_map(MacEndMap *mac, bool destroy) {
312        MacEndMap::iterator it;
313        char str[80];
314        char timestr[80];
315        struct tm *tm;
316        time_t t;
317
318        for (it = mac->begin(); it != mac->end(); it++) {
319                t = (time_t)(it->second->last_active);
320                tm = localtime(&t);
321                strftime(timestr, 80, "%d/%m,%H:%M:%S", tm);
322                printf("%18s %16s %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 "\n", 
323                                mac_string(it->first, str),
324                                timestr,
325                                it->second->src_pkts,
326                                it->second->src_bytes,
327                                it->second->src_pbytes,
328                                it->second->dst_pkts,
329                                it->second->dst_bytes,
330                                it->second->dst_pbytes);
331                if (destroy) {
332                       free(it->second);
333                }
334        }
335}
336
337static void update_ipv6(global_t *glob,
338                local_t *local, libtrace_ip6_t *ip, uint16_t ip_len,
339                uint32_t rem, uint32_t plen,    double ts) {
340
341        struct in6_addr key;
342        IP6EndMap::iterator it;
343        end_counter_t *c = NULL;
344
345        if (rem < sizeof(libtrace_ip6_t))
346                return;
347        if (glob->track_source) {
348                key = ip->ip_src;
349
350                it = local->map.ipv6->find(key);
351                if (it == local->map.ipv6->end()) {
352                        c = create_counter();
353                        (*(local->map.ipv6))[key] = c;
354                } else {
355                        c = it->second;
356                }
357
358                c->src_pkts ++;
359                c->src_pbytes += plen;
360                c->src_bytes += ip_len;
361                if (ts != 0)
362                        c->last_active = ts;
363        }
364
365        if (glob->track_dest) {
366                key = ip->ip_dst;
367
368                it = local->map.ipv6->find(key);
369                if (it == local->map.ipv6->end()) {
370                        c = create_counter();
371                        (*(local->map.ipv6))[key] = c;
372                } else {
373                        c = it->second;
374                }
375
376                c->dst_pkts ++;
377                c->dst_pbytes += plen;
378                c->dst_bytes += ip_len;
379                if (ts != 0)
380                        c->last_active = ts;
381        }
382}
383
384static void update_mac(global_t *glob, local_t *local,
385                uint8_t *src, uint8_t *dst, uint16_t ip_len,
386                uint32_t plen, double ts) {
387
388        mac_addr_t key;
389        end_counter_t *c = NULL;
390        MacEndMap::iterator it;
391
392        if (glob->track_source) {
393                memcpy(&(key.addr), src, sizeof(key.addr));
394                it = local->map.mac->find(key);
395
396                if (it == local->map.mac->end()) {
397                        c = create_counter();
398                        (*(local->map.mac))[key] = c;
399                } else {
400                        c = it->second;
401                }
402
403                c->src_pkts ++;
404                c->src_pbytes += plen;
405                c->src_bytes += ip_len;
406                c->last_active = ts;
407        }
408
409        if (glob->track_dest) {
410                memcpy(&key.addr, dst, sizeof(key.addr));
411                it = local->map.mac->find(key);
412
413                if (it == local->map.mac->end()) {
414                        c = create_counter();
415                        (*(local->map.mac))[key] = c;
416                } else {
417                        c = it->second;
418                }
419
420                c->dst_pkts ++;
421                c->dst_pbytes += plen;
422                c->dst_bytes += ip_len;
423                c->last_active = ts;
424        }
425}
426
427static void update_ipv4(global_t *glob,
428                local_t *local, libtrace_ip_t *ip, uint16_t ip_len,
429                uint32_t rem, uint32_t plen, double ts) {
430
431        uint32_t key;
432        IP4EndMap::iterator it;
433        end_counter_t *c = NULL;
434
435        if (rem < sizeof(libtrace_ip_t))
436                return;
437
438        if (glob->track_source) {
439                key = ip->ip_src.s_addr;
440
441                it = local->map.ipv4->find(key);
442                if (it == local->map.ipv4->end()) {
443                        c = create_counter();
444                        (*(local->map.ipv4))[key] = c;
445                } else {
446                        c = it->second;
447                }
448
449                c->src_pkts ++;
450                c->src_pbytes += plen;
451                c->src_bytes += ip->ip_len;
452                if (ts != 0)
453                        c->last_active = ts;
454        }
455
456        if (glob->track_dest) {
457                key = ip->ip_dst.s_addr;
458
459                it = local->map.ipv4->find(key);
460                if (it == local->map.ipv4->end()) {
461                        c = create_counter();
462                        (*(local->map.ipv4))[key] = c;
463                } else {
464                        c = it->second;
465                }
466
467                c->dst_pkts ++;
468                c->dst_pbytes += plen;
469                c->dst_bytes += ip_len;
470                if (ts != 0)
471                        c->last_active = ts;
472        }
473}
474
475static void *cb_result_starting(libtrace_t *trace UNUSED,
476                libtrace_thread_t *t UNUSED, void *global) {
477
478        global_t *glob = (global_t *)global;
479        result_t *res = (result_t *)malloc(sizeof(result_t));
480
481        switch(glob->mode) {
482                case MODE_IPV4:
483                        res->map.ipv4 = new IP4EndMap();
484                        break;
485                case MODE_IPV6:
486                        res->map.ipv6 = new IP6EndMap();
487                        break;
488                case MODE_MAC:
489                        res->map.mac = new MacEndMap();
490                        break;
491        }
492        res->threads_reported = 0;
493        return res;
494}
495
496static void cb_result(libtrace_t *trace UNUSED,
497                libtrace_thread_t *sender UNUSED, void *global,
498                void *tls, libtrace_result_t *result) {
499
500        global_t *glob = (global_t *)global;
501        result_t *res = (result_t *)tls;
502        local_t *recvd = (local_t *)(result->value.ptr);
503
504
505        switch(glob->mode) {
506                case MODE_IPV4:
507                        combine_ipv4_maps(res->map.ipv4, recvd->map.ipv4);
508                        delete(recvd->map.ipv4);
509                        break;
510                case MODE_IPV6:
511                        combine_ipv6_maps(res->map.ipv6, recvd->map.ipv6);
512                        delete(recvd->map.ipv6);
513                        break;
514                case MODE_MAC:
515                        combine_mac_maps(res->map.mac, recvd->map.mac);
516                        delete(recvd->map.mac);
517                        break;
518        }
519        res->threads_reported ++;
520        free(recvd);
521}
522
523static void cb_result_stopping(libtrace_t *trace UNUSED,
524                libtrace_thread_t *t UNUSED, void *global, void *tls) {
525
526        global_t *glob = (global_t *)global;
527        result_t *res = (result_t *)tls;
528        switch(glob->mode) {
529                case MODE_IPV4:
530                        dump_ipv4_map(res->map.ipv4, 1);
531                        delete(res->map.ipv4);
532                        break;
533                case MODE_IPV6:
534                        dump_ipv6_map(res->map.ipv6, 1);
535                        delete(res->map.ipv6);
536                        break;
537                case MODE_MAC:
538                        dump_mac_map(res->map.mac, 1);
539                        delete(res->map.mac);
540                        break;
541        }
542        free(res);
543}
544
545
546static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
547                void *global, void *tls, libtrace_packet_t *packet) {
548
549        global_t *glob = (global_t *)global;
550        local_t *local = (local_t *)tls;
551        void *header;
552        uint16_t ethertype;
553        uint32_t rem;
554        uint16_t ip_len = 0;
555        uint32_t plen = trace_get_payload_length(packet);
556        double ts = trace_get_seconds(packet);
557        libtrace_ip_t *ip = NULL;
558        libtrace_ip6_t *ip6 = NULL;
559        uint8_t *src_mac, *dst_mac;
560
561        header = trace_get_layer3(packet, &ethertype, &rem);
562
563        if (header == NULL || rem == 0)
564                return packet;
565
566        if (ethertype == TRACE_ETHERTYPE_IP) {
567                ip = (libtrace_ip_t *)header;
568                if (rem < sizeof(libtrace_ip_t))
569                        goto endpacketcb;
570                ip_len = ntohs(ip->ip_len);
571                if (glob->mode == MODE_IPV4 && ip) {
572                        update_ipv4(glob, local, ip, ip_len, rem, plen, ts);
573                        goto endpacketcb;
574                }
575        }
576
577        if (ethertype == TRACE_ETHERTYPE_IPV6) {
578                ip6 = (libtrace_ip6_t *)header;
579                if (rem < sizeof(libtrace_ip6_t))
580                        goto endpacketcb;
581                ip_len = ntohs(ip6->plen) + sizeof(libtrace_ip6_t);
582                if (glob->mode == MODE_IPV6 && ip6) {
583                        update_ipv6(glob, local, ip6, ip_len, rem, plen, ts);
584                        goto endpacketcb;
585                }
586        }
587
588        if (glob->mode == MODE_MAC) {
589                src_mac = trace_get_source_mac(packet);
590                dst_mac = trace_get_destination_mac(packet);
591
592                if (src_mac == NULL || dst_mac == NULL)
593                        goto endpacketcb;
594                update_mac(glob, local, src_mac, dst_mac, ip_len, plen, ts);
595        }
596
597endpacketcb:
598        return packet;
599}
600
601int main(int argc, char *argv[]) {
602
603        int i;
604        int threads = 1;
605        struct sigaction sigact;
606        struct libtrace_filter_t *filter=NULL;
607        struct libtrace_t *input = NULL;
608        global_t glob;
609        libtrace_callback_set_t *pktcbs, *repcbs;
610
611        glob.mode = MODE_IPV4;
612        glob.track_source = 1;
613        glob.track_dest = 1;
614
615        while(1) {
616                int option_index;
617                struct option long_options[] = {
618                        { "filter",        1, 0, 'f' },
619                        { "help",          0, 0, 'H' },
620                        { "addresses",     1, 0, 'A' }, 
621                        { "threads",       1, 0, 't' }, 
622                        { "ignore-dest",           0, 0, 'D' }, 
623                        { "ignore-source",         0, 0, 'S' }, 
624                        { NULL,            0, 0, 0   },
625                };
626
627                int c=getopt_long(argc, argv, "A:f:t:HDS",
628                                long_options, &option_index);
629
630                if (c==-1)
631                        break;
632                switch (c) {
633                        case 'A':
634                                if (strncmp(optarg, "mac", 3) == 0)
635                                        glob.mode = MODE_MAC;
636                                else if (strncmp(optarg, "v4", 2) == 0)
637                                        glob.mode = MODE_IPV4;
638                                else if (strncmp(optarg, "v6", 2) == 0)
639                                        glob.mode = MODE_IPV6;
640                                else {
641                                        fprintf(stderr, "Invalid address type, must be either mac, v4 or v6\n");
642                                        return 1;
643                                }
644                                break;
645                        case 'D':
646                                glob.track_dest = 0;
647                                break;
648                        case 'f': filter=trace_create_filter(optarg);
649                                break;
650                        case 'H':
651                                usage(argv[0]);
652                                break;
653                        case 'S':
654                                glob.track_source = 0;
655                                break;
656                        case 't':
657                                threads = atoi(optarg);
658                                break;
659                        default:
660                                fprintf(stderr,"Unknown option: %c\n",c);
661                                usage(argv[0]);
662                                return 1;
663                }
664
665        }
666        sigact.sa_handler = cleanup_signal;
667        sigemptyset(&sigact.sa_mask);
668        sigact.sa_flags = SA_RESTART;
669
670        sigaction(SIGINT, &sigact, NULL);
671        sigaction(SIGTERM, &sigact, NULL);
672        sigaction(SIGPIPE, &sigact, NULL);
673        sigaction(SIGHUP, &sigact, NULL);
674
675        if (threads <= 0) {
676                threads = 1;
677        }
678        glob.threads = threads;
679
680        if (glob.track_source == 0 && glob.track_dest == 0) {
681                fprintf(stderr, "Bad configuration -- ignoring both source and dest endpoints will produce\nno results!\n");
682                usage(argv[0]);
683                return 1;
684        }
685
686        for (i = optind; i < argc; i++) {
687                input = trace_create(argv[i]);
688
689                if (trace_is_err(input)) {
690                        trace_perror(input,"%s",argv[i]);
691                        trace_destroy(input);
692                        return 1;
693                }
694
695                if (filter && trace_config(input, TRACE_OPTION_FILTER, filter) == 1) {
696                        trace_perror(input, "Configuring filter for %s",
697                                        argv[i]);
698                        return 1;
699                }
700
701                trace_set_combiner(input, &combiner_unordered,
702                        (libtrace_generic_t){0});
703                trace_set_perpkt_threads(input, threads);
704
705                pktcbs = trace_create_callback_set();
706                trace_set_starting_cb(pktcbs, cb_starting);
707                trace_set_stopping_cb(pktcbs, cb_stopping);
708                trace_set_packet_cb(pktcbs, cb_packet);
709
710                repcbs = trace_create_callback_set();
711                trace_set_starting_cb(repcbs, cb_result_starting);
712                trace_set_stopping_cb(repcbs, cb_result_stopping);
713                trace_set_result_cb(repcbs, cb_result);
714
715                currenttrace = input;
716                if (trace_pstart(input, &glob, pktcbs, repcbs) == -1) {
717                        trace_perror(input, "Failed to start trace");
718                        trace_destroy(input);
719                        trace_destroy_callback_set(pktcbs);
720                        trace_destroy_callback_set(repcbs);
721                        return 1;
722                }
723
724                trace_join(input);
725
726                if (trace_is_err(input)) {
727                        trace_perror(input,"Reading packets");
728                        trace_destroy(input);
729                        break;
730                }
731
732                currenttrace = NULL;
733                trace_destroy(input);
734                trace_destroy_callback_set(pktcbs);
735                trace_destroy_callback_set(repcbs);
736        }
737
738        return 0;
739}
Note: See TracBrowser for help on using the repository browser.