source: lib/format_rt.c @ acc80de

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since acc80de was acc80de, checked in by Shane Alcock <salcock@…>, 16 years ago

RT now supports linux native socket packets

  • Property mode set to 100644
File size: 17.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#include <sys/stat.h>
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49
50#ifndef WIN32
51# include <netdb.h>
52#endif
53
54#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
55
56char *rt_deny_reason(uint8_t reason) {
57        char *string = 0;
58
59        switch(reason) {
60                case RT_DENY_WRAPPER:
61                        string = "Rejected by TCP Wrappers";
62                        break;
63                case RT_DENY_FULL:
64                        string = "Max connections reached on server";
65                        break;
66                case RT_DENY_AUTH:
67                        string = "Authentication failed";
68                        break;
69                default:
70                        string = "Unknown reason";
71        }
72
73        return string;
74}
75
76
77struct rt_format_data_t {
78        char *hostname;
79        int port;
80        int input_fd;
81        int reliable;
82        char *pkt_buffer;
83        char *buf_current;
84        int buf_filled;
85
86        libtrace_t *dummy_duck;
87        libtrace_t *dummy_erf;
88        libtrace_t *dummy_pcap;
89        libtrace_t *dummy_wag;
90        libtrace_t *dummy_linux;
91};
92
93static struct libtrace_format_t rt;
94
95static int rt_connect(libtrace_t *libtrace) {
96        struct hostent *he;
97        struct sockaddr_in remote;
98        rt_header_t connect_msg;
99        rt_deny_conn_t deny_hdr;       
100        rt_hello_t hello_opts;
101        uint8_t reason;
102       
103        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
104                perror("gethostbyname");
105                return -1;
106        }
107        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
108                perror("socket");
109                return -1;
110        }
111
112        remote.sin_family = AF_INET;
113        remote.sin_port = htons(RT_INFO->port);
114        remote.sin_addr = *((struct in_addr *)he->h_addr);
115        memset(&(remote.sin_zero), 0, 8);
116
117        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
118                                sizeof(struct sockaddr)) == -1) {
119                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
120                                "Could not connect to host %s on port %d",
121                                RT_INFO->hostname, RT_INFO->port);
122                return -1;
123        }
124
125       
126#if 0
127        oldflags = fcntl(RT_INFO->input_fd, F_GETFL, 0);
128        if (oldflags == -1) {
129                trace_set_err(libtrace, errno,
130                                "Could not get fd flags from fd %d\n",
131                                RT_INFO->input_fd);
132                return -1;
133        }
134        oldflags |= O_NONBLOCK;
135        if (fcntl(RT_INFO->input_fd, F_SETFL, oldflags) == -1) {
136                trace_set_err(libtrace, errno,
137                                "Could not set fd flags for fd %d\n",
138                                RT_INFO->input_fd);
139                return -1;
140        }
141#endif
142       
143       
144        /* We are connected, now receive message from server */
145       
146        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
147                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
148                                "Could not receive connection message from %s",
149                                RT_INFO->hostname);
150                return -1;
151        }
152       
153        switch (connect_msg.type) {
154                case RT_DENY_CONN:
155                       
156                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
157                                                sizeof(rt_deny_conn_t),
158                                                0) != sizeof(rt_deny_conn_t)) {
159                                reason = 0;
160                        }       
161                        reason = deny_hdr.reason;
162                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
163                                "Connection attempt is denied: %s",
164                                rt_deny_reason(reason));       
165                        return -1;
166                case RT_HELLO:
167                        /* do something with options */
168                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
169                                                sizeof(rt_hello_t), 0)
170                                        != sizeof(rt_hello_t)) {
171                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
172                                        "Failed to receive RT_HELLO options");
173                                return -1;
174                        }
175                        RT_INFO->reliable = hello_opts.reliable;
176                       
177                        return 0;
178                default:
179                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
180                                        "Unknown message type received: %d",
181                                        connect_msg.type);
182                        return -1;
183        }
184        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
185                        "Somehow you managed to reach this unreachable code");
186        return -1;
187}
188
189
190static int rt_init_input(libtrace_t *libtrace) {
191        char *scan;
192        char *uridata = libtrace->uridata;
193        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
194
195        RT_INFO->dummy_duck = NULL;
196        RT_INFO->dummy_erf = NULL;
197        RT_INFO->dummy_pcap = NULL;
198        RT_INFO->dummy_wag = NULL;
199        RT_INFO->dummy_linux = NULL;
200        RT_INFO->pkt_buffer = NULL;
201        RT_INFO->buf_current = NULL;
202        RT_INFO->buf_filled = 0;
203       
204        if (strlen(uridata) == 0) {
205                RT_INFO->hostname =
206                        strdup("localhost");
207                RT_INFO->port =
208                        COLLECTOR_PORT;
209        } else {
210                if ((scan = strchr(uridata,':')) == NULL) {
211                        RT_INFO->hostname =
212                                strdup(uridata);
213                        RT_INFO->port =
214                                COLLECTOR_PORT;
215                } else {
216                        RT_INFO->hostname =
217                                (char *)strndup(uridata,
218                                                (scan - uridata));
219                        RT_INFO->port =
220                                atoi(++scan);
221                }
222        }
223
224        return rt_connect(libtrace);
225}
226       
227static int rt_start_input(libtrace_t *libtrace) {
228        rt_header_t start_msg;
229
230        start_msg.type = RT_START;
231        start_msg.length = 0; 
232
233       
234        /* Need to send start message to server */
235        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
236                                start_msg.length, 0) != sizeof(rt_header_t)) {
237                printf("Failed to send start message to server\n");
238                return -1;
239        }
240
241        return 0;
242}
243
244static int rt_fin_input(libtrace_t *libtrace) {
245        rt_header_t close_msg;
246
247        close_msg.type = RT_CLOSE;
248        close_msg.length = 0; 
249       
250        /* Send a close message to the server */
251        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
252                                close_msg.length, 0) != sizeof(rt_header_t)
253                                + close_msg.length) {
254                printf("Failed to send close message to server\n");
255       
256        }
257        if (RT_INFO->dummy_duck)
258                trace_destroy_dead(RT_INFO->dummy_duck);
259
260        if (RT_INFO->dummy_erf) 
261                trace_destroy_dead(RT_INFO->dummy_erf);
262               
263        if (RT_INFO->dummy_pcap)
264                trace_destroy_dead(RT_INFO->dummy_pcap);
265
266        if (RT_INFO->dummy_wag)
267                trace_destroy_dead(RT_INFO->dummy_wag);
268
269        if (RT_INFO->dummy_linux)
270                trace_destroy_dead(RT_INFO->dummy_linux);
271
272        close(RT_INFO->input_fd);
273        free(libtrace->format_data);
274        return 0;
275}
276
277#define RT_BUF_SIZE 4000
278
279static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block) {
280        int numbytes;
281        rt_header_t *test_hdr;
282       
283        assert(len <= RT_BUF_SIZE);
284       
285        if (!RT_INFO->pkt_buffer) {
286                RT_INFO->pkt_buffer = malloc(RT_BUF_SIZE);
287                RT_INFO->buf_current = RT_INFO->pkt_buffer;
288                RT_INFO->buf_filled = 0;
289        }
290
291#ifndef MSG_DONTWAIT
292#define MSG_DONTWAIT 0
293#endif
294
295        if (block)
296                block=0;
297        else
298                block=MSG_DONTWAIT;
299
300       
301        if (len > RT_INFO->buf_filled) {
302                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
303                                RT_INFO->buf_filled);
304                RT_INFO->buf_current = RT_INFO->pkt_buffer;
305               
306#ifndef MSG_NOSIGNAL
307#  define MSG_NOSIGNAL 0
308#endif
309                while (len > RT_INFO->buf_filled) {
310                        if ((numbytes = recv(RT_INFO->input_fd,
311                                                RT_INFO->buf_current + 
312                                                RT_INFO->buf_filled,
313                                                RT_BUF_SIZE-RT_INFO->buf_filled,
314                                                MSG_NOSIGNAL|block)) <= 0) {
315                                if (numbytes == 0) {
316                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
317                                                        "No data received");
318                                        return -1;
319                                }
320                               
321                                if (errno == EINTR) {
322                                        /* ignore EINTR in case
323                                         * a caller is using signals
324                                         */
325                                        continue;
326                                }
327                                if (errno == EAGAIN) {
328                                        trace_set_err(libtrace,
329                                                        EAGAIN,
330                                                        "EAGAIN");
331                                        return -1;
332                                }
333                               
334                                perror("recv");
335                                trace_set_err(libtrace, errno,
336                                                "Failed to read data into rt recv buffer");
337                                return -1;
338                        }
339                        /*
340                        buf_ptr = RT_INFO->pkt_buffer;
341                        for (i = 0; i < RT_BUF_SIZE ; i++) {
342                                       
343                                printf("%02x", (unsigned char)*buf_ptr);
344                                buf_ptr ++;
345                        }
346                        printf("\n");
347                        */
348                        RT_INFO->buf_filled+=numbytes;
349                }
350
351        }
352        *buffer = RT_INFO->buf_current;
353        RT_INFO->buf_current += len;
354        RT_INFO->buf_filled -= len;
355        assert(RT_INFO->buf_filled >= 0);
356        return len;
357}
358
359
360static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
361{
362       
363        if (packet->type >= RT_DATA_PCAP) {
364                if (!RT_INFO->dummy_pcap) {
365                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
366                }
367                packet->trace = RT_INFO->dummy_pcap;
368                return 0;       
369        }
370
371        switch (packet->type) {
372                case RT_DUCK_2_4:
373                case RT_DUCK_2_5:
374                        if (!RT_INFO->dummy_duck) {
375                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
376                        }
377                        packet->trace = RT_INFO->dummy_duck;
378                        break;
379                case RT_DATA_ERF:
380                        if (!RT_INFO->dummy_erf) {
381                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
382                        }
383                        packet->trace = RT_INFO->dummy_erf;
384                        break;
385                case RT_DATA_WAG:
386                        if (!RT_INFO->dummy_wag) {
387                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
388                        }
389                        packet->trace = RT_INFO->dummy_wag;
390                        break;
391                case RT_DATA_LINUX_NATIVE:
392                        if (!RT_INFO->dummy_linux) {
393                                RT_INFO->dummy_linux = trace_create_dead("int:");
394                        }
395                        packet->trace = RT_INFO->dummy_linux;
396                        break;
397                case RT_DATA_LEGACY_ETH:
398                case RT_DATA_LEGACY_ATM:
399                case RT_DATA_LEGACY_POS:
400                        printf("Sending legacy over RT is currently not supported\n");
401                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
402                        return -1;
403                default:
404                        printf("Unrecognised format: %d\n", packet->type);
405                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
406                        return -1;
407        }
408        return 0; /* success */
409}               
410
411static void rt_set_payload(libtrace_packet_t *packet) {
412        dag_record_t *erfptr;
413       
414        switch (packet->type) {
415                case RT_DATA_ERF:
416                        erfptr = (dag_record_t *)packet->header;
417                       
418                        if (erfptr->flags.rxerror == 1) {
419                                packet->payload = NULL;
420                                break;
421                        }
422                        /* else drop into the default case */
423                default:
424                        packet->payload = (char *)packet->buffer +
425                                trace_get_framing_length(packet);
426                        break;
427        }
428}
429
430static int rt_send_ack(libtrace_t *libtrace, 
431                uint32_t seqno)  {
432       
433        static char *ack_buffer = 0;
434        char *buf_ptr;
435        int numbytes = 0;
436        int to_write = 0;
437        rt_header_t *hdr;
438        rt_ack_t *ack_hdr;
439       
440        if (!ack_buffer) {
441                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
442        }
443       
444        hdr = (rt_header_t *) ack_buffer;
445        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
446       
447        hdr->type = RT_ACK;
448        hdr->length = sizeof(rt_ack_t);
449
450        ack_hdr->sequence = seqno;
451       
452        to_write = hdr->length + sizeof(rt_header_t);
453        buf_ptr = ack_buffer;
454
455        while (to_write > 0) {
456                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
457                if (numbytes == -1) {
458                        if (errno == EINTR || errno == EAGAIN) {
459                                continue;
460                        }
461                        else {
462                                printf("Error sending ack\n");
463                                trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
464                                                "Error sending ack");
465                                return -1;
466                        }
467                }
468                to_write = to_write - numbytes;
469                buf_ptr = buf_ptr + to_write;
470               
471        }
472
473        return 1;
474}
475
476       
477static int rt_read_packet_versatile(libtrace_t *libtrace,
478                libtrace_packet_t *packet,int blocking) {
479        rt_header_t rt_hdr;
480        static rt_header_t *pkt_hdr = 0;
481        int pkt_size = 0;
482        uint32_t seqno;
483       
484        if (pkt_hdr == 0)
485                pkt_hdr = malloc(sizeof(rt_header_t));
486       
487        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
488                packet->buf_control = TRACE_CTRL_PACKET;
489                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
490        } 
491
492
493        /* FIXME: Better error handling required */
494        if (rt_read(libtrace, (void **)&pkt_hdr, sizeof(rt_header_t),blocking) !=
495                        sizeof(rt_header_t)) {
496                return -1;
497        }
498
499        /* Need to salvage these in case the next rt_read overwrites the
500         * buffer they came from! */
501        packet->type = pkt_hdr->type;
502        pkt_size = pkt_hdr->length;
503        packet->size = pkt_hdr->length;
504        seqno = pkt_hdr->sequence;
505
506        if (packet->type >= RT_DATA_SIMPLE) {
507                if (rt_read(libtrace, &packet->buffer, pkt_size,1) != pkt_size) {
508                        printf("Error receiving packet\n");
509                        return -1;
510                }
511                packet->header = packet->buffer;
512               
513                if (rt_set_format(libtrace, packet) < 0) {
514                        return -1;
515                }
516                rt_set_payload(packet);
517                if (RT_INFO->reliable > 0) {
518                        if (rt_send_ack(libtrace, seqno) 
519                                        == -1)
520                        {
521                                return -1;
522                        }
523                }
524        } else {
525                switch(packet->type) {
526                        case RT_STATUS:
527                                if (rt_read(libtrace, &packet->buffer, 
528                                                        pkt_size,1) !=
529                                                pkt_size) {
530                                        printf("Error receiving status packet\n");
531                                        return -1;
532                                }
533                                packet->header = 0;
534                                packet->payload = packet->buffer;
535                                break;
536                        case RT_DUCK_2_4:
537                        case RT_DUCK_2_5:
538                                if (rt_read(libtrace, &packet->buffer,
539                                                        pkt_size, 1) !=
540                                                pkt_size) {
541                                        printf("Error receiving DUCK packet\n");
542                                        return -1;
543                                }
544                                if (rt_set_format(libtrace, packet) < 0) {
545                                        return -1;
546                                }
547                                packet->header = 0;
548                                packet->payload = packet->buffer;
549                                break;
550                        case RT_END_DATA:
551                                return 0;
552                        case RT_PAUSE_ACK:
553                                /* FIXME: Do something useful */
554                                break;
555                        case RT_OPTION:
556                                /* FIXME: Do something useful here as well */
557                                break;
558                        case RT_KEYCHANGE:
559                                break;
560                        default:
561                                printf("Bad rt type for client receipt: %d\n",
562                                        packet->type);
563                                return -1;
564                }
565        }
566        /* Return the number of bytes read from the stream */
567        return packet->size; 
568}
569
570static int rt_read_packet(libtrace_t *libtrace,
571                libtrace_packet_t *packet) {
572        return rt_read_packet_versatile(libtrace,packet,1);
573}
574
575
576static int rt_get_capture_length(const libtrace_packet_t *packet) {
577        switch (packet->type) {
578                case RT_STATUS:
579                        return sizeof(rt_status_t);
580                case RT_HELLO:
581                        return sizeof(rt_hello_t);
582                case RT_START:
583                        return 0;
584                case RT_ACK:
585                        return sizeof(rt_ack_t);
586                case RT_END_DATA:
587                        return 0;
588                case RT_CLOSE:
589                        return 0;
590                case RT_DENY_CONN:
591                        return sizeof(rt_deny_conn_t);
592                case RT_PAUSE:
593                        return 0; 
594                case RT_PAUSE_ACK:
595                        return 0;
596                case RT_OPTION:
597                        return 0; /* FIXME */
598                case RT_KEYCHANGE:
599                        return 0;
600        }
601        printf("Unknown type: %d\n", packet->type);
602        return 0;
603}
604
605static int rt_get_wire_length(const libtrace_packet_t *packet) {
606        return 0;
607}
608                       
609static int rt_get_framing_length(const libtrace_packet_t *packet) {
610        return 0;
611}
612
613static int rt_get_fd(const libtrace_t *trace) {
614        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
615}
616
617libtrace_eventobj_t trace_event_rt(libtrace_t *trace, libtrace_packet_t *packet) {
618        libtrace_eventobj_t event = {0,0,0.0,0};
619        libtrace_err_t read_err;
620
621        assert(trace);
622        assert(packet);
623       
624        if (trace->format->get_fd) {
625                event.fd = trace->format->get_fd(trace);
626        } else {
627                event.fd = 0;
628        }
629
630        event.size = rt_read_packet_versatile(trace, packet, 0);
631        if (event.size == -1) {
632                read_err = trace_get_err(trace);
633                if (read_err.err_num == EAGAIN) {
634                        event.type = TRACE_EVENT_IOWAIT;
635                }
636                else {
637                        printf("packet error\n");
638                        event.type = TRACE_EVENT_PACKET;
639                }
640        } else if (event.size == 0) {
641                event.type = TRACE_EVENT_TERMINATE;
642               
643        }       
644        else {
645                event.type = TRACE_EVENT_PACKET;
646        }
647       
648        return event;
649}
650
651static void rt_help() {
652        printf("rt format module\n");
653        printf("Supported input URIs:\n");
654        printf("\trt:hostname:port\n");
655        printf("\trt:hostname (connects on default port)\n");
656        printf("\n");
657        printf("\te.g.: rt:localhost\n");
658        printf("\te.g.: rt:localhost:32500\n");
659        printf("\n");
660
661}
662
663
664static struct libtrace_format_t rt = {
665        "rt",
666        "$Id$",
667        TRACE_FORMAT_RT,
668        rt_init_input,                  /* init_input */
669        NULL,                           /* config_input */
670        rt_start_input,                 /* start_input */
671        NULL,                           /* init_output */
672        NULL,                           /* config_output */
673        NULL,                           /* start_output */
674        NULL,                           /* pause_output */
675        rt_fin_input,                   /* fin_input */
676        NULL,                           /* fin_output */
677        rt_read_packet,                 /* read_packet */
678        NULL,                           /* fin_packet */
679        NULL,                           /* write_packet */
680        NULL,                           /* get_link_type */
681        NULL,                           /* get_direction */
682        NULL,                           /* set_direction */
683        NULL,                           /* get_erf_timestamp */
684        NULL,                           /* get_timeval */
685        NULL,                           /* get_seconds */
686        NULL,                           /* seek_erf */
687        NULL,                           /* seek_timeval */
688        NULL,                           /* seek_seconds */
689        rt_get_capture_length,          /* get_capture_length */
690        rt_get_wire_length,                     /* get_wire_length */
691        rt_get_framing_length,          /* get_framing_length */
692        NULL,                           /* set_capture_length */
693        rt_get_fd,                      /* get_fd */
694        trace_event_rt,             /* trace_event */
695        rt_help,                        /* help */
696        NULL                            /* next pointer */
697};
698
699void CONSTRUCTOR rt_constructor() {
700        register_format(&rt);
701}
Note: See TracBrowser for help on using the repository browser.