source: lib/format_rt.c @ 68155f1

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 68155f1 was 68155f1, checked in by Perry Lorier <perry@…>, 16 years ago

Libtrace does not need getopt!

  • Property mode set to 100644
File size: 17.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#ifdef HAVE_INTTYPES_H
43#  include <inttypes.h>
44#else
45#  error "Can't find inttypes.h - this needs to be fixed"
46#endif
47
48#ifdef HAVE_STDDEF_H
49#  include <stddef.h>
50#else
51# error "Can't find stddef.h - do you define ptrdiff_t elsewhere?"
52#endif
53#include <sys/types.h>
54#include <sys/socket.h>
55#include <sys/un.h>
56#include <sys/mman.h>
57#include <sys/stat.h>
58#include <unistd.h>
59#include <assert.h>
60#include <errno.h>
61#include <netdb.h>
62#include <fcntl.h>
63#include <stdio.h>
64#include <string.h>
65#include <stdlib.h>
66
67#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
68
69int reliability = 0;
70
71char *rt_deny_reason(uint8_t reason) {
72        char *string = 0;
73
74        switch(reason) {
75                case RT_DENY_WRAPPER:
76                        string = "Rejected by TCP Wrappers";
77                        break;
78                case RT_DENY_FULL:
79                        string = "Max connections reached on server";
80                        break;
81                case RT_DENY_AUTH:
82                        string = "Authentication failed";
83                        break;
84                default:
85                        string = "Unknown reason";
86        }
87
88        return string;
89}
90
91
92struct rt_format_data_t {
93        char *hostname;
94        int port;
95        int input_fd;
96        int reliable;
97        char *pkt_buffer;
98        char *buf_current;
99        int buf_left;
100
101       
102        struct libtrace_t *dummy_erf;
103        struct libtrace_t *dummy_pcap;
104        struct libtrace_t *dummy_wag;
105};
106
107static struct libtrace_format_t rt;
108
109static int rt_connect(struct libtrace_t *libtrace) {
110        struct hostent *he;
111        struct sockaddr_in remote;
112        rt_header_t connect_msg;
113        rt_deny_conn_t deny_hdr;       
114        rt_hello_t hello_opts;
115        uint8_t reason;
116        int oldflags;
117
118       
119        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
120                perror("gethostbyname");
121                return -1;
122        }
123        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
124                perror("socket");
125                return -1;
126        }
127
128        remote.sin_family = AF_INET;
129        remote.sin_port = htons(RT_INFO->port);
130        remote.sin_addr = *((struct in_addr *)he->h_addr);
131        bzero(&(remote.sin_zero), 8);
132
133        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
134                                sizeof(struct sockaddr)) == -1) {
135                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
136                                "Could not connect to host %s on port %d",
137                                RT_INFO->hostname, RT_INFO->port);
138                return -1;
139        }
140
141       
142#if 0
143        oldflags = fcntl(RT_INFO->input_fd, F_GETFL, 0);
144        if (oldflags == -1) {
145                trace_set_err(libtrace, errno,
146                                "Could not get fd flags from fd %d\n",
147                                RT_INFO->input_fd);
148                return -1;
149        }
150        oldflags |= O_NONBLOCK;
151        if (fcntl(RT_INFO->input_fd, F_SETFL, oldflags) == -1) {
152                trace_set_err(libtrace, errno,
153                                "Could not set fd flags for fd %d\n",
154                                RT_INFO->input_fd);
155                return -1;
156        }
157#endif
158       
159       
160        /* We are connected, now receive message from server */
161       
162        if (recv(RT_INFO->input_fd, &connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
163                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
164                                "Could not receive connection message from %s",
165                                RT_INFO->hostname);
166                return -1;
167        }
168       
169        switch (connect_msg.type) {
170                case RT_DENY_CONN:
171                       
172                        if (recv(RT_INFO->input_fd, &deny_hdr, 
173                                                sizeof(rt_deny_conn_t),
174                                                0) != sizeof(rt_deny_conn_t)) {
175                                reason = 0;
176                        }       
177                        reason = deny_hdr.reason;
178                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
179                                "Connection attempt is denied: %s",
180                                rt_deny_reason(reason));       
181                        return -1;
182                case RT_HELLO:
183                        /* do something with options */
184                        if (recv(RT_INFO->input_fd, &hello_opts, 
185                                                sizeof(rt_hello_t), 0)
186                                        != sizeof(rt_hello_t)) {
187                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
188                                        "Failed to receive RT_HELLO options");
189                                return -1;
190                        }
191                        reliability = hello_opts.reliable;
192                       
193                        return 0;
194                default:
195                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
196                                        "Unknown message type received: %d",
197                                        connect_msg.type);
198                        return -1;
199        }
200        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
201                        "Somehow you managed to reach this unreachable code");
202        return -1;
203}
204
205
206static int rt_init_input(struct libtrace_t *libtrace) {
207        char *scan;
208        char *uridata = libtrace->uridata;
209        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
210
211        RT_INFO->dummy_erf = NULL;
212        RT_INFO->dummy_pcap = NULL;
213        RT_INFO->dummy_wag = NULL;
214        RT_INFO->pkt_buffer = NULL;
215        RT_INFO->buf_current = NULL;
216        RT_INFO->buf_left = 0;
217       
218        if (strlen(uridata) == 0) {
219                RT_INFO->hostname =
220                        strdup("localhost");
221                RT_INFO->port =
222                        COLLECTOR_PORT;
223        } else {
224                if ((scan = strchr(uridata,':')) == NULL) {
225                        RT_INFO->hostname =
226                                strdup(uridata);
227                        RT_INFO->port =
228                                COLLECTOR_PORT;
229                } else {
230                        RT_INFO->hostname =
231                                (char *)strndup(uridata,
232                                                (scan - uridata));
233                        RT_INFO->port =
234                                atoi(++scan);
235                }
236        }
237
238        return rt_connect(libtrace);
239}
240       
241static int rt_start_input(struct libtrace_t *libtrace) {
242        rt_header_t start_msg;
243
244        start_msg.type = RT_START;
245        start_msg.length = sizeof(rt_start_t);
246
247       
248        /* Need to send start message to server */
249        if (send(RT_INFO->input_fd, &start_msg, sizeof(rt_header_t) +
250                                start_msg.length, 0) != sizeof(rt_header_t)) {
251                printf("Failed to send start message to server\n");
252                return -1;
253        }
254
255        return 0;
256}
257
258static int rt_fin_input(struct libtrace_t *libtrace) {
259        rt_header_t close_msg;
260
261        close_msg.type = RT_CLOSE;
262        close_msg.length = sizeof(rt_close_t);
263       
264        /* Send a close message to the server */
265        if (send(RT_INFO->input_fd, &close_msg, sizeof(rt_header_t) + 
266                                close_msg.length, 0) != sizeof(rt_header_t)
267                                + close_msg.length) {
268                printf("Failed to send close message to server\n");
269       
270        }
271        if (RT_INFO->dummy_erf) 
272                trace_destroy_dead(RT_INFO->dummy_erf);
273               
274        if (RT_INFO->dummy_pcap)
275                trace_destroy_dead(RT_INFO->dummy_pcap);
276
277        if (RT_INFO->dummy_wag)
278                trace_destroy_dead(RT_INFO->dummy_wag);
279        close(RT_INFO->input_fd);
280        free(libtrace->format_data);
281        return 0;
282}
283
284#define RT_BUF_SIZE 4000
285
286static int rt_read(struct libtrace_t *libtrace, void **buffer, size_t len, int block) {
287        int numbytes;
288        int i;
289        char *buf_ptr;
290
291        assert(len <= RT_BUF_SIZE);
292       
293        if (!RT_INFO->pkt_buffer) {
294                RT_INFO->pkt_buffer = malloc(RT_BUF_SIZE);
295                RT_INFO->buf_current = RT_INFO->pkt_buffer;
296                RT_INFO->buf_left = 0;
297        }
298
299        if (block)
300                block=0;
301        else
302                block=MSG_DONTWAIT;
303
304       
305        if (len > RT_INFO->buf_left) {
306                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
307                                RT_INFO->buf_left);
308                RT_INFO->buf_current = RT_INFO->pkt_buffer;
309
310#ifndef MSG_NOSIGNAL
311#  define MSG_NOSIGNAL 0
312#endif
313                while (len > RT_INFO->buf_left) {
314                        if ((numbytes = recv(RT_INFO->input_fd,
315                                                RT_INFO->pkt_buffer + 
316                                                RT_INFO->buf_left,
317                                                RT_BUF_SIZE-RT_INFO->buf_left,
318                                                MSG_NOSIGNAL|block)) <= 0) {
319                                if (numbytes == 0) {
320                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
321                                                        "No data received");
322                                        return -1;
323                                }
324                               
325                                if (errno == EINTR) {
326                                        /* ignore EINTR in case
327                                         * a caller is using signals
328                                         */
329                                        continue;
330                                }
331                                if (errno == EAGAIN) {
332                                        trace_set_err(libtrace,
333                                                        EAGAIN,
334                                                        "EAGAIN");
335                                        return -1;
336                                }
337                               
338                                perror("recv");
339                                trace_set_err(libtrace, TRACE_ERR_RECV_FAILED,
340                                                "Failed to read data into rt recv buffer");
341                                return -1;
342                        }
343                        /*
344                        buf_ptr = RT_INFO->pkt_buffer;
345                        for (i = 0; i < RT_BUF_SIZE ; i++) {
346                                       
347                                printf("%02x", (unsigned char)*buf_ptr);
348                                buf_ptr ++;
349                        }
350                        printf("\n");
351                        */
352                        RT_INFO->buf_left+=numbytes;
353                }
354
355        }
356        *buffer = RT_INFO->buf_current;
357        RT_INFO->buf_current += len;
358        RT_INFO->buf_left -= len;
359        assert(RT_INFO->buf_left >= 0);
360        return len;
361}
362
363
364static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
365{
366       
367        if (packet->type >= RT_DATA_PCAP) {
368                if (!RT_INFO->dummy_pcap) {
369                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
370                }
371                packet->trace = RT_INFO->dummy_pcap;
372                return 0;       
373        }
374
375        switch (packet->type) {
376                case RT_DATA_ERF:
377                        if (!RT_INFO->dummy_erf) {
378                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
379                        }
380                        packet->trace = RT_INFO->dummy_erf;
381                        break;
382                case RT_DATA_WAG:
383                        if (!RT_INFO->dummy_wag) {
384                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
385                        }
386                        packet->trace = RT_INFO->dummy_wag;
387                        break;
388                case RT_DATA_LEGACY_ETH:
389                case RT_DATA_LEGACY_ATM:
390                case RT_DATA_LEGACY_POS:
391                        printf("Sending legacy over RT is currently not supported\n");
392                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
393                        return -1;
394                default:
395                        printf("Unrecognised format: %d\n", packet->type);
396                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
397                        return -1;
398        }
399        return 0; /* success */
400}               
401
402static void rt_set_payload(struct libtrace_packet_t *packet) {
403        dag_record_t *erfptr;
404       
405        switch (packet->type) {
406                case RT_DATA_ERF:
407                        erfptr = (dag_record_t *)packet->header;
408                       
409                        if (erfptr->flags.rxerror == 1) {
410                                packet->payload = NULL;
411                                break;
412                        }
413                        /* else drop into the default case */
414                default:
415                        packet->payload = (char *)packet->buffer +
416                                trace_get_framing_length(packet);
417                        break;
418        }
419}
420
421static int rt_send_ack(struct libtrace_t *libtrace, 
422                uint32_t seqno)  {
423       
424        static char *ack_buffer = 0;
425        char *buf_ptr;
426        int numbytes = 0;
427        int to_write = 0;
428        rt_header_t *hdr;
429        rt_ack_t *ack_hdr;
430       
431        if (!ack_buffer) {
432                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
433        }
434       
435        hdr = (rt_header_t *) ack_buffer;
436        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
437       
438        hdr->type = RT_ACK;
439        hdr->length = sizeof(rt_ack_t);
440
441        ack_hdr->sequence = seqno;
442       
443        to_write = hdr->length + sizeof(rt_header_t);
444        buf_ptr = ack_buffer;
445
446       
447        while (to_write > 0) {
448                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
449                if (numbytes == -1) {
450                        if (errno == EINTR || errno == EAGAIN) {
451                                continue;
452                        }
453                        else {
454                                printf("Error sending ack\n");
455                                trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, 
456                                                "Error sending ack");
457                                return -1;
458                        }
459                }
460                to_write = to_write - numbytes;
461                buf_ptr = buf_ptr + to_write;
462               
463        }
464
465        return 1;
466}
467
468       
469static int rt_read_packet_versatile(libtrace_t *libtrace,
470                libtrace_packet_t *packet,int blocking) {
471        rt_header_t rt_hdr;
472        rt_header_t *pkt_hdr = &rt_hdr;
473        int pkt_size = 0;
474       
475       
476        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
477                packet->buf_control = TRACE_CTRL_PACKET;
478                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
479        } 
480
481
482        /* FIXME: Better error handling required */
483        if (rt_read(libtrace, (void **)&pkt_hdr, sizeof(rt_header_t),blocking) !=
484                        sizeof(rt_header_t)) {
485                return -1;
486        }
487
488        packet->type = pkt_hdr->type;
489        pkt_size = pkt_hdr->length;
490        packet->size = pkt_hdr->length;
491
492        if (packet->type >= RT_DATA_SIMPLE) {
493                if (rt_read(libtrace, &packet->buffer, pkt_size,1) != pkt_size) {
494                        printf("Error receiving packet\n");
495                        return -1;
496                }
497                packet->header = packet->buffer;
498               
499                if (rt_set_format(libtrace, packet) < 0) {
500                        return -1;
501                }
502                rt_set_payload(packet);
503
504                if (reliability > 0) {
505                       
506                        if (rt_send_ack(libtrace, pkt_hdr->sequence) 
507                                        == -1)
508                        {
509                                return -1;
510                        }
511                }
512        } else {
513                switch(packet->type) {
514                        case RT_STATUS:
515                        case RT_DUCK:
516                                if (rt_read(libtrace, &packet->buffer, 
517                                                        pkt_size,1) !=
518                                                pkt_size) {
519                                        printf("Error receiving status packet\n");
520                                        return -1;
521                                }
522                                packet->header = 0;
523                                packet->payload = packet->buffer;
524                                break;
525                        case RT_END_DATA:
526                                return 0;
527                        case RT_PAUSE_ACK:
528                                /* FIXME: Do something useful */
529                                break;
530                        case RT_OPTION:
531                                /* FIXME: Do something useful here as well */
532                                break;
533                        case RT_KEYCHANGE:
534                                break;
535                        default:
536                                printf("Bad rt type for client receipt: %d\n",
537                                        pkt_hdr->type);
538                }
539        }
540        /* Return the number of bytes read from the stream */
541        return packet->size; 
542}
543
544static int rt_read_packet(libtrace_t *libtrace,
545                libtrace_packet_t *packet) {
546        return rt_read_packet_versatile(libtrace,packet,1);
547}
548
549
550static int rt_get_capture_length(const struct libtrace_packet_t *packet) {
551        switch (packet->type) {
552                case RT_DUCK:
553                        return sizeof(rt_duck_t);
554                case RT_STATUS:
555                        return sizeof(rt_status_t);
556                case RT_HELLO:
557                        return sizeof(rt_hello_t);
558                case RT_START:
559                        return sizeof(rt_start_t);
560                case RT_ACK:
561                        return sizeof(rt_ack_t);
562                case RT_END_DATA:
563                        return sizeof(rt_end_data_t);
564                case RT_CLOSE:
565                        return sizeof(rt_close_t);
566                case RT_DENY_CONN:
567                        return sizeof(rt_deny_conn_t);
568                case RT_PAUSE:
569                        return sizeof(rt_pause_t);
570                case RT_PAUSE_ACK:
571                        return sizeof(rt_pause_ack_t);
572                case RT_OPTION:
573                        return sizeof(rt_option_t);
574                case RT_KEYCHANGE:
575                        return sizeof(rt_keychange_t);
576        }
577        printf("Unknown type: %d\n", packet->type);
578        return 0;
579}
580
581static int rt_get_wire_length(const libtrace_packet_t *packet) {
582        return 0;
583}
584                       
585static int rt_get_framing_length(const libtrace_packet_t *packet) {
586        return 0;
587}
588
589static int rt_get_fd(const libtrace_t *trace) {
590        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
591}
592
593struct libtrace_eventobj_t trace_event_rt(struct libtrace_t *trace, struct libtrace_packet_t *packet) {
594        struct libtrace_eventobj_t event = {0,0,0.0,0};
595        libtrace_err_t read_err;
596        int data;
597
598        assert(trace);
599        assert(packet);
600       
601        if (trace->format->get_fd) {
602                event.fd = trace->format->get_fd(trace);
603        } else {
604                event.fd = 0;
605        }
606
607        event.size = rt_read_packet_versatile(trace, packet, 0);
608        if (event.size == -1) {
609                read_err = trace_get_err(trace);
610                if (read_err.err_num == EAGAIN) {
611                        event.type = TRACE_EVENT_IOWAIT;
612                }
613                else {
614                        printf("packet error\n");
615                        event.type = TRACE_EVENT_PACKET;
616                }
617        } else if (event.size == 0) {
618                event.type = TRACE_EVENT_TERMINATE;
619               
620        }       
621        else {
622                event.type = TRACE_EVENT_PACKET;
623        }
624       
625        return event;
626}
627
628static void rt_help() {
629        printf("rt format module\n");
630        printf("Supported input URIs:\n");
631        printf("\trt:hostname:port\n");
632        printf("\trt:hostname (connects on default port)\n");
633        printf("\n");
634        printf("\te.g.: rt:localhost\n");
635        printf("\te.g.: rt:localhost:32500\n");
636        printf("\n");
637
638}
639
640
641static struct libtrace_format_t rt = {
642        "rt",
643        "$Id$",
644        TRACE_FORMAT_RT,
645        rt_init_input,                  /* init_input */
646        NULL,                           /* config_input */
647        rt_start_input,                 /* start_input */
648        NULL,                           /* init_output */
649        NULL,                           /* config_output */
650        NULL,                           /* start_output */
651        NULL,                           /* pause_output */
652        rt_fin_input,                   /* fin_input */
653        NULL,                           /* fin_output */
654        rt_read_packet,                 /* read_packet */
655        NULL,                           /* fin_packet */
656        NULL,                           /* write_packet */
657        NULL,                           /* get_link_type */
658        NULL,                           /* get_direction */
659        NULL,                           /* set_direction */
660        NULL,                           /* get_erf_timestamp */
661        NULL,                           /* get_timeval */
662        NULL,                           /* get_seconds */
663        NULL,                           /* seek_erf */
664        NULL,                           /* seek_timeval */
665        NULL,                           /* seek_seconds */
666        rt_get_capture_length,          /* get_capture_length */
667        rt_get_wire_length,                     /* get_wire_length */
668        rt_get_framing_length,          /* get_framing_length */
669        NULL,                           /* set_capture_length */
670        rt_get_fd,                      /* get_fd */
671        trace_event_rt,             /* trace_event */
672        rt_help,                        /* help */
673        NULL                            /* next pointer */
674};
675
676void __attribute__((constructor)) rt_constructor() {
677        register_format(&rt);
678}
Note: See TracBrowser for help on using the repository browser.