source: lib/format_rt.c @ e6d963c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since e6d963c was e6d963c, checked in by Perry Lorier <perry@…>, 15 years ago

Style fixes, get rid of unnecessary "struct"'s

  • Property mode set to 100644
File size: 16.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#include <sys/stat.h>
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49
50#ifndef WIN32
51# include <netdb.h>
52#endif
53
54#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
55
56char *rt_deny_reason(uint8_t reason) {
57        char *string = 0;
58
59        switch(reason) {
60                case RT_DENY_WRAPPER:
61                        string = "Rejected by TCP Wrappers";
62                        break;
63                case RT_DENY_FULL:
64                        string = "Max connections reached on server";
65                        break;
66                case RT_DENY_AUTH:
67                        string = "Authentication failed";
68                        break;
69                default:
70                        string = "Unknown reason";
71        }
72
73        return string;
74}
75
76
77struct rt_format_data_t {
78        char *hostname;
79        int port;
80        int input_fd;
81        int reliable;
82        char *pkt_buffer;
83        char *buf_current;
84        int buf_filled;
85
86       
87        libtrace_t *dummy_erf;
88        libtrace_t *dummy_pcap;
89        libtrace_t *dummy_wag;
90};
91
92static struct libtrace_format_t rt;
93
94static int rt_connect(libtrace_t *libtrace) {
95        struct hostent *he;
96        struct sockaddr_in remote;
97        rt_header_t connect_msg;
98        rt_deny_conn_t deny_hdr;       
99        rt_hello_t hello_opts;
100        uint8_t reason;
101       
102        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
103                perror("gethostbyname");
104                return -1;
105        }
106        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
107                perror("socket");
108                return -1;
109        }
110
111        remote.sin_family = AF_INET;
112        remote.sin_port = htons(RT_INFO->port);
113        remote.sin_addr = *((struct in_addr *)he->h_addr);
114        memset(&(remote.sin_zero), 0, 8);
115
116        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
117                                sizeof(struct sockaddr)) == -1) {
118                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
119                                "Could not connect to host %s on port %d",
120                                RT_INFO->hostname, RT_INFO->port);
121                return -1;
122        }
123
124       
125#if 0
126        oldflags = fcntl(RT_INFO->input_fd, F_GETFL, 0);
127        if (oldflags == -1) {
128                trace_set_err(libtrace, errno,
129                                "Could not get fd flags from fd %d\n",
130                                RT_INFO->input_fd);
131                return -1;
132        }
133        oldflags |= O_NONBLOCK;
134        if (fcntl(RT_INFO->input_fd, F_SETFL, oldflags) == -1) {
135                trace_set_err(libtrace, errno,
136                                "Could not set fd flags for fd %d\n",
137                                RT_INFO->input_fd);
138                return -1;
139        }
140#endif
141       
142       
143        /* We are connected, now receive message from server */
144       
145        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
146                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
147                                "Could not receive connection message from %s",
148                                RT_INFO->hostname);
149                return -1;
150        }
151       
152        switch (connect_msg.type) {
153                case RT_DENY_CONN:
154                       
155                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
156                                                sizeof(rt_deny_conn_t),
157                                                0) != sizeof(rt_deny_conn_t)) {
158                                reason = 0;
159                        }       
160                        reason = deny_hdr.reason;
161                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
162                                "Connection attempt is denied: %s",
163                                rt_deny_reason(reason));       
164                        return -1;
165                case RT_HELLO:
166                        /* do something with options */
167                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
168                                                sizeof(rt_hello_t), 0)
169                                        != sizeof(rt_hello_t)) {
170                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
171                                        "Failed to receive RT_HELLO options");
172                                return -1;
173                        }
174                        RT_INFO->reliable = hello_opts.reliable;
175                       
176                        return 0;
177                default:
178                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
179                                        "Unknown message type received: %d",
180                                        connect_msg.type);
181                        return -1;
182        }
183        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
184                        "Somehow you managed to reach this unreachable code");
185        return -1;
186}
187
188
189static int rt_init_input(libtrace_t *libtrace) {
190        char *scan;
191        char *uridata = libtrace->uridata;
192        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
193
194        RT_INFO->dummy_erf = NULL;
195        RT_INFO->dummy_pcap = NULL;
196        RT_INFO->dummy_wag = NULL;
197        RT_INFO->pkt_buffer = NULL;
198        RT_INFO->buf_current = NULL;
199        RT_INFO->buf_filled = 0;
200       
201        if (strlen(uridata) == 0) {
202                RT_INFO->hostname =
203                        strdup("localhost");
204                RT_INFO->port =
205                        COLLECTOR_PORT;
206        } else {
207                if ((scan = strchr(uridata,':')) == NULL) {
208                        RT_INFO->hostname =
209                                strdup(uridata);
210                        RT_INFO->port =
211                                COLLECTOR_PORT;
212                } else {
213                        RT_INFO->hostname =
214                                (char *)strndup(uridata,
215                                                (scan - uridata));
216                        RT_INFO->port =
217                                atoi(++scan);
218                }
219        }
220
221        return rt_connect(libtrace);
222}
223       
224static int rt_start_input(libtrace_t *libtrace) {
225        rt_header_t start_msg;
226
227        start_msg.type = RT_START;
228        start_msg.length = 0; 
229
230       
231        /* Need to send start message to server */
232        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
233                                start_msg.length, 0) != sizeof(rt_header_t)) {
234                printf("Failed to send start message to server\n");
235                return -1;
236        }
237
238        return 0;
239}
240
241static int rt_fin_input(libtrace_t *libtrace) {
242        rt_header_t close_msg;
243
244        close_msg.type = RT_CLOSE;
245        close_msg.length = 0; 
246       
247        /* Send a close message to the server */
248        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
249                                close_msg.length, 0) != sizeof(rt_header_t)
250                                + close_msg.length) {
251                printf("Failed to send close message to server\n");
252       
253        }
254        if (RT_INFO->dummy_erf) 
255                trace_destroy_dead(RT_INFO->dummy_erf);
256               
257        if (RT_INFO->dummy_pcap)
258                trace_destroy_dead(RT_INFO->dummy_pcap);
259
260        if (RT_INFO->dummy_wag)
261                trace_destroy_dead(RT_INFO->dummy_wag);
262        close(RT_INFO->input_fd);
263        free(libtrace->format_data);
264        return 0;
265}
266
267#define RT_BUF_SIZE 4000
268
269static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block) {
270        int numbytes;
271        rt_header_t *test_hdr;
272       
273        assert(len <= RT_BUF_SIZE);
274       
275        if (!RT_INFO->pkt_buffer) {
276                RT_INFO->pkt_buffer = malloc(RT_BUF_SIZE);
277                RT_INFO->buf_current = RT_INFO->pkt_buffer;
278                RT_INFO->buf_filled = 0;
279        }
280
281#ifndef MSG_DONTWAIT
282#define MSG_DONTWAIT 0
283#endif
284
285        if (block)
286                block=0;
287        else
288                block=MSG_DONTWAIT;
289
290       
291        if (len > RT_INFO->buf_filled) {
292                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
293                                RT_INFO->buf_filled);
294                RT_INFO->buf_current = RT_INFO->pkt_buffer;
295               
296#ifndef MSG_NOSIGNAL
297#  define MSG_NOSIGNAL 0
298#endif
299                while (len > RT_INFO->buf_filled) {
300                        if ((numbytes = recv(RT_INFO->input_fd,
301                                                RT_INFO->buf_current + 
302                                                RT_INFO->buf_filled,
303                                                RT_BUF_SIZE-RT_INFO->buf_filled,
304                                                MSG_NOSIGNAL|block)) <= 0) {
305                                if (numbytes == 0) {
306                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
307                                                        "No data received");
308                                        return -1;
309                                }
310                               
311                                if (errno == EINTR) {
312                                        /* ignore EINTR in case
313                                         * a caller is using signals
314                                         */
315                                        continue;
316                                }
317                                if (errno == EAGAIN) {
318                                        trace_set_err(libtrace,
319                                                        EAGAIN,
320                                                        "EAGAIN");
321                                        return -1;
322                                }
323                               
324                                perror("recv");
325                                trace_set_err(libtrace, errno,
326                                                "Failed to read data into rt recv buffer");
327                                return -1;
328                        }
329                        /*
330                        buf_ptr = RT_INFO->pkt_buffer;
331                        for (i = 0; i < RT_BUF_SIZE ; i++) {
332                                       
333                                printf("%02x", (unsigned char)*buf_ptr);
334                                buf_ptr ++;
335                        }
336                        printf("\n");
337                        */
338                        RT_INFO->buf_filled+=numbytes;
339                }
340
341        }
342        *buffer = RT_INFO->buf_current;
343        RT_INFO->buf_current += len;
344        RT_INFO->buf_filled -= len;
345        assert(RT_INFO->buf_filled >= 0);
346        return len;
347}
348
349
350static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
351{
352       
353        if (packet->type >= RT_DATA_PCAP) {
354                if (!RT_INFO->dummy_pcap) {
355                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
356                }
357                packet->trace = RT_INFO->dummy_pcap;
358                return 0;       
359        }
360
361        switch (packet->type) {
362                case RT_DATA_ERF:
363                        if (!RT_INFO->dummy_erf) {
364                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
365                        }
366                        packet->trace = RT_INFO->dummy_erf;
367                        break;
368                case RT_DATA_WAG:
369                        if (!RT_INFO->dummy_wag) {
370                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
371                        }
372                        packet->trace = RT_INFO->dummy_wag;
373                        break;
374                case RT_DATA_LEGACY_ETH:
375                case RT_DATA_LEGACY_ATM:
376                case RT_DATA_LEGACY_POS:
377                        printf("Sending legacy over RT is currently not supported\n");
378                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
379                        return -1;
380                default:
381                        printf("Unrecognised format: %d\n", packet->type);
382                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
383                        return -1;
384        }
385        return 0; /* success */
386}               
387
388static void rt_set_payload(libtrace_packet_t *packet) {
389        dag_record_t *erfptr;
390       
391        switch (packet->type) {
392                case RT_DATA_ERF:
393                        erfptr = (dag_record_t *)packet->header;
394                       
395                        if (erfptr->flags.rxerror == 1) {
396                                packet->payload = NULL;
397                                break;
398                        }
399                        /* else drop into the default case */
400                default:
401                        packet->payload = (char *)packet->buffer +
402                                trace_get_framing_length(packet);
403                        break;
404        }
405}
406
407static int rt_send_ack(libtrace_t *libtrace, 
408                uint32_t seqno)  {
409       
410        static char *ack_buffer = 0;
411        char *buf_ptr;
412        int numbytes = 0;
413        int to_write = 0;
414        rt_header_t *hdr;
415        rt_ack_t *ack_hdr;
416       
417        if (!ack_buffer) {
418                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
419        }
420       
421        hdr = (rt_header_t *) ack_buffer;
422        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
423       
424        hdr->type = RT_ACK;
425        hdr->length = sizeof(rt_ack_t);
426
427        ack_hdr->sequence = seqno;
428       
429        to_write = hdr->length + sizeof(rt_header_t);
430        buf_ptr = ack_buffer;
431
432        while (to_write > 0) {
433                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
434                if (numbytes == -1) {
435                        if (errno == EINTR || errno == EAGAIN) {
436                                continue;
437                        }
438                        else {
439                                printf("Error sending ack\n");
440                                trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
441                                                "Error sending ack");
442                                return -1;
443                        }
444                }
445                to_write = to_write - numbytes;
446                buf_ptr = buf_ptr + to_write;
447               
448        }
449
450        return 1;
451}
452
453       
454static int rt_read_packet_versatile(libtrace_t *libtrace,
455                libtrace_packet_t *packet,int blocking) {
456        rt_header_t rt_hdr;
457        static rt_header_t *pkt_hdr = 0;
458        int pkt_size = 0;
459        uint32_t seqno;
460       
461        if (pkt_hdr == 0)
462                pkt_hdr = malloc(sizeof(rt_header_t));
463       
464        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
465                packet->buf_control = TRACE_CTRL_PACKET;
466                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
467        } 
468
469
470        /* FIXME: Better error handling required */
471        if (rt_read(libtrace, (void **)&pkt_hdr, sizeof(rt_header_t),blocking) !=
472                        sizeof(rt_header_t)) {
473                return -1;
474        }
475
476        /* Need to salvage these in case the next rt_read overwrites the
477         * buffer they came from! */
478        packet->type = pkt_hdr->type;
479        pkt_size = pkt_hdr->length;
480        packet->size = pkt_hdr->length;
481        seqno = pkt_hdr->sequence;
482
483        if (packet->type >= RT_DATA_SIMPLE) {
484                if (rt_read(libtrace, &packet->buffer, pkt_size,1) != pkt_size) {
485                        printf("Error receiving packet\n");
486                        return -1;
487                }
488                packet->header = packet->buffer;
489               
490                if (rt_set_format(libtrace, packet) < 0) {
491                        return -1;
492                }
493                rt_set_payload(packet);
494                if (RT_INFO->reliable > 0) {
495                        if (rt_send_ack(libtrace, seqno) 
496                                        == -1)
497                        {
498                                return -1;
499                        }
500                }
501        } else {
502                switch(packet->type) {
503                        case RT_STATUS:
504                        case RT_DUCK:
505                                if (rt_read(libtrace, &packet->buffer, 
506                                                        pkt_size,1) !=
507                                                pkt_size) {
508                                        printf("Error receiving status packet\n");
509                                        return -1;
510                                }
511                                packet->header = 0;
512                                packet->payload = packet->buffer;
513                                break;
514                        case RT_END_DATA:
515                                return 0;
516                        case RT_PAUSE_ACK:
517                                /* FIXME: Do something useful */
518                                break;
519                        case RT_OPTION:
520                                /* FIXME: Do something useful here as well */
521                                break;
522                        case RT_KEYCHANGE:
523                                break;
524                        default:
525                                printf("Bad rt type for client receipt: %d\n",
526                                        packet->type);
527                }
528        }
529        /* Return the number of bytes read from the stream */
530        return packet->size; 
531}
532
533static int rt_read_packet(libtrace_t *libtrace,
534                libtrace_packet_t *packet) {
535        return rt_read_packet_versatile(libtrace,packet,1);
536}
537
538
539static int rt_get_capture_length(const libtrace_packet_t *packet) {
540        switch (packet->type) {
541                case RT_DUCK:
542                        return 0; /* FIXME */
543                case RT_STATUS:
544                        return sizeof(rt_status_t);
545                case RT_HELLO:
546                        return sizeof(rt_hello_t);
547                case RT_START:
548                        return 0;
549                case RT_ACK:
550                        return sizeof(rt_ack_t);
551                case RT_END_DATA:
552                        return 0;
553                case RT_CLOSE:
554                        return 0;
555                case RT_DENY_CONN:
556                        return sizeof(rt_deny_conn_t);
557                case RT_PAUSE:
558                        return 0; 
559                case RT_PAUSE_ACK:
560                        return 0;
561                case RT_OPTION:
562                        return 0; /* FIXME */
563                case RT_KEYCHANGE:
564                        return 0;
565        }
566        printf("Unknown type: %d\n", packet->type);
567        return 0;
568}
569
570static int rt_get_wire_length(const libtrace_packet_t *packet) {
571        return 0;
572}
573                       
574static int rt_get_framing_length(const libtrace_packet_t *packet) {
575        return 0;
576}
577
578static int rt_get_fd(const libtrace_t *trace) {
579        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
580}
581
582libtrace_eventobj_t trace_event_rt(libtrace_t *trace, libtrace_packet_t *packet) {
583        libtrace_eventobj_t event = {0,0,0.0,0};
584        libtrace_err_t read_err;
585
586        assert(trace);
587        assert(packet);
588       
589        if (trace->format->get_fd) {
590                event.fd = trace->format->get_fd(trace);
591        } else {
592                event.fd = 0;
593        }
594
595        event.size = rt_read_packet_versatile(trace, packet, 0);
596        if (event.size == -1) {
597                read_err = trace_get_err(trace);
598                if (read_err.err_num == EAGAIN) {
599                        event.type = TRACE_EVENT_IOWAIT;
600                }
601                else {
602                        printf("packet error\n");
603                        event.type = TRACE_EVENT_PACKET;
604                }
605        } else if (event.size == 0) {
606                event.type = TRACE_EVENT_TERMINATE;
607               
608        }       
609        else {
610                event.type = TRACE_EVENT_PACKET;
611        }
612       
613        return event;
614}
615
616static void rt_help() {
617        printf("rt format module\n");
618        printf("Supported input URIs:\n");
619        printf("\trt:hostname:port\n");
620        printf("\trt:hostname (connects on default port)\n");
621        printf("\n");
622        printf("\te.g.: rt:localhost\n");
623        printf("\te.g.: rt:localhost:32500\n");
624        printf("\n");
625
626}
627
628
629static struct libtrace_format_t rt = {
630        "rt",
631        "$Id$",
632        TRACE_FORMAT_RT,
633        rt_init_input,                  /* init_input */
634        NULL,                           /* config_input */
635        rt_start_input,                 /* start_input */
636        NULL,                           /* init_output */
637        NULL,                           /* config_output */
638        NULL,                           /* start_output */
639        NULL,                           /* pause_output */
640        rt_fin_input,                   /* fin_input */
641        NULL,                           /* fin_output */
642        rt_read_packet,                 /* read_packet */
643        NULL,                           /* fin_packet */
644        NULL,                           /* write_packet */
645        NULL,                           /* get_link_type */
646        NULL,                           /* get_direction */
647        NULL,                           /* set_direction */
648        NULL,                           /* get_erf_timestamp */
649        NULL,                           /* get_timeval */
650        NULL,                           /* get_seconds */
651        NULL,                           /* seek_erf */
652        NULL,                           /* seek_timeval */
653        NULL,                           /* seek_seconds */
654        rt_get_capture_length,          /* get_capture_length */
655        rt_get_wire_length,                     /* get_wire_length */
656        rt_get_framing_length,          /* get_framing_length */
657        NULL,                           /* set_capture_length */
658        rt_get_fd,                      /* get_fd */
659        trace_event_rt,             /* trace_event */
660        rt_help,                        /* help */
661        NULL                            /* next pointer */
662};
663
664void CONSTRUCTOR rt_constructor() {
665        register_format(&rt);
666}
Note: See TracBrowser for help on using the repository browser.