source: lib/format_rt.c @ afd0b73

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since afd0b73 was afd0b73, checked in by Shane Alcock <salcock@…>, 15 years ago

Changed rt protocol to not require subheaders

  • Property mode set to 100644
File size: 13.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#ifdef HAVE_INTTYPES_H
43#  include <inttypes.h>
44#else
45#  error "Can't find inttypes.h - this needs to be fixed"
46#endif
47
48#ifdef HAVE_STDDEF_H
49#  include <stddef.h>
50#else
51# error "Can't find stddef.h - do you define ptrdiff_t elsewhere?"
52#endif
53#include <sys/types.h>
54#include <sys/socket.h>
55#include <sys/un.h>
56#include <sys/mman.h>
57#include <sys/stat.h>
58#include <unistd.h>
59#include <assert.h>
60#include <errno.h>
61#include <netdb.h>
62#include <fcntl.h>
63#include <getopt.h>
64#include <stdio.h>
65#include <string.h>
66#include <stdlib.h>
67
68#define RT_INFO libtrace->format_data
69
70int reliability = 0;
71
72char *rt_deny_reason(uint8_t reason) {
73        char *string = 0;
74
75        switch(reason) {
76                case RT_DENY_WRAPPER:
77                        string = "Rejected by TCP Wrappers";
78                        break;
79                case RT_DENY_FULL:
80                        string = "Max connections reached on server";
81                        break;
82                case RT_DENY_AUTH:
83                        string = "Authentication failed";
84                        break;
85                default:
86                        string = "Unknown reason";
87        }
88
89        return string;
90}
91
92
93struct libtrace_format_data_t {
94        char *hostname;
95        int port;
96        int input_fd;
97        int reliable;
98
99        struct libtrace_t *dummy_erf;
100        struct libtrace_t *dummy_pcap;
101        struct libtrace_t *dummy_wag;
102};
103
104static struct libtrace_format_t rt;
105
106static int rt_connect(struct libtrace_t *libtrace) {
107        struct hostent *he;
108        struct sockaddr_in remote;
109        rt_header_t connect_msg;
110        rt_deny_conn_t deny_hdr;       
111        rt_hello_t hello_opts;
112        uint8_t reason;
113       
114        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
115                perror("gethostbyname");
116                return 0;
117        }
118        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
119                perror("socket");
120                return 0;
121        }
122
123        remote.sin_family = AF_INET;
124        remote.sin_port = htons(RT_INFO->port);
125        remote.sin_addr = *((struct in_addr *)he->h_addr);
126        bzero(&(remote.sin_zero), 8);
127
128        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
129                                sizeof(struct sockaddr)) == -1) {
130                perror("connect (inet)");
131                return 0;
132        }
133       
134        /* We are connected, now receive message from server */
135       
136        if (recv(RT_INFO->input_fd, &connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
137                printf("An error occured while connecting to %s\n", RT_INFO->hostname);
138                return 0;
139        }
140
141        switch (connect_msg.type) {
142                case RT_DENY_CONN:
143                       
144                        if (recv(RT_INFO->input_fd, &deny_hdr, 
145                                                sizeof(rt_deny_conn_t),
146                                                0) != sizeof(rt_deny_conn_t)) {
147                                reason = 0;
148                        }       
149                        reason = deny_hdr.reason;
150                        printf("Connection attempt is denied by the server: %s\n",
151                                        rt_deny_reason(reason));
152                        return 0;
153                case RT_HELLO:
154                        /* do something with options */
155                        printf("Hello\n");     
156                        if (recv(RT_INFO->input_fd, &hello_opts, 
157                                                sizeof(rt_hello_t), 0)
158                                        != sizeof(rt_hello_t)) {
159                                printf("Failed to read hello options\n");
160                                return 0;
161                        }
162                        reliability = hello_opts.reliable;
163                       
164                        return 1;
165                default:
166                        printf("Unexpected message type: %d\n", connect_msg.type);
167                        return 0;
168        }
169       
170        return 0;
171}
172
173
174static int rt_init_input(struct libtrace_t *libtrace) {
175        char *scan;
176        char *uridata = libtrace->uridata;
177        libtrace->format_data = (struct libtrace_format_data_t *)
178                malloc(sizeof(struct libtrace_format_data_t));
179
180        RT_INFO->dummy_erf = NULL;
181        RT_INFO->dummy_pcap = NULL;
182        RT_INFO->dummy_wag = NULL;
183       
184        if (strlen(uridata) == 0) {
185                RT_INFO->hostname =
186                        strdup("localhost");
187                RT_INFO->port =
188                        COLLECTOR_PORT;
189        } else {
190                if ((scan = strchr(uridata,':')) == NULL) {
191                        RT_INFO->hostname =
192                                strdup(uridata);
193                        RT_INFO->port =
194                                COLLECTOR_PORT;
195                } else {
196                        RT_INFO->hostname =
197                                (char *)strndup(uridata,
198                                                (scan - uridata));
199                        RT_INFO->port =
200                                atoi(++scan);
201                }
202        }
203
204        return rt_connect(libtrace);
205}
206       
207static int rt_start_input(struct libtrace_t *libtrace) {
208        rt_header_t start_msg;
209
210        start_msg.type = RT_START;
211        start_msg.length = sizeof(rt_start_t);
212
213        printf("Sending start - len: %d\n", start_msg.length);
214       
215        /* Need to send start message to server */
216        if (send(RT_INFO->input_fd, &start_msg, sizeof(rt_header_t) +
217                                start_msg.length, 0) != sizeof(rt_header_t)) {
218                printf("Failed to send start message to server\n");
219                return -1;
220        }
221
222        return 0;
223}
224
225static int rt_fin_input(struct libtrace_t *libtrace) {
226        rt_header_t close_msg;
227
228        close_msg.type = RT_CLOSE;
229        close_msg.length = sizeof(rt_close_t);
230       
231        /* Send a close message to the server */
232        if (send(RT_INFO->input_fd, &close_msg, sizeof(rt_header_t) + 
233                                close_msg.length, 0) != sizeof(rt_header_t)) {
234                printf("Failed to send close message to server\n");
235       
236        }
237        if (RT_INFO->dummy_erf) 
238                trace_destroy_dead(RT_INFO->dummy_erf);
239               
240        if (RT_INFO->dummy_pcap)
241                trace_destroy_dead(RT_INFO->dummy_pcap);
242
243        if (RT_INFO->dummy_wag)
244                trace_destroy_dead(RT_INFO->dummy_wag);
245        close(RT_INFO->input_fd);
246        free(libtrace->format_data);
247        return 0;
248}
249
250static int rt_read(struct libtrace_t *libtrace, void *buffer, size_t len) {
251        int numbytes;
252
253        while(1) {
254#ifndef MSG_NOSIGNAL
255#  define MSG_NOSIGNAL 0
256#endif
257                if ((numbytes = recv(RT_INFO->input_fd,
258                                                buffer,
259                                                len,
260                                                MSG_NOSIGNAL)) == -1) {
261                        if (errno == EINTR) {
262                                /* ignore EINTR in case
263                                 * a caller is using signals
264                                 */
265                                continue;
266                        }
267                        perror("recv");
268                        return -1;
269                }
270                break;
271
272        }
273        return numbytes;
274}
275
276
277static int rt_set_format(struct libtrace_t *libtrace, 
278                struct libtrace_packet_t *packet) {
279        switch (packet->type) {
280                case RT_DATA_ERF:
281                        if (!RT_INFO->dummy_erf) {
282                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
283                        }
284                        packet->trace = RT_INFO->dummy_erf;
285                        break;
286                case RT_DATA_PCAP:
287                        if (!RT_INFO->dummy_pcap) {
288                                RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
289                        }
290                        packet->trace = RT_INFO->dummy_pcap;
291                        break;
292                case RT_DATA_WAG:
293                        if (!RT_INFO->dummy_wag) {
294                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
295                        }
296                        packet->trace = RT_INFO->dummy_wag;
297                        break;
298                case RT_DATA_LEGACY_ETH:
299                case RT_DATA_LEGACY_ATM:
300                case RT_DATA_LEGACY_POS:
301                        printf("Sending legacy over RT is currently not supported\n");
302                        return -1;
303                default:
304                        printf("Unrecognised format: %d\n", packet->type);
305                        return -1;
306        }
307        return 1;
308}               
309
310static void rt_set_payload(struct libtrace_packet_t *packet) {
311        dag_record_t *erfptr;
312       
313        switch (packet->type) {
314                case RT_DATA_ERF:
315                        erfptr = (dag_record_t *)packet->header;
316                       
317                        if (erfptr->flags.rxerror == 1) {
318                                packet->payload = NULL;
319                                break;
320                        }
321                        /* else drop into the default case */
322                default:
323                        packet->payload = (char *)packet->buffer +
324                                trace_get_framing_length(packet);
325                        break;
326        }
327}
328
329static int rt_send_ack(struct libtrace_t *libtrace, 
330                struct libtrace_packet_t *packet, uint32_t seqno)  {
331       
332        static char *ack_buffer = 0;
333        char *buf_ptr;
334        int numbytes = 0;
335        int to_write = 0;
336        rt_header_t *hdr;
337        rt_ack_t *ack_hdr;
338       
339        if (!ack_buffer) {
340                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
341        }
342       
343        hdr = (rt_header_t *) ack_buffer;
344        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
345       
346        hdr->type = RT_ACK;
347        hdr->length = sizeof(rt_ack_t);
348
349        ack_hdr->sequence = seqno;
350       
351        to_write = hdr->length + sizeof(rt_header_t);
352        buf_ptr = ack_buffer;
353
354       
355        while (to_write > 0) {
356                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
357                if (numbytes == -1) {
358                        if (errno == EINTR || errno == EAGAIN) {
359                                continue;
360                        }
361                        else {
362                                printf("Error sending ack\n");
363                                return -1;
364                        }
365                }
366                to_write = to_write - numbytes;
367                buf_ptr = buf_ptr + to_write;
368               
369        }
370
371        return 1;
372}
373       
374static int rt_read_packet(struct libtrace_t *libtrace, 
375                struct libtrace_packet_t *packet) {
376       
377        char buf[RP_BUFSIZE];
378        int read_required = 0;
379        rt_header_t pkt_hdr;
380        char msg_buf[RP_BUFSIZE];
381        int pkt_size = 0;
382       
383        void *buffer = 0;
384
385        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
386                packet->buf_control = TRACE_CTRL_PACKET;
387                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
388        } 
389
390        buffer = packet->buffer;
391        packet->header = packet->buffer;
392
393        /* FIXME: Better error handling required */
394        if (rt_read(libtrace, &pkt_hdr, sizeof(rt_header_t)) !=
395                        sizeof(rt_header_t)) {
396                printf("Error receiving rt header\n");
397                return -1;
398        }
399
400        packet->type = pkt_hdr.type;
401        pkt_size = pkt_hdr.length;
402
403        switch(packet->type) {
404                case RT_DATA_ERF:
405                case RT_DATA_PCAP:
406                case RT_DATA_WAG:
407                case RT_DATA_LEGACY_ETH:
408                case RT_DATA_LEGACY_POS:
409                case RT_DATA_LEGACY_ATM:
410                        if (rt_read(libtrace, buffer, pkt_size) != pkt_size) {
411                                printf("Error receiving packet\n");
412                                return -1;
413                        }
414                       
415                        if (rt_set_format(libtrace, packet) < 0) {
416                                return -1;
417                        }
418                        rt_set_payload(packet);
419
420                        if (reliability > 0) {
421                               
422                                if (rt_send_ack(libtrace, packet,
423                                       pkt_hdr.sequence) == -1)
424                                {
425                                        return -1;
426                                }
427                        }
428                        break;
429                case RT_STATUS:
430                case RT_DUCK:
431                        if (rt_read(libtrace, buffer, pkt_size) !=
432                                        pkt_size) {
433                                printf("Error receiving status packet\n");
434                                return -1;
435                        }
436                        break;
437                case RT_END_DATA:
438                        return 0;
439                case RT_PAUSE_ACK:
440                        /* FIXME: Do something useful */
441                        break;
442                case RT_OPTION:
443                        /* FIXME: Do something useful here as well */
444                        break;
445                default:
446                        printf("Bad rt type for client receipt: %d\n",
447                                        pkt_hdr.type);
448        }
449        return 1;
450       
451}
452
453static int rt_get_capture_length(const struct libtrace_packet_t *packet) {
454        switch (packet->type) {
455                case RT_DUCK:
456                        return sizeof(rt_duck_t);
457                case RT_STATUS:
458                        return sizeof(rt_status_t);
459                case RT_HELLO:
460                        return sizeof(rt_hello_t);
461                case RT_START:
462                        return sizeof(rt_start_t);
463                case RT_ACK:
464                        return sizeof(rt_ack_t);
465                case RT_END_DATA:
466                        return sizeof(rt_end_data_t);
467                case RT_CLOSE:
468                        return sizeof(rt_close_t);
469                case RT_DENY_CONN:
470                        return sizeof(rt_deny_conn_t);
471                case RT_PAUSE:
472                        return sizeof(rt_pause_t);
473                case RT_PAUSE_ACK:
474                        return sizeof(rt_pause_ack_t);
475                case RT_OPTION:
476                        return sizeof(rt_option_t);
477        }
478        printf("Unknown type: %d\n", packet->type);
479        return 0;
480}
481                       
482static int rt_get_framing_length(const struct libtrace_packet_t *packet) {
483        return 0;
484}
485
486
487static int rt_get_fd(const struct libtrace_t *trace) {
488        return trace->format_data->input_fd;
489}
490
491
492
493static void rt_help() {
494        printf("rt format module\n");
495        printf("Supported input URIs:\n");
496        printf("\trt:hostname:port\n");
497        printf("\trt:hostname (connects on default port)\n");
498        printf("\n");
499        printf("\te.g.: rt:localhost\n");
500        printf("\te.g.: rt:localhost:32500\n");
501        printf("\n");
502
503}
504
505
506static struct libtrace_format_t rt = {
507        "rt",
508        "$Id$",
509        TRACE_FORMAT_RT,
510        rt_init_input,                  /* init_input */
511        NULL,                           /* config_input */
512        rt_start_input,                 /* start_input */
513        NULL,                           /* init_output */
514        NULL,                           /* config_output */
515        NULL,                           /* start_output */
516        NULL,                           /* pause_output */
517        rt_fin_input,                   /* fin_input */
518        NULL,                           /* fin_output */
519        rt_read_packet,                 /* read_packet */
520        NULL,                           /* write_packet */
521        NULL,                           /* get_link_type */
522        NULL,                           /* get_direction */
523        NULL,                           /* set_direction */
524        NULL,                           /* get_erf_timestamp */
525        NULL,                           /* get_timeval */
526        NULL,                           /* get_seconds */
527        NULL,                           /* seek_erf */
528        NULL,                           /* seek_timeval */
529        NULL,                           /* seek_seconds */
530        rt_get_capture_length,          /* get_capture_length */
531        NULL,                           /* get_wire_length */
532        rt_get_framing_length,          /* get_framing_length */
533        NULL,                           /* set_capture_length */
534        rt_get_fd,                      /* get_fd */
535        trace_event_device,             /* trace_event */
536        rt_help                   /* help */
537};
538
539void __attribute__((constructor)) rt_constructor() {
540        register_format(&rt);
541}
Note: See TracBrowser for help on using the repository browser.