source: lib/format_rt.c @ cd7eec7

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since cd7eec7 was cd7eec7, checked in by Shane Alcock <salcock@…>, 15 years ago

Added a new format for reading and writing DUCK packets
Added corresponding test cases for DUCK
Removed references to RT_DUCK_* from format_rt
Added a configuration option for meta-data frequency (used solely for DUCK frequency at the moment) and updated other formats to ignore the option

  • Property mode set to 100644
File size: 17.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#include <sys/stat.h>
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49
50#ifndef WIN32
51# include <netdb.h>
52#endif
53
54#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
55
56char *rt_deny_reason(uint8_t reason) {
57        char *string = 0;
58
59        switch(reason) {
60                case RT_DENY_WRAPPER:
61                        string = "Rejected by TCP Wrappers";
62                        break;
63                case RT_DENY_FULL:
64                        string = "Max connections reached on server";
65                        break;
66                case RT_DENY_AUTH:
67                        string = "Authentication failed";
68                        break;
69                default:
70                        string = "Unknown reason";
71        }
72
73        return string;
74}
75
76
77struct rt_format_data_t {
78        char *hostname;
79        int port;
80        int input_fd;
81        int reliable;
82        char *pkt_buffer;
83        char *buf_current;
84        int buf_filled;
85
86        libtrace_t *dummy_duck;
87        libtrace_t *dummy_erf;
88        libtrace_t *dummy_pcap;
89        libtrace_t *dummy_wag;
90};
91
92static struct libtrace_format_t rt;
93
94static int rt_connect(libtrace_t *libtrace) {
95        struct hostent *he;
96        struct sockaddr_in remote;
97        rt_header_t connect_msg;
98        rt_deny_conn_t deny_hdr;       
99        rt_hello_t hello_opts;
100        uint8_t reason;
101       
102        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
103                perror("gethostbyname");
104                return -1;
105        }
106        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
107                perror("socket");
108                return -1;
109        }
110
111        remote.sin_family = AF_INET;
112        remote.sin_port = htons(RT_INFO->port);
113        remote.sin_addr = *((struct in_addr *)he->h_addr);
114        memset(&(remote.sin_zero), 0, 8);
115
116        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
117                                sizeof(struct sockaddr)) == -1) {
118                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
119                                "Could not connect to host %s on port %d",
120                                RT_INFO->hostname, RT_INFO->port);
121                return -1;
122        }
123
124       
125#if 0
126        oldflags = fcntl(RT_INFO->input_fd, F_GETFL, 0);
127        if (oldflags == -1) {
128                trace_set_err(libtrace, errno,
129                                "Could not get fd flags from fd %d\n",
130                                RT_INFO->input_fd);
131                return -1;
132        }
133        oldflags |= O_NONBLOCK;
134        if (fcntl(RT_INFO->input_fd, F_SETFL, oldflags) == -1) {
135                trace_set_err(libtrace, errno,
136                                "Could not set fd flags for fd %d\n",
137                                RT_INFO->input_fd);
138                return -1;
139        }
140#endif
141       
142       
143        /* We are connected, now receive message from server */
144       
145        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
146                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
147                                "Could not receive connection message from %s",
148                                RT_INFO->hostname);
149                return -1;
150        }
151       
152        switch (connect_msg.type) {
153                case RT_DENY_CONN:
154                       
155                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
156                                                sizeof(rt_deny_conn_t),
157                                                0) != sizeof(rt_deny_conn_t)) {
158                                reason = 0;
159                        }       
160                        reason = deny_hdr.reason;
161                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
162                                "Connection attempt is denied: %s",
163                                rt_deny_reason(reason));       
164                        return -1;
165                case RT_HELLO:
166                        /* do something with options */
167                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
168                                                sizeof(rt_hello_t), 0)
169                                        != sizeof(rt_hello_t)) {
170                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
171                                        "Failed to receive RT_HELLO options");
172                                return -1;
173                        }
174                        RT_INFO->reliable = hello_opts.reliable;
175                       
176                        return 0;
177                default:
178                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
179                                        "Unknown message type received: %d",
180                                        connect_msg.type);
181                        return -1;
182        }
183        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
184                        "Somehow you managed to reach this unreachable code");
185        return -1;
186}
187
188
189static int rt_init_input(libtrace_t *libtrace) {
190        char *scan;
191        char *uridata = libtrace->uridata;
192        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
193
194        RT_INFO->dummy_duck = NULL;
195        RT_INFO->dummy_erf = NULL;
196        RT_INFO->dummy_pcap = NULL;
197        RT_INFO->dummy_wag = NULL;
198        RT_INFO->pkt_buffer = NULL;
199        RT_INFO->buf_current = NULL;
200        RT_INFO->buf_filled = 0;
201       
202        if (strlen(uridata) == 0) {
203                RT_INFO->hostname =
204                        strdup("localhost");
205                RT_INFO->port =
206                        COLLECTOR_PORT;
207        } else {
208                if ((scan = strchr(uridata,':')) == NULL) {
209                        RT_INFO->hostname =
210                                strdup(uridata);
211                        RT_INFO->port =
212                                COLLECTOR_PORT;
213                } else {
214                        RT_INFO->hostname =
215                                (char *)strndup(uridata,
216                                                (scan - uridata));
217                        RT_INFO->port =
218                                atoi(++scan);
219                }
220        }
221
222        return rt_connect(libtrace);
223}
224       
225static int rt_start_input(libtrace_t *libtrace) {
226        rt_header_t start_msg;
227
228        start_msg.type = RT_START;
229        start_msg.length = 0; 
230
231       
232        /* Need to send start message to server */
233        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
234                                start_msg.length, 0) != sizeof(rt_header_t)) {
235                printf("Failed to send start message to server\n");
236                return -1;
237        }
238
239        return 0;
240}
241
242static int rt_fin_input(libtrace_t *libtrace) {
243        rt_header_t close_msg;
244
245        close_msg.type = RT_CLOSE;
246        close_msg.length = 0; 
247       
248        /* Send a close message to the server */
249        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
250                                close_msg.length, 0) != sizeof(rt_header_t)
251                                + close_msg.length) {
252                printf("Failed to send close message to server\n");
253       
254        }
255        if (RT_INFO->dummy_duck)
256                trace_destroy_dead(RT_INFO->dummy_duck);
257
258        if (RT_INFO->dummy_erf) 
259                trace_destroy_dead(RT_INFO->dummy_erf);
260               
261        if (RT_INFO->dummy_pcap)
262                trace_destroy_dead(RT_INFO->dummy_pcap);
263
264        if (RT_INFO->dummy_wag)
265                trace_destroy_dead(RT_INFO->dummy_wag);
266        close(RT_INFO->input_fd);
267        free(libtrace->format_data);
268        return 0;
269}
270
271#define RT_BUF_SIZE 4000
272
273static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block) {
274        int numbytes;
275        rt_header_t *test_hdr;
276       
277        assert(len <= RT_BUF_SIZE);
278       
279        if (!RT_INFO->pkt_buffer) {
280                RT_INFO->pkt_buffer = malloc(RT_BUF_SIZE);
281                RT_INFO->buf_current = RT_INFO->pkt_buffer;
282                RT_INFO->buf_filled = 0;
283        }
284
285#ifndef MSG_DONTWAIT
286#define MSG_DONTWAIT 0
287#endif
288
289        if (block)
290                block=0;
291        else
292                block=MSG_DONTWAIT;
293
294       
295        if (len > RT_INFO->buf_filled) {
296                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
297                                RT_INFO->buf_filled);
298                RT_INFO->buf_current = RT_INFO->pkt_buffer;
299               
300#ifndef MSG_NOSIGNAL
301#  define MSG_NOSIGNAL 0
302#endif
303                while (len > RT_INFO->buf_filled) {
304                        if ((numbytes = recv(RT_INFO->input_fd,
305                                                RT_INFO->buf_current + 
306                                                RT_INFO->buf_filled,
307                                                RT_BUF_SIZE-RT_INFO->buf_filled,
308                                                MSG_NOSIGNAL|block)) <= 0) {
309                                if (numbytes == 0) {
310                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
311                                                        "No data received");
312                                        return -1;
313                                }
314                               
315                                if (errno == EINTR) {
316                                        /* ignore EINTR in case
317                                         * a caller is using signals
318                                         */
319                                        continue;
320                                }
321                                if (errno == EAGAIN) {
322                                        trace_set_err(libtrace,
323                                                        EAGAIN,
324                                                        "EAGAIN");
325                                        return -1;
326                                }
327                               
328                                perror("recv");
329                                trace_set_err(libtrace, errno,
330                                                "Failed to read data into rt recv buffer");
331                                return -1;
332                        }
333                        /*
334                        buf_ptr = RT_INFO->pkt_buffer;
335                        for (i = 0; i < RT_BUF_SIZE ; i++) {
336                                       
337                                printf("%02x", (unsigned char)*buf_ptr);
338                                buf_ptr ++;
339                        }
340                        printf("\n");
341                        */
342                        RT_INFO->buf_filled+=numbytes;
343                }
344
345        }
346        *buffer = RT_INFO->buf_current;
347        RT_INFO->buf_current += len;
348        RT_INFO->buf_filled -= len;
349        assert(RT_INFO->buf_filled >= 0);
350        return len;
351}
352
353
354static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
355{
356       
357        if (packet->type >= RT_DATA_PCAP) {
358                if (!RT_INFO->dummy_pcap) {
359                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
360                }
361                packet->trace = RT_INFO->dummy_pcap;
362                return 0;       
363        }
364
365        switch (packet->type) {
366                case RT_DUCK_2_4:
367                case RT_DUCK_2_5:
368                        if (!RT_INFO->dummy_duck) {
369                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
370                        }
371                        packet->trace = RT_INFO->dummy_duck;
372                        break;
373                case RT_DATA_ERF:
374                        if (!RT_INFO->dummy_erf) {
375                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
376                        }
377                        packet->trace = RT_INFO->dummy_erf;
378                        break;
379                case RT_DATA_WAG:
380                        if (!RT_INFO->dummy_wag) {
381                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
382                        }
383                        packet->trace = RT_INFO->dummy_wag;
384                        break;
385                case RT_DATA_LEGACY_ETH:
386                case RT_DATA_LEGACY_ATM:
387                case RT_DATA_LEGACY_POS:
388                case RT_DATA_LINUX_NATIVE:
389                        printf("Sending legacy over RT is currently not supported\n");
390                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
391                        return -1;
392                default:
393                        printf("Unrecognised format: %d\n", packet->type);
394                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
395                        return -1;
396        }
397        return 0; /* success */
398}               
399
400static void rt_set_payload(libtrace_packet_t *packet) {
401        dag_record_t *erfptr;
402       
403        switch (packet->type) {
404                case RT_DATA_ERF:
405                        erfptr = (dag_record_t *)packet->header;
406                       
407                        if (erfptr->flags.rxerror == 1) {
408                                packet->payload = NULL;
409                                break;
410                        }
411                        /* else drop into the default case */
412                default:
413                        packet->payload = (char *)packet->buffer +
414                                trace_get_framing_length(packet);
415                        break;
416        }
417}
418
419static int rt_send_ack(libtrace_t *libtrace, 
420                uint32_t seqno)  {
421       
422        static char *ack_buffer = 0;
423        char *buf_ptr;
424        int numbytes = 0;
425        int to_write = 0;
426        rt_header_t *hdr;
427        rt_ack_t *ack_hdr;
428       
429        if (!ack_buffer) {
430                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
431        }
432       
433        hdr = (rt_header_t *) ack_buffer;
434        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
435       
436        hdr->type = RT_ACK;
437        hdr->length = sizeof(rt_ack_t);
438
439        ack_hdr->sequence = seqno;
440       
441        to_write = hdr->length + sizeof(rt_header_t);
442        buf_ptr = ack_buffer;
443
444        while (to_write > 0) {
445                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
446                if (numbytes == -1) {
447                        if (errno == EINTR || errno == EAGAIN) {
448                                continue;
449                        }
450                        else {
451                                printf("Error sending ack\n");
452                                trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
453                                                "Error sending ack");
454                                return -1;
455                        }
456                }
457                to_write = to_write - numbytes;
458                buf_ptr = buf_ptr + to_write;
459               
460        }
461
462        return 1;
463}
464
465       
466static int rt_read_packet_versatile(libtrace_t *libtrace,
467                libtrace_packet_t *packet,int blocking) {
468        rt_header_t rt_hdr;
469        static rt_header_t *pkt_hdr = 0;
470        int pkt_size = 0;
471        uint32_t seqno;
472       
473        if (pkt_hdr == 0)
474                pkt_hdr = malloc(sizeof(rt_header_t));
475       
476        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
477                packet->buf_control = TRACE_CTRL_PACKET;
478                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
479        } 
480
481
482        /* FIXME: Better error handling required */
483        if (rt_read(libtrace, (void **)&pkt_hdr, sizeof(rt_header_t),blocking) !=
484                        sizeof(rt_header_t)) {
485                return -1;
486        }
487
488        /* Need to salvage these in case the next rt_read overwrites the
489         * buffer they came from! */
490        packet->type = pkt_hdr->type;
491        pkt_size = pkt_hdr->length;
492        packet->size = pkt_hdr->length;
493        seqno = pkt_hdr->sequence;
494
495        if (packet->type >= RT_DATA_SIMPLE) {
496                if (rt_read(libtrace, &packet->buffer, pkt_size,1) != pkt_size) {
497                        printf("Error receiving packet\n");
498                        return -1;
499                }
500                packet->header = packet->buffer;
501               
502                if (rt_set_format(libtrace, packet) < 0) {
503                        return -1;
504                }
505                rt_set_payload(packet);
506                if (RT_INFO->reliable > 0) {
507                        if (rt_send_ack(libtrace, seqno) 
508                                        == -1)
509                        {
510                                return -1;
511                        }
512                }
513        } else {
514                switch(packet->type) {
515                        case RT_STATUS:
516                                if (rt_read(libtrace, &packet->buffer, 
517                                                        pkt_size,1) !=
518                                                pkt_size) {
519                                        printf("Error receiving status packet\n");
520                                        return -1;
521                                }
522                                packet->header = 0;
523                                packet->payload = packet->buffer;
524                                break;
525                        case RT_DUCK_2_4:
526                        case RT_DUCK_2_5:
527                                if (rt_read(libtrace, &packet->buffer,
528                                                        pkt_size, 1) !=
529                                                pkt_size) {
530                                        printf("Error receiving DUCK packet\n");
531                                        return -1;
532                                }
533                                if (rt_set_format(libtrace, packet) < 0) {
534                                        return -1;
535                                }
536                                packet->header = 0;
537                                packet->payload = packet->buffer;
538                                break;
539                        case RT_END_DATA:
540                                return 0;
541                        case RT_PAUSE_ACK:
542                                /* FIXME: Do something useful */
543                                break;
544                        case RT_OPTION:
545                                /* FIXME: Do something useful here as well */
546                                break;
547                        case RT_KEYCHANGE:
548                                break;
549                        default:
550                                printf("Bad rt type for client receipt: %d\n",
551                                        packet->type);
552                                return -1;
553                }
554        }
555        /* Return the number of bytes read from the stream */
556        return packet->size; 
557}
558
559static int rt_read_packet(libtrace_t *libtrace,
560                libtrace_packet_t *packet) {
561        return rt_read_packet_versatile(libtrace,packet,1);
562}
563
564
565static int rt_get_capture_length(const libtrace_packet_t *packet) {
566        switch (packet->type) {
567                case RT_STATUS:
568                        return sizeof(rt_status_t);
569                case RT_HELLO:
570                        return sizeof(rt_hello_t);
571                case RT_START:
572                        return 0;
573                case RT_ACK:
574                        return sizeof(rt_ack_t);
575                case RT_END_DATA:
576                        return 0;
577                case RT_CLOSE:
578                        return 0;
579                case RT_DENY_CONN:
580                        return sizeof(rt_deny_conn_t);
581                case RT_PAUSE:
582                        return 0; 
583                case RT_PAUSE_ACK:
584                        return 0;
585                case RT_OPTION:
586                        return 0; /* FIXME */
587                case RT_KEYCHANGE:
588                        return 0;
589        }
590        printf("Unknown type: %d\n", packet->type);
591        return 0;
592}
593
594static int rt_get_wire_length(const libtrace_packet_t *packet) {
595        return 0;
596}
597                       
598static int rt_get_framing_length(const libtrace_packet_t *packet) {
599        return 0;
600}
601
602static int rt_get_fd(const libtrace_t *trace) {
603        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
604}
605
606libtrace_eventobj_t trace_event_rt(libtrace_t *trace, libtrace_packet_t *packet) {
607        libtrace_eventobj_t event = {0,0,0.0,0};
608        libtrace_err_t read_err;
609
610        assert(trace);
611        assert(packet);
612       
613        if (trace->format->get_fd) {
614                event.fd = trace->format->get_fd(trace);
615        } else {
616                event.fd = 0;
617        }
618
619        event.size = rt_read_packet_versatile(trace, packet, 0);
620        if (event.size == -1) {
621                read_err = trace_get_err(trace);
622                if (read_err.err_num == EAGAIN) {
623                        event.type = TRACE_EVENT_IOWAIT;
624                }
625                else {
626                        printf("packet error\n");
627                        event.type = TRACE_EVENT_PACKET;
628                }
629        } else if (event.size == 0) {
630                event.type = TRACE_EVENT_TERMINATE;
631               
632        }       
633        else {
634                event.type = TRACE_EVENT_PACKET;
635        }
636       
637        return event;
638}
639
640static void rt_help() {
641        printf("rt format module\n");
642        printf("Supported input URIs:\n");
643        printf("\trt:hostname:port\n");
644        printf("\trt:hostname (connects on default port)\n");
645        printf("\n");
646        printf("\te.g.: rt:localhost\n");
647        printf("\te.g.: rt:localhost:32500\n");
648        printf("\n");
649
650}
651
652
653static struct libtrace_format_t rt = {
654        "rt",
655        "$Id$",
656        TRACE_FORMAT_RT,
657        rt_init_input,                  /* init_input */
658        NULL,                           /* config_input */
659        rt_start_input,                 /* start_input */
660        NULL,                           /* init_output */
661        NULL,                           /* config_output */
662        NULL,                           /* start_output */
663        NULL,                           /* pause_output */
664        rt_fin_input,                   /* fin_input */
665        NULL,                           /* fin_output */
666        rt_read_packet,                 /* read_packet */
667        NULL,                           /* fin_packet */
668        NULL,                           /* write_packet */
669        NULL,                           /* get_link_type */
670        NULL,                           /* get_direction */
671        NULL,                           /* set_direction */
672        NULL,                           /* get_erf_timestamp */
673        NULL,                           /* get_timeval */
674        NULL,                           /* get_seconds */
675        NULL,                           /* seek_erf */
676        NULL,                           /* seek_timeval */
677        NULL,                           /* seek_seconds */
678        rt_get_capture_length,          /* get_capture_length */
679        rt_get_wire_length,                     /* get_wire_length */
680        rt_get_framing_length,          /* get_framing_length */
681        NULL,                           /* set_capture_length */
682        rt_get_fd,                      /* get_fd */
683        trace_event_rt,             /* trace_event */
684        rt_help,                        /* help */
685        NULL                            /* next pointer */
686};
687
688void CONSTRUCTOR rt_constructor() {
689        register_format(&rt);
690}
Note: See TracBrowser for help on using the repository browser.