source: lib/format_rt.c @ e7bc155

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since e7bc155 was 33e1501, checked in by Perry Lorier <perry@…>, 15 years ago

WIN32 compile fixes

  • Property mode set to 100644
File size: 16.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#include <sys/stat.h>
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49
50#ifndef WIN32
51# include <netdb.h>
52#endif
53
54#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
55
56int reliability = 0;
57
58char *rt_deny_reason(uint8_t reason) {
59        char *string = 0;
60
61        switch(reason) {
62                case RT_DENY_WRAPPER:
63                        string = "Rejected by TCP Wrappers";
64                        break;
65                case RT_DENY_FULL:
66                        string = "Max connections reached on server";
67                        break;
68                case RT_DENY_AUTH:
69                        string = "Authentication failed";
70                        break;
71                default:
72                        string = "Unknown reason";
73        }
74
75        return string;
76}
77
78
79struct rt_format_data_t {
80        char *hostname;
81        int port;
82        int input_fd;
83        int reliable;
84        char *pkt_buffer;
85        char *buf_current;
86        int buf_left;
87
88       
89        struct libtrace_t *dummy_erf;
90        struct libtrace_t *dummy_pcap;
91        struct libtrace_t *dummy_wag;
92};
93
94static struct libtrace_format_t rt;
95
96static int rt_connect(struct libtrace_t *libtrace) {
97        struct hostent *he;
98        struct sockaddr_in remote;
99        rt_header_t connect_msg;
100        rt_deny_conn_t deny_hdr;       
101        rt_hello_t hello_opts;
102        uint8_t reason;
103       
104        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
105                perror("gethostbyname");
106                return -1;
107        }
108        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
109                perror("socket");
110                return -1;
111        }
112
113        remote.sin_family = AF_INET;
114        remote.sin_port = htons(RT_INFO->port);
115        remote.sin_addr = *((struct in_addr *)he->h_addr);
116        memset(&(remote.sin_zero), 0, 8);
117
118        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
119                                sizeof(struct sockaddr)) == -1) {
120                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
121                                "Could not connect to host %s on port %d",
122                                RT_INFO->hostname, RT_INFO->port);
123                return -1;
124        }
125
126       
127#if 0
128        oldflags = fcntl(RT_INFO->input_fd, F_GETFL, 0);
129        if (oldflags == -1) {
130                trace_set_err(libtrace, errno,
131                                "Could not get fd flags from fd %d\n",
132                                RT_INFO->input_fd);
133                return -1;
134        }
135        oldflags |= O_NONBLOCK;
136        if (fcntl(RT_INFO->input_fd, F_SETFL, oldflags) == -1) {
137                trace_set_err(libtrace, errno,
138                                "Could not set fd flags for fd %d\n",
139                                RT_INFO->input_fd);
140                return -1;
141        }
142#endif
143       
144       
145        /* We are connected, now receive message from server */
146       
147        if (recv(RT_INFO->input_fd, &connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
148                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
149                                "Could not receive connection message from %s",
150                                RT_INFO->hostname);
151                return -1;
152        }
153       
154        switch (connect_msg.type) {
155                case RT_DENY_CONN:
156                       
157                        if (recv(RT_INFO->input_fd, &deny_hdr, 
158                                                sizeof(rt_deny_conn_t),
159                                                0) != sizeof(rt_deny_conn_t)) {
160                                reason = 0;
161                        }       
162                        reason = deny_hdr.reason;
163                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
164                                "Connection attempt is denied: %s",
165                                rt_deny_reason(reason));       
166                        return -1;
167                case RT_HELLO:
168                        /* do something with options */
169                        if (recv(RT_INFO->input_fd, &hello_opts, 
170                                                sizeof(rt_hello_t), 0)
171                                        != sizeof(rt_hello_t)) {
172                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
173                                        "Failed to receive RT_HELLO options");
174                                return -1;
175                        }
176                        reliability = hello_opts.reliable;
177                       
178                        return 0;
179                default:
180                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
181                                        "Unknown message type received: %d",
182                                        connect_msg.type);
183                        return -1;
184        }
185        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
186                        "Somehow you managed to reach this unreachable code");
187        return -1;
188}
189
190
191static int rt_init_input(struct libtrace_t *libtrace) {
192        char *scan;
193        char *uridata = libtrace->uridata;
194        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
195
196        RT_INFO->dummy_erf = NULL;
197        RT_INFO->dummy_pcap = NULL;
198        RT_INFO->dummy_wag = NULL;
199        RT_INFO->pkt_buffer = NULL;
200        RT_INFO->buf_current = NULL;
201        RT_INFO->buf_left = 0;
202       
203        if (strlen(uridata) == 0) {
204                RT_INFO->hostname =
205                        strdup("localhost");
206                RT_INFO->port =
207                        COLLECTOR_PORT;
208        } else {
209                if ((scan = strchr(uridata,':')) == NULL) {
210                        RT_INFO->hostname =
211                                strdup(uridata);
212                        RT_INFO->port =
213                                COLLECTOR_PORT;
214                } else {
215                        RT_INFO->hostname =
216                                (char *)strndup(uridata,
217                                                (scan - uridata));
218                        RT_INFO->port =
219                                atoi(++scan);
220                }
221        }
222
223        return rt_connect(libtrace);
224}
225       
226static int rt_start_input(struct libtrace_t *libtrace) {
227        rt_header_t start_msg;
228
229        start_msg.type = RT_START;
230        start_msg.length = sizeof(rt_start_t);
231
232       
233        /* Need to send start message to server */
234        if (send(RT_INFO->input_fd, &start_msg, sizeof(rt_header_t) +
235                                start_msg.length, 0) != sizeof(rt_header_t)) {
236                printf("Failed to send start message to server\n");
237                return -1;
238        }
239
240        return 0;
241}
242
243static int rt_fin_input(struct libtrace_t *libtrace) {
244        rt_header_t close_msg;
245
246        close_msg.type = RT_CLOSE;
247        close_msg.length = sizeof(rt_close_t);
248       
249        /* Send a close message to the server */
250        if (send(RT_INFO->input_fd, &close_msg, sizeof(rt_header_t) + 
251                                close_msg.length, 0) != sizeof(rt_header_t)
252                                + close_msg.length) {
253                printf("Failed to send close message to server\n");
254       
255        }
256        if (RT_INFO->dummy_erf) 
257                trace_destroy_dead(RT_INFO->dummy_erf);
258               
259        if (RT_INFO->dummy_pcap)
260                trace_destroy_dead(RT_INFO->dummy_pcap);
261
262        if (RT_INFO->dummy_wag)
263                trace_destroy_dead(RT_INFO->dummy_wag);
264        close(RT_INFO->input_fd);
265        free(libtrace->format_data);
266        return 0;
267}
268
269#define RT_BUF_SIZE 4000
270
271static int rt_read(struct libtrace_t *libtrace, void **buffer, size_t len, int block) {
272        int numbytes;
273        char *buf_ptr;
274
275        assert(len <= RT_BUF_SIZE);
276       
277        if (!RT_INFO->pkt_buffer) {
278                RT_INFO->pkt_buffer = malloc(RT_BUF_SIZE);
279                RT_INFO->buf_current = RT_INFO->pkt_buffer;
280                RT_INFO->buf_left = 0;
281        }
282
283#ifndef MSG_DONTWAIT
284#define MSG_DONTWAIT 0
285#endif
286
287        if (block)
288                block=0;
289        else
290                block=MSG_DONTWAIT;
291
292       
293        if (len > RT_INFO->buf_left) {
294                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
295                                RT_INFO->buf_left);
296                RT_INFO->buf_current = RT_INFO->pkt_buffer;
297
298#ifndef MSG_NOSIGNAL
299#  define MSG_NOSIGNAL 0
300#endif
301                while (len > RT_INFO->buf_left) {
302                        if ((numbytes = recv(RT_INFO->input_fd,
303                                                RT_INFO->pkt_buffer + 
304                                                RT_INFO->buf_left,
305                                                RT_BUF_SIZE-RT_INFO->buf_left,
306                                                MSG_NOSIGNAL|block)) <= 0) {
307                                if (numbytes == 0) {
308                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
309                                                        "No data received");
310                                        return -1;
311                                }
312                               
313                                if (errno == EINTR) {
314                                        /* ignore EINTR in case
315                                         * a caller is using signals
316                                         */
317                                        continue;
318                                }
319                                if (errno == EAGAIN) {
320                                        trace_set_err(libtrace,
321                                                        EAGAIN,
322                                                        "EAGAIN");
323                                        return -1;
324                                }
325                               
326                                perror("recv");
327                                trace_set_err(libtrace, TRACE_ERR_RECV_FAILED,
328                                                "Failed to read data into rt recv buffer");
329                                return -1;
330                        }
331                        /*
332                        buf_ptr = RT_INFO->pkt_buffer;
333                        for (i = 0; i < RT_BUF_SIZE ; i++) {
334                                       
335                                printf("%02x", (unsigned char)*buf_ptr);
336                                buf_ptr ++;
337                        }
338                        printf("\n");
339                        */
340                        RT_INFO->buf_left+=numbytes;
341                }
342
343        }
344        *buffer = RT_INFO->buf_current;
345        RT_INFO->buf_current += len;
346        RT_INFO->buf_left -= len;
347        assert(RT_INFO->buf_left >= 0);
348        return len;
349}
350
351
352static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
353{
354       
355        if (packet->type >= RT_DATA_PCAP) {
356                if (!RT_INFO->dummy_pcap) {
357                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
358                }
359                packet->trace = RT_INFO->dummy_pcap;
360                return 0;       
361        }
362
363        switch (packet->type) {
364                case RT_DATA_ERF:
365                        if (!RT_INFO->dummy_erf) {
366                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
367                        }
368                        packet->trace = RT_INFO->dummy_erf;
369                        break;
370                case RT_DATA_WAG:
371                        if (!RT_INFO->dummy_wag) {
372                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
373                        }
374                        packet->trace = RT_INFO->dummy_wag;
375                        break;
376                case RT_DATA_LEGACY_ETH:
377                case RT_DATA_LEGACY_ATM:
378                case RT_DATA_LEGACY_POS:
379                        printf("Sending legacy over RT is currently not supported\n");
380                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
381                        return -1;
382                default:
383                        printf("Unrecognised format: %d\n", packet->type);
384                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
385                        return -1;
386        }
387        return 0; /* success */
388}               
389
390static void rt_set_payload(struct libtrace_packet_t *packet) {
391        dag_record_t *erfptr;
392       
393        switch (packet->type) {
394                case RT_DATA_ERF:
395                        erfptr = (dag_record_t *)packet->header;
396                       
397                        if (erfptr->flags.rxerror == 1) {
398                                packet->payload = NULL;
399                                break;
400                        }
401                        /* else drop into the default case */
402                default:
403                        packet->payload = (char *)packet->buffer +
404                                trace_get_framing_length(packet);
405                        break;
406        }
407}
408
409static int rt_send_ack(struct libtrace_t *libtrace, 
410                uint32_t seqno)  {
411       
412        static char *ack_buffer = 0;
413        char *buf_ptr;
414        int numbytes = 0;
415        int to_write = 0;
416        rt_header_t *hdr;
417        rt_ack_t *ack_hdr;
418       
419        if (!ack_buffer) {
420                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
421        }
422       
423        hdr = (rt_header_t *) ack_buffer;
424        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
425       
426        hdr->type = RT_ACK;
427        hdr->length = sizeof(rt_ack_t);
428
429        ack_hdr->sequence = seqno;
430       
431        to_write = hdr->length + sizeof(rt_header_t);
432        buf_ptr = ack_buffer;
433
434       
435        while (to_write > 0) {
436                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
437                if (numbytes == -1) {
438                        if (errno == EINTR || errno == EAGAIN) {
439                                continue;
440                        }
441                        else {
442                                printf("Error sending ack\n");
443                                trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
444                                                "Error sending ack");
445                                return -1;
446                        }
447                }
448                to_write = to_write - numbytes;
449                buf_ptr = buf_ptr + to_write;
450               
451        }
452
453        return 1;
454}
455
456       
457static int rt_read_packet_versatile(libtrace_t *libtrace,
458                libtrace_packet_t *packet,int blocking) {
459        rt_header_t rt_hdr;
460        rt_header_t *pkt_hdr = &rt_hdr;
461        int pkt_size = 0;
462       
463       
464        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
465                packet->buf_control = TRACE_CTRL_PACKET;
466                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
467        } 
468
469
470        /* FIXME: Better error handling required */
471        if (rt_read(libtrace, (void **)&pkt_hdr, sizeof(rt_header_t),blocking) !=
472                        sizeof(rt_header_t)) {
473                return -1;
474        }
475
476        packet->type = pkt_hdr->type;
477        pkt_size = pkt_hdr->length;
478        packet->size = pkt_hdr->length;
479
480        if (packet->type >= RT_DATA_SIMPLE) {
481                if (rt_read(libtrace, &packet->buffer, pkt_size,1) != pkt_size) {
482                        printf("Error receiving packet\n");
483                        return -1;
484                }
485                packet->header = packet->buffer;
486               
487                if (rt_set_format(libtrace, packet) < 0) {
488                        return -1;
489                }
490                rt_set_payload(packet);
491
492                if (reliability > 0) {
493                       
494                        if (rt_send_ack(libtrace, pkt_hdr->sequence) 
495                                        == -1)
496                        {
497                                return -1;
498                        }
499                }
500        } else {
501                switch(packet->type) {
502                        case RT_STATUS:
503                        case RT_DUCK:
504                                if (rt_read(libtrace, &packet->buffer, 
505                                                        pkt_size,1) !=
506                                                pkt_size) {
507                                        printf("Error receiving status packet\n");
508                                        return -1;
509                                }
510                                packet->header = 0;
511                                packet->payload = packet->buffer;
512                                break;
513                        case RT_END_DATA:
514                                return 0;
515                        case RT_PAUSE_ACK:
516                                /* FIXME: Do something useful */
517                                break;
518                        case RT_OPTION:
519                                /* FIXME: Do something useful here as well */
520                                break;
521                        case RT_KEYCHANGE:
522                                break;
523                        default:
524                                printf("Bad rt type for client receipt: %d\n",
525                                        pkt_hdr->type);
526                }
527        }
528        /* Return the number of bytes read from the stream */
529        return packet->size; 
530}
531
532static int rt_read_packet(libtrace_t *libtrace,
533                libtrace_packet_t *packet) {
534        return rt_read_packet_versatile(libtrace,packet,1);
535}
536
537
538static int rt_get_capture_length(const struct libtrace_packet_t *packet) {
539        switch (packet->type) {
540                case RT_DUCK:
541                        return sizeof(rt_duck_t);
542                case RT_STATUS:
543                        return sizeof(rt_status_t);
544                case RT_HELLO:
545                        return sizeof(rt_hello_t);
546                case RT_START:
547                        return sizeof(rt_start_t);
548                case RT_ACK:
549                        return sizeof(rt_ack_t);
550                case RT_END_DATA:
551                        return sizeof(rt_end_data_t);
552                case RT_CLOSE:
553                        return sizeof(rt_close_t);
554                case RT_DENY_CONN:
555                        return sizeof(rt_deny_conn_t);
556                case RT_PAUSE:
557                        return sizeof(rt_pause_t);
558                case RT_PAUSE_ACK:
559                        return sizeof(rt_pause_ack_t);
560                case RT_OPTION:
561                        return sizeof(rt_option_t);
562                case RT_KEYCHANGE:
563                        return sizeof(rt_keychange_t);
564        }
565        printf("Unknown type: %d\n", packet->type);
566        return 0;
567}
568
569static int rt_get_wire_length(const libtrace_packet_t *packet) {
570        return 0;
571}
572                       
573static int rt_get_framing_length(const libtrace_packet_t *packet) {
574        return 0;
575}
576
577static int rt_get_fd(const libtrace_t *trace) {
578        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
579}
580
581struct libtrace_eventobj_t trace_event_rt(struct libtrace_t *trace, struct libtrace_packet_t *packet) {
582        struct libtrace_eventobj_t event = {0,0,0.0,0};
583        libtrace_err_t read_err;
584        int data;
585
586        assert(trace);
587        assert(packet);
588       
589        if (trace->format->get_fd) {
590                event.fd = trace->format->get_fd(trace);
591        } else {
592                event.fd = 0;
593        }
594
595        event.size = rt_read_packet_versatile(trace, packet, 0);
596        if (event.size == -1) {
597                read_err = trace_get_err(trace);
598                if (read_err.err_num == EAGAIN) {
599                        event.type = TRACE_EVENT_IOWAIT;
600                }
601                else {
602                        printf("packet error\n");
603                        event.type = TRACE_EVENT_PACKET;
604                }
605        } else if (event.size == 0) {
606                event.type = TRACE_EVENT_TERMINATE;
607               
608        }       
609        else {
610                event.type = TRACE_EVENT_PACKET;
611        }
612       
613        return event;
614}
615
616static void rt_help() {
617        printf("rt format module\n");
618        printf("Supported input URIs:\n");
619        printf("\trt:hostname:port\n");
620        printf("\trt:hostname (connects on default port)\n");
621        printf("\n");
622        printf("\te.g.: rt:localhost\n");
623        printf("\te.g.: rt:localhost:32500\n");
624        printf("\n");
625
626}
627
628
629static struct libtrace_format_t rt = {
630        "rt",
631        "$Id$",
632        TRACE_FORMAT_RT,
633        rt_init_input,                  /* init_input */
634        NULL,                           /* config_input */
635        rt_start_input,                 /* start_input */
636        NULL,                           /* init_output */
637        NULL,                           /* config_output */
638        NULL,                           /* start_output */
639        NULL,                           /* pause_output */
640        rt_fin_input,                   /* fin_input */
641        NULL,                           /* fin_output */
642        rt_read_packet,                 /* read_packet */
643        NULL,                           /* fin_packet */
644        NULL,                           /* write_packet */
645        NULL,                           /* get_link_type */
646        NULL,                           /* get_direction */
647        NULL,                           /* set_direction */
648        NULL,                           /* get_erf_timestamp */
649        NULL,                           /* get_timeval */
650        NULL,                           /* get_seconds */
651        NULL,                           /* seek_erf */
652        NULL,                           /* seek_timeval */
653        NULL,                           /* seek_seconds */
654        rt_get_capture_length,          /* get_capture_length */
655        rt_get_wire_length,                     /* get_wire_length */
656        rt_get_framing_length,          /* get_framing_length */
657        NULL,                           /* set_capture_length */
658        rt_get_fd,                      /* get_fd */
659        trace_event_rt,             /* trace_event */
660        rt_help,                        /* help */
661        NULL                            /* next pointer */
662};
663
664void CONSTRUCTOR rt_constructor() {
665        register_format(&rt);
666}
Note: See TracBrowser for help on using the repository browser.