source: lib/format_rt.c @ c0cd256

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since c0cd256 was c0cd256, checked in by Perry Lorier <perry@…>, 15 years ago

Remove old CONSTRUCTOR initialisation code, and only rely on the newer
trace_init() function.

  • Property mode set to 100644
File size: 17.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#include <sys/stat.h>
43#include <assert.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <stdio.h>
47#include <string.h>
48#include <stdlib.h>
49
50#ifndef WIN32
51# include <netdb.h>
52#endif
53
54#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
55
56char *rt_deny_reason(uint8_t reason) {
57        char *string = 0;
58
59        switch(reason) {
60                case RT_DENY_WRAPPER:
61                        string = "Rejected by TCP Wrappers";
62                        break;
63                case RT_DENY_FULL:
64                        string = "Max connections reached on server";
65                        break;
66                case RT_DENY_AUTH:
67                        string = "Authentication failed";
68                        break;
69                default:
70                        string = "Unknown reason";
71        }
72
73        return string;
74}
75
76
77struct rt_format_data_t {
78        char *hostname;
79        int port;
80        int input_fd;
81        int reliable;
82        char *pkt_buffer;
83        char *buf_current;
84        int buf_filled;
85
86        libtrace_t *dummy_duck;
87        libtrace_t *dummy_erf;
88        libtrace_t *dummy_pcap;
89        libtrace_t *dummy_wag;
90        libtrace_t *dummy_linux;
91};
92
93static struct libtrace_format_t rt;
94
95static int rt_connect(libtrace_t *libtrace) {
96        struct hostent *he;
97        struct sockaddr_in remote;
98        rt_header_t connect_msg;
99        rt_deny_conn_t deny_hdr;       
100        rt_hello_t hello_opts;
101        uint8_t reason;
102       
103        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
104                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
105                                "Failed to convert hostname %s to address",
106                                RT_INFO->hostname);
107                return -1;
108        }
109        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
110                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
111                                "Could not create socket");
112                return -1;
113        }
114
115        remote.sin_family = AF_INET;
116        remote.sin_port = htons(RT_INFO->port);
117        remote.sin_addr = *((struct in_addr *)he->h_addr);
118        memset(&(remote.sin_zero), 0, 8);
119
120        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
121                                sizeof(struct sockaddr)) == -1) {
122                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
123                                "Could not connect to host %s on port %d",
124                                RT_INFO->hostname, RT_INFO->port);
125                return -1;
126        }
127
128       
129#if 0
130        oldflags = fcntl(RT_INFO->input_fd, F_GETFL, 0);
131        if (oldflags == -1) {
132                trace_set_err(libtrace, errno,
133                                "Could not get fd flags from fd %d\n",
134                                RT_INFO->input_fd);
135                return -1;
136        }
137        oldflags |= O_NONBLOCK;
138        if (fcntl(RT_INFO->input_fd, F_SETFL, oldflags) == -1) {
139                trace_set_err(libtrace, errno,
140                                "Could not set fd flags for fd %d\n",
141                                RT_INFO->input_fd);
142                return -1;
143        }
144#endif
145       
146       
147        /* We are connected, now receive message from server */
148       
149        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
150                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
151                                "Could not receive connection message from %s",
152                                RT_INFO->hostname);
153                return -1;
154        }
155       
156        switch (connect_msg.type) {
157                case RT_DENY_CONN:
158                       
159                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
160                                                sizeof(rt_deny_conn_t),
161                                                0) != sizeof(rt_deny_conn_t)) {
162                                reason = 0;
163                        }       
164                        reason = deny_hdr.reason;
165                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
166                                "Connection attempt is denied: %s",
167                                rt_deny_reason(reason));       
168                        return -1;
169                case RT_HELLO:
170                        /* do something with options */
171                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
172                                                sizeof(rt_hello_t), 0)
173                                        != sizeof(rt_hello_t)) {
174                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
175                                        "Failed to receive RT_HELLO options");
176                                return -1;
177                        }
178                        RT_INFO->reliable = hello_opts.reliable;
179                       
180                        return 0;
181                default:
182                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
183                                        "Unknown message type received: %d",
184                                        connect_msg.type);
185                        return -1;
186        }
187        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
188                        "Somehow you managed to reach this unreachable code");
189        return -1;
190}
191
192
193static int rt_init_input(libtrace_t *libtrace) {
194        char *scan;
195        char *uridata = libtrace->uridata;
196        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
197
198        RT_INFO->dummy_duck = NULL;
199        RT_INFO->dummy_erf = NULL;
200        RT_INFO->dummy_pcap = NULL;
201        RT_INFO->dummy_wag = NULL;
202        RT_INFO->dummy_linux = NULL;
203        RT_INFO->pkt_buffer = NULL;
204        RT_INFO->buf_current = NULL;
205        RT_INFO->buf_filled = 0;
206       
207        if (strlen(uridata) == 0) {
208                RT_INFO->hostname =
209                        strdup("localhost");
210                RT_INFO->port =
211                        COLLECTOR_PORT;
212        } else {
213                if ((scan = strchr(uridata,':')) == NULL) {
214                        RT_INFO->hostname =
215                                strdup(uridata);
216                        RT_INFO->port =
217                                COLLECTOR_PORT;
218                } else {
219                        RT_INFO->hostname =
220                                (char *)strndup(uridata,
221                                                (scan - uridata));
222                        RT_INFO->port =
223                                atoi(++scan);
224                }
225        }
226
227        return rt_connect(libtrace);
228}
229       
230static int rt_start_input(libtrace_t *libtrace) {
231        rt_header_t start_msg;
232
233        start_msg.type = RT_START;
234        start_msg.length = 0; 
235
236       
237        /* Need to send start message to server */
238        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
239                                start_msg.length, 0) != sizeof(rt_header_t)) {
240                printf("Failed to send start message to server\n");
241                return -1;
242        }
243
244        return 0;
245}
246
247static int rt_fin_input(libtrace_t *libtrace) {
248        rt_header_t close_msg;
249
250        close_msg.type = RT_CLOSE;
251        close_msg.length = 0; 
252       
253        /* Send a close message to the server */
254        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
255                                close_msg.length, 0) != sizeof(rt_header_t)
256                                + close_msg.length) {
257                printf("Failed to send close message to server\n");
258       
259        }
260        if (RT_INFO->dummy_duck)
261                trace_destroy_dead(RT_INFO->dummy_duck);
262
263        if (RT_INFO->dummy_erf) 
264                trace_destroy_dead(RT_INFO->dummy_erf);
265               
266        if (RT_INFO->dummy_pcap)
267                trace_destroy_dead(RT_INFO->dummy_pcap);
268
269        if (RT_INFO->dummy_wag)
270                trace_destroy_dead(RT_INFO->dummy_wag);
271
272        if (RT_INFO->dummy_linux)
273                trace_destroy_dead(RT_INFO->dummy_linux);
274
275        close(RT_INFO->input_fd);
276        free(libtrace->format_data);
277        return 0;
278}
279
280#define RT_BUF_SIZE 4000
281
282static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block) {
283        int numbytes;
284        rt_header_t *test_hdr;
285       
286        assert(len <= RT_BUF_SIZE);
287       
288        if (!RT_INFO->pkt_buffer) {
289                RT_INFO->pkt_buffer = malloc(RT_BUF_SIZE);
290                RT_INFO->buf_current = RT_INFO->pkt_buffer;
291                RT_INFO->buf_filled = 0;
292        }
293
294#ifndef MSG_DONTWAIT
295#define MSG_DONTWAIT 0
296#endif
297
298        if (block)
299                block=0;
300        else
301                block=MSG_DONTWAIT;
302
303       
304        if (len > RT_INFO->buf_filled) {
305                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
306                                RT_INFO->buf_filled);
307                RT_INFO->buf_current = RT_INFO->pkt_buffer;
308               
309#ifndef MSG_NOSIGNAL
310#  define MSG_NOSIGNAL 0
311#endif
312                while (len > RT_INFO->buf_filled) {
313                        if ((numbytes = recv(RT_INFO->input_fd,
314                                                RT_INFO->buf_current + 
315                                                RT_INFO->buf_filled,
316                                                RT_BUF_SIZE-RT_INFO->buf_filled,
317                                                MSG_NOSIGNAL|block)) <= 0) {
318                                if (numbytes == 0) {
319                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
320                                                        "No data received");
321                                        return -1;
322                                }
323                               
324                                if (errno == EINTR) {
325                                        /* ignore EINTR in case
326                                         * a caller is using signals
327                                         */
328                                        continue;
329                                }
330                                if (errno == EAGAIN) {
331                                        trace_set_err(libtrace,
332                                                        EAGAIN,
333                                                        "EAGAIN");
334                                        return -1;
335                                }
336                               
337                                perror("recv");
338                                trace_set_err(libtrace, errno,
339                                                "Failed to read data into rt recv buffer");
340                                return -1;
341                        }
342                        /*
343                        buf_ptr = RT_INFO->pkt_buffer;
344                        for (i = 0; i < RT_BUF_SIZE ; i++) {
345                                       
346                                printf("%02x", (unsigned char)*buf_ptr);
347                                buf_ptr ++;
348                        }
349                        printf("\n");
350                        */
351                        RT_INFO->buf_filled+=numbytes;
352                }
353
354        }
355        *buffer = RT_INFO->buf_current;
356        RT_INFO->buf_current += len;
357        RT_INFO->buf_filled -= len;
358        assert(RT_INFO->buf_filled >= 0);
359        return len;
360}
361
362
363static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
364{
365       
366        if (packet->type >= RT_DATA_PCAP) {
367                if (!RT_INFO->dummy_pcap) {
368                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
369                }
370                packet->trace = RT_INFO->dummy_pcap;
371                return 0;       
372        }
373
374        switch (packet->type) {
375                case RT_DUCK_2_4:
376                case RT_DUCK_2_5:
377                        if (!RT_INFO->dummy_duck) {
378                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
379                        }
380                        packet->trace = RT_INFO->dummy_duck;
381                        break;
382                case RT_DATA_ERF:
383                        if (!RT_INFO->dummy_erf) {
384                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
385                        }
386                        packet->trace = RT_INFO->dummy_erf;
387                        break;
388                case RT_DATA_WAG:
389                        if (!RT_INFO->dummy_wag) {
390                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
391                        }
392                        packet->trace = RT_INFO->dummy_wag;
393                        break;
394                case RT_DATA_LINUX_NATIVE:
395                        if (!RT_INFO->dummy_linux) {
396                                RT_INFO->dummy_linux = trace_create_dead("int:");
397                        }
398                        packet->trace = RT_INFO->dummy_linux;
399                        break;
400                case RT_DATA_LEGACY_ETH:
401                case RT_DATA_LEGACY_ATM:
402                case RT_DATA_LEGACY_POS:
403                        printf("Sending legacy over RT is currently not supported\n");
404                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
405                        return -1;
406                default:
407                        printf("Unrecognised format: %d\n", packet->type);
408                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
409                        return -1;
410        }
411        return 0; /* success */
412}               
413
414static void rt_set_payload(libtrace_packet_t *packet) {
415        dag_record_t *erfptr;
416       
417        switch (packet->type) {
418                case RT_DATA_ERF:
419                        erfptr = (dag_record_t *)packet->header;
420                       
421                        if (erfptr->flags.rxerror == 1) {
422                                packet->payload = NULL;
423                                break;
424                        }
425                        /* else drop into the default case */
426                default:
427                        packet->payload = (char *)packet->buffer +
428                                trace_get_framing_length(packet);
429                        break;
430        }
431}
432
433static int rt_send_ack(libtrace_t *libtrace, 
434                uint32_t seqno)  {
435       
436        static char *ack_buffer = 0;
437        char *buf_ptr;
438        int numbytes = 0;
439        int to_write = 0;
440        rt_header_t *hdr;
441        rt_ack_t *ack_hdr;
442       
443        if (!ack_buffer) {
444                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
445        }
446       
447        hdr = (rt_header_t *) ack_buffer;
448        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
449       
450        hdr->type = RT_ACK;
451        hdr->length = sizeof(rt_ack_t);
452
453        ack_hdr->sequence = seqno;
454       
455        to_write = hdr->length + sizeof(rt_header_t);
456        buf_ptr = ack_buffer;
457
458        while (to_write > 0) {
459                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
460                if (numbytes == -1) {
461                        if (errno == EINTR || errno == EAGAIN) {
462                                continue;
463                        }
464                        else {
465                                printf("Error sending ack\n");
466                                trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
467                                                "Error sending ack");
468                                return -1;
469                        }
470                }
471                to_write = to_write - numbytes;
472                buf_ptr = buf_ptr + to_write;
473               
474        }
475
476        return 1;
477}
478
479       
480static int rt_read_packet_versatile(libtrace_t *libtrace,
481                libtrace_packet_t *packet,int blocking) {
482        rt_header_t rt_hdr;
483        static rt_header_t *pkt_hdr = 0;
484        int pkt_size = 0;
485        uint32_t seqno;
486       
487        if (pkt_hdr == 0)
488                pkt_hdr = malloc(sizeof(rt_header_t));
489       
490        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
491                packet->buf_control = TRACE_CTRL_PACKET;
492                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
493        } 
494
495
496        /* FIXME: Better error handling required */
497        if (rt_read(libtrace, (void **)&pkt_hdr, sizeof(rt_header_t),blocking) !=
498                        sizeof(rt_header_t)) {
499                return -1;
500        }
501
502        /* Need to salvage these in case the next rt_read overwrites the
503         * buffer they came from! */
504        packet->type = pkt_hdr->type;
505        pkt_size = pkt_hdr->length;
506        packet->size = pkt_hdr->length;
507        seqno = pkt_hdr->sequence;
508
509        if (packet->type >= RT_DATA_SIMPLE) {
510                if (rt_read(libtrace, &packet->buffer, pkt_size,1) != pkt_size) {
511                        printf("Error receiving packet\n");
512                        return -1;
513                }
514                packet->header = packet->buffer;
515               
516                if (rt_set_format(libtrace, packet) < 0) {
517                        return -1;
518                }
519                rt_set_payload(packet);
520                if (RT_INFO->reliable > 0) {
521                        if (rt_send_ack(libtrace, seqno) 
522                                        == -1)
523                        {
524                                return -1;
525                        }
526                }
527        } else {
528                switch(packet->type) {
529                        case RT_STATUS:
530                                if (rt_read(libtrace, &packet->buffer, 
531                                                        pkt_size,1) !=
532                                                pkt_size) {
533                                        printf("Error receiving status packet\n");
534                                        return -1;
535                                }
536                                packet->header = 0;
537                                packet->payload = packet->buffer;
538                                break;
539                        case RT_DUCK_2_4:
540                        case RT_DUCK_2_5:
541                                if (rt_read(libtrace, &packet->buffer,
542                                                        pkt_size, 1) !=
543                                                pkt_size) {
544                                        printf("Error receiving DUCK packet\n");
545                                        return -1;
546                                }
547                                if (rt_set_format(libtrace, packet) < 0) {
548                                        return -1;
549                                }
550                                packet->header = 0;
551                                packet->payload = packet->buffer;
552                                break;
553                        case RT_END_DATA:
554                                return 0;
555                        case RT_PAUSE_ACK:
556                                /* FIXME: Do something useful */
557                                break;
558                        case RT_OPTION:
559                                /* FIXME: Do something useful here as well */
560                                break;
561                        case RT_KEYCHANGE:
562                                break;
563                        default:
564                                printf("Bad rt type for client receipt: %d\n",
565                                        packet->type);
566                                return -1;
567                }
568        }
569        /* Return the number of bytes read from the stream */
570        return packet->size; 
571}
572
573static int rt_read_packet(libtrace_t *libtrace,
574                libtrace_packet_t *packet) {
575        return rt_read_packet_versatile(libtrace,packet,1);
576}
577
578
579static int rt_get_capture_length(const libtrace_packet_t *packet) {
580        switch (packet->type) {
581                case RT_STATUS:
582                        return sizeof(rt_status_t);
583                case RT_HELLO:
584                        return sizeof(rt_hello_t);
585                case RT_START:
586                        return 0;
587                case RT_ACK:
588                        return sizeof(rt_ack_t);
589                case RT_END_DATA:
590                        return 0;
591                case RT_CLOSE:
592                        return 0;
593                case RT_DENY_CONN:
594                        return sizeof(rt_deny_conn_t);
595                case RT_PAUSE:
596                        return 0; 
597                case RT_PAUSE_ACK:
598                        return 0;
599                case RT_OPTION:
600                        return 0; /* FIXME */
601                case RT_KEYCHANGE:
602                        return 0;
603        }
604        printf("Unknown type: %d\n", packet->type);
605        return 0;
606}
607
608static int rt_get_wire_length(const libtrace_packet_t *packet) {
609        return 0;
610}
611                       
612static int rt_get_framing_length(const libtrace_packet_t *packet) {
613        return 0;
614}
615
616static int rt_get_fd(const libtrace_t *trace) {
617        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
618}
619
620libtrace_eventobj_t trace_event_rt(libtrace_t *trace, libtrace_packet_t *packet) {
621        libtrace_eventobj_t event = {0,0,0.0,0};
622        libtrace_err_t read_err;
623
624        assert(trace);
625        assert(packet);
626       
627        if (trace->format->get_fd) {
628                event.fd = trace->format->get_fd(trace);
629        } else {
630                event.fd = 0;
631        }
632
633        event.size = rt_read_packet_versatile(trace, packet, 0);
634        if (event.size == -1) {
635                read_err = trace_get_err(trace);
636                if (read_err.err_num == EAGAIN) {
637                        event.type = TRACE_EVENT_IOWAIT;
638                }
639                else {
640                        printf("packet error\n");
641                        event.type = TRACE_EVENT_PACKET;
642                }
643        } else if (event.size == 0) {
644                event.type = TRACE_EVENT_TERMINATE;
645               
646        }       
647        else {
648                event.type = TRACE_EVENT_PACKET;
649        }
650       
651        return event;
652}
653
654static void rt_help() {
655        printf("rt format module\n");
656        printf("Supported input URIs:\n");
657        printf("\trt:hostname:port\n");
658        printf("\trt:hostname (connects on default port)\n");
659        printf("\n");
660        printf("\te.g.: rt:localhost\n");
661        printf("\te.g.: rt:localhost:32500\n");
662        printf("\n");
663
664}
665
666
667static struct libtrace_format_t rt = {
668        "rt",
669        "$Id$",
670        TRACE_FORMAT_RT,
671        rt_init_input,                  /* init_input */
672        NULL,                           /* config_input */
673        rt_start_input,                 /* start_input */
674        NULL,                           /* init_output */
675        NULL,                           /* config_output */
676        NULL,                           /* start_output */
677        NULL,                           /* pause_output */
678        rt_fin_input,                   /* fin_input */
679        NULL,                           /* fin_output */
680        rt_read_packet,                 /* read_packet */
681        NULL,                           /* fin_packet */
682        NULL,                           /* write_packet */
683        NULL,                           /* get_link_type */
684        NULL,                           /* get_direction */
685        NULL,                           /* set_direction */
686        NULL,                           /* get_erf_timestamp */
687        NULL,                           /* get_timeval */
688        NULL,                           /* get_seconds */
689        NULL,                           /* seek_erf */
690        NULL,                           /* seek_timeval */
691        NULL,                           /* seek_seconds */
692        rt_get_capture_length,          /* get_capture_length */
693        rt_get_wire_length,                     /* get_wire_length */
694        rt_get_framing_length,          /* get_framing_length */
695        NULL,                           /* set_capture_length */
696        rt_get_fd,                      /* get_fd */
697        trace_event_rt,             /* trace_event */
698        rt_help,                        /* help */
699        NULL                            /* next pointer */
700};
701
702void rt_constructor() {
703        register_format(&rt);
704}
Note: See TracBrowser for help on using the repository browser.