source: lib/format_rt.c @ 6dbc47a

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 6dbc47a was 6dbc47a, checked in by Shane Alcock <salcock@…>, 15 years ago

Updated all formats to use TRACE_FORMAT_* instead of nasty strings
format_rt is in a state where it provides basic client services
Added trace_get_format() function
libtrace_packet_t has a type field for storing format now

  • Property mode set to 100644
File size: 14.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#ifdef HAVE_INTTYPES_H
43#  include <inttypes.h>
44#else
45#  error "Can't find inttypes.h - this needs to be fixed"
46#endif
47
48#ifdef HAVE_STDDEF_H
49#  include <stddef.h>
50#else
51# error "Can't find stddef.h - do you define ptrdiff_t elsewhere?"
52#endif
53#include <sys/types.h>
54#include <sys/socket.h>
55#include <sys/un.h>
56#include <sys/mman.h>
57#include <sys/stat.h>
58#include <unistd.h>
59#include <assert.h>
60#include <errno.h>
61#include <netdb.h>
62#include <fcntl.h>
63#include <getopt.h>
64#include <stdio.h>
65#include <string.h>
66#include <stdlib.h>
67
68#define RT_INFO libtrace->format_data
69
70int reliability = 0;
71
72char *rt_deny_reason(uint8_t reason) {
73        char *string = 0;
74
75        switch(reason) {
76                case RT_DENY_WRAPPER:
77                        string = "Rejected by TCP Wrappers";
78                        break;
79                case RT_DENY_FULL:
80                        string = "Max connections reached on server";
81                        break;
82                case RT_DENY_AUTH:
83                        string = "Authentication failed";
84                        break;
85                default:
86                        string = "Unknown reason";
87        }
88
89        return string;
90}
91
92
93struct libtrace_format_data_t {
94        char *hostname;
95        int port;
96        int input_fd;
97        int reliable;
98
99        struct libtrace_t *dummy_erf;
100        struct libtrace_t *dummy_pcap;
101        struct libtrace_t *dummy_wag;
102};
103
104static struct libtrace_format_t rt;
105
106static int rt_connect(struct libtrace_t *libtrace) {
107        struct hostent *he;
108        struct sockaddr_in remote;
109        rt_header_t connect_msg;
110        rt_deny_conn_t deny_hdr;       
111        rt_hello_t hello_opts;
112        uint8_t reason;
113       
114        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
115                perror("gethostbyname");
116                return 0;
117        }
118        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
119                perror("socket");
120                return 0;
121        }
122
123        remote.sin_family = AF_INET;
124        remote.sin_port = htons(RT_INFO->port);
125        remote.sin_addr = *((struct in_addr *)he->h_addr);
126        bzero(&(remote.sin_zero), 8);
127
128        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
129                                sizeof(struct sockaddr)) == -1) {
130                perror("connect (inet)");
131                return 0;
132        }
133       
134        /* We are connected, now receive message from server */
135       
136        if (recv(RT_INFO->input_fd, &connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
137                printf("An error occured while connecting to %s\n", RT_INFO->hostname);
138                return 0;
139        }
140
141        switch (connect_msg.type) {
142                case RT_DENY_CONN:
143                       
144                        if (recv(RT_INFO->input_fd, &deny_hdr, 
145                                                sizeof(rt_deny_conn_t),
146                                                0) != sizeof(rt_deny_conn_t)) {
147                                reason = 0;
148                        }       
149                        reason = deny_hdr.reason;
150                        printf("Connection attempt is denied by the server: %s\n",
151                                        rt_deny_reason(reason));
152                        return 0;
153                case RT_HELLO:
154                        // do something with options
155                        printf("Hello\n");     
156                        if (recv(RT_INFO->input_fd, &hello_opts, 
157                                                sizeof(rt_hello_t), 0)
158                                        != sizeof(rt_hello_t)) {
159                                printf("Failed to read hello options\n");
160                                return 0;
161                        }
162                        reliability = hello_opts.reliable;
163                       
164                        return 1;
165                case RT_DATA:
166                        printf("Server needs to send RT_HELLO before sending data to clients\n");
167                        return 0;
168                default:
169                        printf("Unexpected message type: %d\n", connect_msg.type);
170                        return 0;
171        }
172       
173        return 0;
174}
175
176
177static int rt_init_input(struct libtrace_t *libtrace) {
178        char *scan;
179        char *uridata = libtrace->uridata;
180        libtrace->format_data = (struct libtrace_format_data_t *)
181                malloc(sizeof(struct libtrace_format_data_t));
182
183        RT_INFO->dummy_erf = NULL;
184        RT_INFO->dummy_pcap = NULL;
185        RT_INFO->dummy_wag = NULL;
186       
187        if (strlen(uridata) == 0) {
188                RT_INFO->hostname =
189                        strdup("localhost");
190                RT_INFO->port =
191                        COLLECTOR_PORT;
192        } else {
193                if ((scan = strchr(uridata,':')) == NULL) {
194                        RT_INFO->hostname =
195                                strdup(uridata);
196                        RT_INFO->port =
197                                COLLECTOR_PORT;
198                } else {
199                        RT_INFO->hostname =
200                                (char *)strndup(uridata,
201                                                (scan - uridata));
202                        RT_INFO->port =
203                                atoi(++scan);
204                }
205        }
206
207        return rt_connect(libtrace);
208}
209       
210static int rt_start_input(struct libtrace_t *libtrace) {
211        rt_header_t start_msg;
212
213        start_msg.type = RT_START;
214        start_msg.length = sizeof(rt_start_t);
215
216        printf("Sending start - len: %d\n", start_msg.length);
217       
218        // Need to send start message to server
219        if (send(RT_INFO->input_fd, &start_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t)) {
220                printf("Failed to send start message to server\n");
221                return -1;
222        }
223
224        return 0;
225}
226
227static int rt_fin_input(struct libtrace_t *libtrace) {
228        rt_header_t close_msg;
229
230        close_msg.type = RT_CLOSE;
231        close_msg.length = sizeof(rt_close_t);
232       
233        // Send a close message to the server
234        if (send(RT_INFO->input_fd, &close_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t)) {
235                printf("Failed to send close message to server\n");
236       
237        }
238        if (RT_INFO->dummy_erf) 
239                trace_destroy_dead(RT_INFO->dummy_erf);
240               
241        if (RT_INFO->dummy_pcap)
242                trace_destroy_dead(RT_INFO->dummy_pcap);
243
244        if (RT_INFO->dummy_wag)
245                trace_destroy_dead(RT_INFO->dummy_wag);
246        close(RT_INFO->input_fd);
247        return 0;
248}
249
250static int rt_read(struct libtrace_t *libtrace, void *buffer, size_t len) {
251        int numbytes;
252
253        if (buffer == 0)
254                buffer = malloc(len);
255        while(1) {
256#ifndef MSG_NOSIGNAL
257#  define MSG_NOSIGNAL 0
258#endif
259                if ((numbytes = recv(RT_INFO->input_fd,
260                                                buffer,
261                                                len,
262                                                MSG_NOSIGNAL)) == -1) {
263                        if (errno == EINTR) {
264                                //ignore EINTR in case
265                                // a caller is using signals
266                                continue;
267                        }
268                        perror("recv");
269                        return -1;
270                }
271                break;
272
273        }
274        return numbytes;
275}
276
277
278static int rt_set_format(struct libtrace_t *libtrace, 
279                struct libtrace_packet_t *packet, uint16_t format) {
280        switch (format) {
281                case TRACE_FORMAT_ERF:
282                        if (!RT_INFO->dummy_erf) {
283                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
284                        }
285                        packet->trace = RT_INFO->dummy_erf;
286                        break;
287                case TRACE_FORMAT_PCAP:
288                        if (!RT_INFO->dummy_pcap) {
289                                RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
290                        }
291                        packet->trace = RT_INFO->dummy_pcap;
292                        break;
293                case TRACE_FORMAT_WAG:
294                        if (!RT_INFO->dummy_wag) {
295                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
296                        }
297                        packet->trace = RT_INFO->dummy_wag;
298                        break;
299                case TRACE_FORMAT_RT:
300                        printf("Format in a data packet should NOT be set to RT\n");
301                        return -1;
302                case TRACE_FORMAT_LEGACY:
303                        printf("Sending legacy over RT is currently not supported\n");
304                        return -1;
305                default:
306                        printf("Unrecognised format: %d\n", format);
307                        return -1;
308        }
309        return 1;
310}               
311
312static void rt_set_payload(struct libtrace_packet_t *packet, uint16_t format) {
313        dag_record_t *erfptr;
314       
315        switch (format) {
316                case TRACE_FORMAT_ERF:
317                        erfptr = (dag_record_t *)packet->header;
318                       
319                        if (erfptr->flags.rxerror == 1) {
320                                packet->payload = NULL;
321                        } else {
322                                packet->payload = (char *)packet->buffer
323                                        + trace_get_framing_length(packet);
324                        }
325                        break;
326                default:
327                        packet->payload = (char *)packet->buffer +
328                                trace_get_framing_length(packet);
329                        break;
330        }
331}
332
333static int rt_send_ack(struct libtrace_t *libtrace, 
334                struct libtrace_packet_t *packet, uint32_t seqno)  {
335       
336        static char *ack_buffer = 0;
337        char *buf_ptr;
338        int numbytes = 0;
339        int to_write = 0;
340        rt_header_t *hdr;
341        rt_ack_t *ack_hdr;
342       
343        if (!ack_buffer) {
344                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
345        }
346       
347        hdr = (rt_header_t *) ack_buffer;
348        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
349       
350        hdr->type = RT_ACK;
351        hdr->length = sizeof(rt_ack_t);
352
353        ack_hdr->sequence = seqno;
354       
355        to_write = hdr->length + sizeof(rt_header_t);
356        buf_ptr = ack_buffer;
357
358       
359        while (to_write > 0) {
360                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
361                if (numbytes == -1) {
362                        if (errno == EINTR || errno == EAGAIN) {
363                                continue;
364                        }
365                        else {
366                                printf("Error sending ack\n");
367                                return -1;
368                        }
369                }
370                to_write = to_write - numbytes;
371                buf_ptr = buf_ptr + to_write;
372               
373        }
374
375        return 1;
376}
377       
378static int rt_read_packet(struct libtrace_t *libtrace, 
379                struct libtrace_packet_t *packet) {
380       
381        int numbytes = 0;
382        char buf[RP_BUFSIZE];
383        int read_required = 0;
384        rt_header_t pkt_hdr;
385        char msg_buf[RP_BUFSIZE];
386
387        rt_data_t data_hdr;
388        void *buffer = 0;
389
390        packet->trace = libtrace;
391
392        if (packet->buf_control == EXTERNAL) {
393                packet->buf_control = PACKET;
394                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
395        } else {
396                if (!packet->buffer)
397                        packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
398        }
399
400        buffer = packet->buffer;
401        packet->header = packet->buffer;
402
403
404        do {
405                if (tracefifo_out_available(libtrace->fifo) == 0 || read_required) {
406                        if ((numbytes = rt_read(
407                                        libtrace,buf,RP_BUFSIZE))<=0) {
408                                return numbytes;
409                        }
410                        tracefifo_write(libtrace->fifo,buf,numbytes);
411                        read_required = 0;
412                }
413                // Read rt header
414                if (tracefifo_out_read(libtrace->fifo,
415                                &pkt_hdr, sizeof(rt_header_t)) == 0) {
416                        read_required = 1;
417                        continue;
418                }
419                tracefifo_out_update(libtrace->fifo, sizeof(rt_header_t));
420               
421                packet->size = pkt_hdr.length + sizeof(rt_header_t);
422                packet->type = pkt_hdr.type;
423               
424                switch (packet->type) {
425                        case RT_DATA:
426                                if (tracefifo_out_read(libtrace->fifo, 
427                                                        &data_hdr, 
428                                                        sizeof(rt_data_t)) == 0) 
429                                {
430                                        tracefifo_out_reset(libtrace->fifo);
431                                        read_required = 1;
432                                        break;
433                                }
434                                if (tracefifo_out_read(libtrace->fifo, buffer, 
435                                                        packet->size - sizeof(rt_data_t) - sizeof(rt_header_t))== 0)
436                                {
437                                        tracefifo_out_reset(libtrace->fifo);
438                                        read_required = 1;
439                                        break;
440                                }
441                                // set packet->trace
442                                if (rt_set_format(libtrace, packet, 
443                                                        data_hdr.format) < 0) {
444                                        return -1;
445                                }
446                                // set packet->payload
447                                rt_set_payload(packet, data_hdr.format);
448                               
449                                if (reliability > 0) {
450                                        // send ack
451                                        if (rt_send_ack(libtrace, packet,
452                                                                data_hdr.sequence) 
453                                                        == -1)
454                                        {
455                                                return -1;
456                                        }
457                                }       
458                                break;
459                        case RT_STATUS:
460                                if (tracefifo_out_read(libtrace->fifo, buffer,
461                                                sizeof(rt_status_t)) == 0)
462                                {
463                                        tracefifo_out_reset(libtrace->fifo);
464                                        read_required = 1;
465                                        break;
466                                }
467                                break;
468                        case RT_DUCK:
469                                if (tracefifo_out_read(libtrace->fifo, buffer,
470                                                sizeof(rt_duck_t)) == 0)
471                                {
472                                        tracefifo_out_reset(libtrace->fifo);
473                                        read_required = 1;
474                                        break;
475                                }
476                                break;
477
478                        case RT_END_DATA:
479                                // need to do something sensible here
480                                return 0;       
481
482                        case RT_PAUSE_ACK:
483                                // Check if we asked for a pause
484                               
485                               
486                                break;
487
488                        case RT_OPTION:
489                                // Server is requesting some option?
490
491                                break;
492
493                        default:
494                                printf("Bad rt client type: %d\n", packet->type);
495                                return -1;
496                               
497                }
498                if (read_required)
499                        continue;
500                               
501               
502                // got in our whole packet, so...
503                tracefifo_out_update(libtrace->fifo,packet->size - sizeof(rt_header_t));
504
505                tracefifo_ack_update(libtrace->fifo,packet->size);
506                return 1;
507        } while(1);
508       
509}
510
511static int rt_get_fd(const struct libtrace_packet_t *packet) {
512        return packet->trace->format_data->input_fd;
513}
514
515
516
517static void rt_help() {
518        printf("rt format module\n");
519        printf("Supported input URIs:\n");
520        printf("\trt:hostname:port\n");
521        printf("\trt:hostname (connects on default port)\n");
522        printf("\n");
523        printf("\te.g.: rt:localhost\n");
524        printf("\te.g.: rt:localhost:32500\n");
525        printf("\n");
526
527}
528
529
530static struct libtrace_format_t rt = {
531        "rt",
532        "$Id$",
533        TRACE_FORMAT_RT,
534        rt_init_input,                  /* init_input */
535        NULL,                           /* config_input */
536        rt_start_input,                 /* start_input */
537        NULL,                           /* init_output */
538        NULL,                           /* config_output */
539        NULL,                           /* start_output */
540        NULL,                           /* pause_output */
541        rt_fin_input,                   /* fin_input */
542        NULL,                           /* fin_output */
543        rt_read_packet,                 /* read_packet */
544        NULL,                           /* write_packet */
545        NULL,                           /* get_link_type */
546        NULL,                           /* get_direction */
547        NULL,                           /* set_direction */
548        NULL,                           /* get_erf_timestamp */
549        NULL,                           /* get_timeval */
550        NULL,                           /* get_seconds */
551        NULL,                           /* seek_erf */
552        NULL,                           /* seek_timeval */
553        NULL,                           /* seek_seconds */
554        NULL,                           /* get_capture_length */
555        NULL,                           /* get_wire_length */
556        NULL,                           /* get_framing_length */
557        NULL,                           /* set_capture_length */
558        rt_get_fd,                /* get_fd */
559        trace_event_device,             /* trace_event */
560        rt_help                   /* help */
561};
562
563void __attribute__((constructor)) rt_constructor() {
564        register_format(&rt);
565}
Note: See TracBrowser for help on using the repository browser.