source: lib/format_rt.c @ 2ffda28

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2ffda28 was 2ffda28, checked in by Perry Lorier <perry@…>, 15 years ago

Make sure we do large reads in format_rt to keep system % cpu down.

  • Property mode set to 100644
File size: 15.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#ifdef HAVE_INTTYPES_H
43#  include <inttypes.h>
44#else
45#  error "Can't find inttypes.h - this needs to be fixed"
46#endif
47
48#ifdef HAVE_STDDEF_H
49#  include <stddef.h>
50#else
51# error "Can't find stddef.h - do you define ptrdiff_t elsewhere?"
52#endif
53#include <sys/types.h>
54#include <sys/socket.h>
55#include <sys/un.h>
56#include <sys/mman.h>
57#include <sys/stat.h>
58#include <unistd.h>
59#include <assert.h>
60#include <errno.h>
61#include <netdb.h>
62#include <fcntl.h>
63#include <getopt.h>
64#include <stdio.h>
65#include <string.h>
66#include <stdlib.h>
67
68#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
69
70int reliability = 0;
71
72char *rt_deny_reason(uint8_t reason) {
73        char *string = 0;
74
75        switch(reason) {
76                case RT_DENY_WRAPPER:
77                        string = "Rejected by TCP Wrappers";
78                        break;
79                case RT_DENY_FULL:
80                        string = "Max connections reached on server";
81                        break;
82                case RT_DENY_AUTH:
83                        string = "Authentication failed";
84                        break;
85                default:
86                        string = "Unknown reason";
87        }
88
89        return string;
90}
91
92
93struct rt_format_data_t {
94        char *hostname;
95        int port;
96        int input_fd;
97        int reliable;
98        char *pkt_buffer;
99        char *buf_current;
100        int buf_left;
101
102       
103        struct libtrace_t *dummy_erf;
104        struct libtrace_t *dummy_pcap;
105        struct libtrace_t *dummy_wag;
106};
107
108static struct libtrace_format_t rt;
109
110static int rt_connect(struct libtrace_t *libtrace) {
111        struct hostent *he;
112        struct sockaddr_in remote;
113        rt_header_t connect_msg;
114        rt_deny_conn_t deny_hdr;       
115        rt_hello_t hello_opts;
116        uint8_t reason;
117       
118        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
119                perror("gethostbyname");
120                return -1;
121        }
122        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
123                perror("socket");
124                return -1;
125        }
126
127        remote.sin_family = AF_INET;
128        remote.sin_port = htons(RT_INFO->port);
129        remote.sin_addr = *((struct in_addr *)he->h_addr);
130        bzero(&(remote.sin_zero), 8);
131
132        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
133                                sizeof(struct sockaddr)) == -1) {
134                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
135                                "Could not connect to host %s on port %d",
136                                RT_INFO->hostname, RT_INFO->port);
137                return -1;
138        }
139       
140        /* We are connected, now receive message from server */
141       
142        if (recv(RT_INFO->input_fd, &connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
143                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
144                                "Could not receive connection message from %s",
145                                RT_INFO->hostname);
146                return -1;
147        }
148
149        switch (connect_msg.type) {
150                case RT_DENY_CONN:
151                       
152                        if (recv(RT_INFO->input_fd, &deny_hdr, 
153                                                sizeof(rt_deny_conn_t),
154                                                0) != sizeof(rt_deny_conn_t)) {
155                                reason = 0;
156                        }       
157                        reason = deny_hdr.reason;
158                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
159                                "Connection attempt is denied: %s",
160                                rt_deny_reason(reason));       
161                        return -1;
162                case RT_HELLO:
163                        /* do something with options */
164                        if (recv(RT_INFO->input_fd, &hello_opts, 
165                                                sizeof(rt_hello_t), 0)
166                                        != sizeof(rt_hello_t)) {
167                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
168                                        "Failed to receive RT_HELLO options");
169                                return -1;
170                        }
171                        reliability = hello_opts.reliable;
172                       
173                        return 0;
174                default:
175                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
176                                        "Unknown message type received: %d",
177                                        connect_msg.type);
178                        return -1;
179        }
180        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
181                        "Somehow you managed to reach this unreachable code");
182        return -1;
183}
184
185
186static int rt_init_input(struct libtrace_t *libtrace) {
187        char *scan;
188        char *uridata = libtrace->uridata;
189        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
190
191        RT_INFO->dummy_erf = NULL;
192        RT_INFO->dummy_pcap = NULL;
193        RT_INFO->dummy_wag = NULL;
194        RT_INFO->pkt_buffer = NULL;
195        RT_INFO->buf_current = NULL;
196        RT_INFO->buf_left = 0;
197       
198        if (strlen(uridata) == 0) {
199                RT_INFO->hostname =
200                        strdup("localhost");
201                RT_INFO->port =
202                        COLLECTOR_PORT;
203        } else {
204                if ((scan = strchr(uridata,':')) == NULL) {
205                        RT_INFO->hostname =
206                                strdup(uridata);
207                        RT_INFO->port =
208                                COLLECTOR_PORT;
209                } else {
210                        RT_INFO->hostname =
211                                (char *)strndup(uridata,
212                                                (scan - uridata));
213                        RT_INFO->port =
214                                atoi(++scan);
215                }
216        }
217
218        return rt_connect(libtrace);
219}
220       
221static int rt_start_input(struct libtrace_t *libtrace) {
222        rt_header_t start_msg;
223
224        start_msg.type = RT_START;
225        start_msg.length = sizeof(rt_start_t);
226
227       
228        /* Need to send start message to server */
229        if (send(RT_INFO->input_fd, &start_msg, sizeof(rt_header_t) +
230                                start_msg.length, 0) != sizeof(rt_header_t)) {
231                printf("Failed to send start message to server\n");
232                return -1;
233        }
234
235        return 0;
236}
237
238static int rt_fin_input(struct libtrace_t *libtrace) {
239        rt_header_t close_msg;
240
241        close_msg.type = RT_CLOSE;
242        close_msg.length = sizeof(rt_close_t);
243       
244        /* Send a close message to the server */
245        if (send(RT_INFO->input_fd, &close_msg, sizeof(rt_header_t) + 
246                                close_msg.length, 0) != sizeof(rt_header_t)
247                                + close_msg.length) {
248                printf("Failed to send close message to server\n");
249       
250        }
251        if (RT_INFO->dummy_erf) 
252                trace_destroy_dead(RT_INFO->dummy_erf);
253               
254        if (RT_INFO->dummy_pcap)
255                trace_destroy_dead(RT_INFO->dummy_pcap);
256
257        if (RT_INFO->dummy_wag)
258                trace_destroy_dead(RT_INFO->dummy_wag);
259        close(RT_INFO->input_fd);
260        free(libtrace->format_data);
261        return 0;
262}
263
264#define RT_BUF_SIZE 4000
265
266static int rt_read(struct libtrace_t *libtrace, void **buffer, size_t len) {
267        int numbytes;
268
269        assert(len <= RT_BUF_SIZE);
270       
271        if (!RT_INFO->pkt_buffer) {
272                RT_INFO->pkt_buffer = malloc(RT_BUF_SIZE);
273                RT_INFO->buf_current = RT_INFO->pkt_buffer;
274                RT_INFO->buf_left = 0;
275        }
276
277       
278        if (len > RT_INFO->buf_left) {
279                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
280                                RT_INFO->buf_left);
281                RT_INFO->buf_current = RT_INFO->pkt_buffer;
282
283#ifndef MSG_NOSIGNAL
284#  define MSG_NOSIGNAL 0
285#endif
286                while (len > RT_INFO->buf_left) {
287                        if ((numbytes = recv(RT_INFO->input_fd,
288                                                RT_INFO->pkt_buffer + 
289                                                RT_INFO->buf_left,
290                                                RT_BUF_SIZE-RT_INFO->buf_left,
291                                                MSG_NOSIGNAL)) == -1) {
292                                if (errno == EINTR) {
293                                        /* ignore EINTR in case
294                                         * a caller is using signals
295                                         */
296                                        continue;
297                                }
298                                perror("recv");
299                                return -1;
300                        }
301                        RT_INFO->buf_left+=numbytes;
302                }
303
304        }
305        *buffer = RT_INFO->buf_current;
306        RT_INFO->buf_current += len;
307        RT_INFO->buf_left -= len;
308        assert(RT_INFO->buf_left >= 0);
309        return len;
310}
311
312
313static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
314{
315       
316        if (packet->type >= RT_DATA_PCAP) {
317                if (!RT_INFO->dummy_pcap) {
318                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
319                }
320                packet->trace = RT_INFO->dummy_pcap;
321                return 0;       
322        }
323
324        switch (packet->type) {
325                case RT_DATA_ERF:
326                        if (!RT_INFO->dummy_erf) {
327                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
328                        }
329                        packet->trace = RT_INFO->dummy_erf;
330                        break;
331                case RT_DATA_WAG:
332                        if (!RT_INFO->dummy_wag) {
333                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
334                        }
335                        packet->trace = RT_INFO->dummy_wag;
336                        break;
337                case RT_DATA_LEGACY_ETH:
338                case RT_DATA_LEGACY_ATM:
339                case RT_DATA_LEGACY_POS:
340                        printf("Sending legacy over RT is currently not supported\n");
341                        return -1;
342                default:
343                        printf("Unrecognised format: %d\n", packet->type);
344                        return -1;
345        }
346        return 0; /* success */
347}               
348
349static void rt_set_payload(struct libtrace_packet_t *packet) {
350        dag_record_t *erfptr;
351       
352        switch (packet->type) {
353                case RT_DATA_ERF:
354                        erfptr = (dag_record_t *)packet->header;
355                       
356                        if (erfptr->flags.rxerror == 1) {
357                                packet->payload = NULL;
358                                break;
359                        }
360                        /* else drop into the default case */
361                default:
362                        packet->payload = (char *)packet->buffer +
363                                trace_get_framing_length(packet);
364                        break;
365        }
366}
367
368static int rt_send_ack(struct libtrace_t *libtrace, 
369                uint32_t seqno)  {
370       
371        static char *ack_buffer = 0;
372        char *buf_ptr;
373        int numbytes = 0;
374        int to_write = 0;
375        rt_header_t *hdr;
376        rt_ack_t *ack_hdr;
377       
378        if (!ack_buffer) {
379                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
380        }
381       
382        hdr = (rt_header_t *) ack_buffer;
383        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
384       
385        hdr->type = RT_ACK;
386        hdr->length = sizeof(rt_ack_t);
387
388        ack_hdr->sequence = seqno;
389       
390        to_write = hdr->length + sizeof(rt_header_t);
391        buf_ptr = ack_buffer;
392
393       
394        while (to_write > 0) {
395                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
396                if (numbytes == -1) {
397                        if (errno == EINTR || errno == EAGAIN) {
398                                continue;
399                        }
400                        else {
401                                printf("Error sending ack\n");
402                                return -1;
403                        }
404                }
405                to_write = to_write - numbytes;
406                buf_ptr = buf_ptr + to_write;
407               
408        }
409
410        return 1;
411}
412       
413static int rt_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
414        rt_header_t rt_hdr;
415        rt_header_t *pkt_hdr = &rt_hdr;
416        int pkt_size = 0;
417       
418       
419        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
420                packet->buf_control = TRACE_CTRL_PACKET;
421                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
422        } 
423
424        packet->header = packet->buffer;
425
426        /* FIXME: Better error handling required */
427        if (rt_read(libtrace, (void **)&pkt_hdr, sizeof(rt_header_t)) !=
428                        sizeof(rt_header_t)) {
429                printf("Error receiving rt header\n");
430                return -1;
431        }
432
433        packet->type = pkt_hdr->type;
434        pkt_size = pkt_hdr->length;
435        packet->size = pkt_hdr->length;
436
437        if (packet->type >= RT_DATA_SIMPLE) {
438                if (rt_read(libtrace, &packet->buffer, pkt_size) != pkt_size) {
439                        printf("Error receiving packet\n");
440                        return -1;
441                }
442               
443                if (rt_set_format(libtrace, packet) < 0) {
444                        return -1;
445                }
446                rt_set_payload(packet);
447
448                if (reliability > 0) {
449                       
450                        if (rt_send_ack(libtrace, pkt_hdr->sequence) 
451                                        == -1)
452                        {
453                                return -1;
454                        }
455                }
456        } else {
457                switch(packet->type) {
458                        case RT_STATUS:
459                        case RT_DUCK:
460                                if (rt_read(libtrace, &packet->buffer, 
461                                                        pkt_size) !=
462                                                pkt_size) {
463                                        printf("Error receiving status packet\n");
464                                        return -1;
465                                }
466                                packet->header = 0;
467                                packet->payload = packet->buffer;
468                                break;
469                        case RT_END_DATA:
470                                return 0;
471                        case RT_PAUSE_ACK:
472                                /* FIXME: Do something useful */
473                                break;
474                        case RT_OPTION:
475                                /* FIXME: Do something useful here as well */
476                                break;
477                        case RT_KEYCHANGE:
478                                break;
479                        default:
480                                printf("Bad rt type for client receipt: %d\n",
481                                        pkt_hdr->type);
482                }
483        }
484        /* Return the number of bytes read from the stream */
485        return sizeof(rt_header_t) + packet->size; 
486}
487
488static int rt_get_capture_length(const struct libtrace_packet_t *packet) {
489        switch (packet->type) {
490                case RT_DUCK:
491                        return sizeof(rt_duck_t);
492                case RT_STATUS:
493                        return sizeof(rt_status_t);
494                case RT_HELLO:
495                        return sizeof(rt_hello_t);
496                case RT_START:
497                        return sizeof(rt_start_t);
498                case RT_ACK:
499                        return sizeof(rt_ack_t);
500                case RT_END_DATA:
501                        return sizeof(rt_end_data_t);
502                case RT_CLOSE:
503                        return sizeof(rt_close_t);
504                case RT_DENY_CONN:
505                        return sizeof(rt_deny_conn_t);
506                case RT_PAUSE:
507                        return sizeof(rt_pause_t);
508                case RT_PAUSE_ACK:
509                        return sizeof(rt_pause_ack_t);
510                case RT_OPTION:
511                        return sizeof(rt_option_t);
512                case RT_KEYCHANGE:
513                        return sizeof(rt_keychange_t);
514        }
515        printf("Unknown type: %d\n", packet->type);
516        return 0;
517}
518                       
519static int rt_get_framing_length(const libtrace_packet_t *packet) {
520        return 0;
521}
522
523
524static int rt_get_fd(const libtrace_t *trace) {
525        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
526}
527
528
529
530static void rt_help() {
531        printf("rt format module\n");
532        printf("Supported input URIs:\n");
533        printf("\trt:hostname:port\n");
534        printf("\trt:hostname (connects on default port)\n");
535        printf("\n");
536        printf("\te.g.: rt:localhost\n");
537        printf("\te.g.: rt:localhost:32500\n");
538        printf("\n");
539
540}
541
542
543static struct libtrace_format_t rt = {
544        "rt",
545        "$Id$",
546        TRACE_FORMAT_RT,
547        rt_init_input,                  /* init_input */
548        NULL,                           /* config_input */
549        rt_start_input,                 /* start_input */
550        NULL,                           /* init_output */
551        NULL,                           /* config_output */
552        NULL,                           /* start_output */
553        NULL,                           /* pause_output */
554        rt_fin_input,                   /* fin_input */
555        NULL,                           /* fin_output */
556        rt_read_packet,                 /* read_packet */
557        NULL,                           /* fin_packet */
558        NULL,                           /* write_packet */
559        NULL,                           /* get_link_type */
560        NULL,                           /* get_direction */
561        NULL,                           /* set_direction */
562        NULL,                           /* get_erf_timestamp */
563        NULL,                           /* get_timeval */
564        NULL,                           /* get_seconds */
565        NULL,                           /* seek_erf */
566        NULL,                           /* seek_timeval */
567        NULL,                           /* seek_seconds */
568        rt_get_capture_length,          /* get_capture_length */
569        NULL,                           /* get_wire_length */
570        rt_get_framing_length,          /* get_framing_length */
571        NULL,                           /* set_capture_length */
572        rt_get_fd,                      /* get_fd */
573        trace_event_device,             /* trace_event */
574        rt_help,                        /* help */
575        NULL                            /* next pointer */
576};
577
578void __attribute__((constructor)) rt_constructor() {
579        register_format(&rt);
580}
Note: See TracBrowser for help on using the repository browser.