source: lib/format_rt.c @ 1935565

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 1935565 was e90d5a8, checked in by Shane Alcock <salcock@…>, 9 years ago
  • Added a couple of new error types: BAD_FILTER and RT_FAILURE, with the aim of trying to restrict the use of BAD_PACKET to cases where the packet is clearly corrupt or broken
  • Property mode set to 100644
File size: 22.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#define _GNU_SOURCE
36
37#include "config.h"
38#include "common.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "rt_protocol.h"
43
44#include <sys/stat.h>
45#include <assert.h>
46#include <errno.h>
47#include <fcntl.h>
48#include <stdio.h>
49#include <string.h>
50#include <stdlib.h>
51#include <unistd.h>
52
53#ifndef WIN32
54# include <netdb.h>
55#endif
56
57#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
58
59/* Convert the RT denial code into a nice printable and coherent string */
60static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
61{
62        const char *string = 0;
63
64        switch(reason) {
65                case RT_DENY_WRAPPER:
66                        string = "Rejected by TCP Wrappers";
67                        break;
68                case RT_DENY_FULL:
69                        string = "Max connections reached on server";
70                        break;
71                case RT_DENY_AUTH:
72                        string = "Authentication failed";
73                        break;
74                default:
75                        string = "Unknown reason";
76        }
77
78        return string;
79}
80
81
82struct rt_format_data_t {
83        /* Name of the host to connect to */
84        char *hostname;
85        /* Buffer to store received packets into */
86        char *pkt_buffer;
87        /* Pointer to the next packet to be read from the buffer */
88        char *buf_current;
89        /* Amount of buffer space used */
90        size_t buf_filled;
91        /* The port to connect to */
92        int port;
93        /* The file descriptor for the RT connection */
94        int input_fd;
95        /* Flag indicating whether the server is doing reliable RT */
96        int reliable;
97
98        /* Header for the packet currently being received */
99        rt_header_t rt_hdr;
100       
101        /* Dummy traces that can be assigned to the received packets to ensure
102         * that the appropriate functions can be used to process them */
103        libtrace_t *dummy_duck;
104        libtrace_t *dummy_erf;
105        libtrace_t *dummy_pcap;
106        libtrace_t *dummy_linux;
107};
108
109/* Connects to an RT server
110 *
111 * Returns -1 if an error occurs
112 */
113static int rt_connect(libtrace_t *libtrace) {
114        struct hostent *he;
115        struct sockaddr_in remote;
116        rt_header_t connect_msg;
117        rt_deny_conn_t deny_hdr;       
118        rt_hello_t hello_opts;
119        uint8_t reason;
120       
121        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
122                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
123                                "Failed to convert hostname %s to address",
124                                RT_INFO->hostname);
125                return -1;
126        }
127        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
128                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
129                                "Could not create socket");
130                return -1;
131        }
132
133        memset(&remote,0, sizeof(remote));
134        remote.sin_family = AF_INET;
135        remote.sin_port = htons(RT_INFO->port);
136        remote.sin_addr = *((struct in_addr *)he->h_addr);
137
138        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
139                                (socklen_t)sizeof(struct sockaddr)) == -1) {
140                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
141                                "Could not connect to host %s on port %d",
142                                RT_INFO->hostname, RT_INFO->port);
143                return -1;
144        }
145
146        /* We are connected, now receive message from server */
147       
148        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
149                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
150                                "Could not receive connection message from %s",
151                                RT_INFO->hostname);
152                return -1;
153        }
154       
155        switch (connect_msg.type) {
156                case TRACE_RT_DENY_CONN:
157                        /* Connection was denied */
158                       
159                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
160                                                sizeof(rt_deny_conn_t),
161                                                0) != sizeof(rt_deny_conn_t)) {
162                                reason = 0;
163                        }       
164                        reason = deny_hdr.reason;
165                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
166                                "Connection attempt is denied: %s",
167                                rt_deny_reason(reason));       
168                        return -1;
169                case TRACE_RT_HELLO:
170                        /* Hello message - read the options sent to us by the
171                         * server */
172                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
173                                                sizeof(rt_hello_t), 0)
174                                        != sizeof(rt_hello_t)) {
175                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
176                                        "Failed to receive TRACE_RT_HELLO options");
177                                return -1;
178                        }
179                        RT_INFO->reliable = hello_opts.reliable;
180                       
181                        return 0;
182                default:
183                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
184                                        "Unknown message type received: %d",
185                                        connect_msg.type);
186                        return -1;
187        }
188        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
189                        "Somehow you managed to reach this unreachable code");
190        return -1;
191}
192
193static void rt_init_format_data(libtrace_t *libtrace) {
194        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
195
196        RT_INFO->dummy_duck = NULL;
197        RT_INFO->dummy_erf = NULL;
198        RT_INFO->dummy_pcap = NULL;
199        RT_INFO->dummy_linux = NULL;
200        RT_INFO->pkt_buffer = NULL;
201        RT_INFO->buf_current = NULL;
202        RT_INFO->buf_filled = 0;
203        RT_INFO->hostname = NULL;
204        RT_INFO->port = 0;
205}
206
207static int rt_init_input(libtrace_t *libtrace) {
208        char *scan;
209        char *uridata = libtrace->uridata;
210
211        rt_init_format_data(libtrace);
212
213        /* If the user specifies "rt:" then assume localhost and the default
214         * port */     
215        if (strlen(uridata) == 0) {
216                RT_INFO->hostname =
217                        strdup("localhost");
218                RT_INFO->port =
219                        COLLECTOR_PORT;
220        } else {
221                /* If the user does not specify a port, assume the default
222                 * port */
223                if ((scan = strchr(uridata,':')) == NULL) {
224                        RT_INFO->hostname =
225                                strdup(uridata);
226                        RT_INFO->port =
227                                COLLECTOR_PORT;
228                } else {
229                        RT_INFO->hostname =
230                                (char *)strndup(uridata,
231                                                (size_t)(scan - uridata));
232                        RT_INFO->port =
233                                atoi(++scan);
234                }
235        }
236
237        return 0;
238}
239       
240static int rt_start_input(libtrace_t *libtrace) {
241        rt_header_t start_msg;
242
243        start_msg.type = TRACE_RT_START;
244        start_msg.length = 0; 
245
246        if (rt_connect(libtrace) == -1)
247                return -1;
248       
249        /* Need to send start message to server */
250        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
251                                start_msg.length, 0) != sizeof(rt_header_t)) {
252                printf("Failed to send start message to server\n");
253                return -1;
254        }
255        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
256
257        return 0;
258}
259
260static int rt_pause_input(libtrace_t *libtrace) {
261        rt_header_t close_msg;
262
263        close_msg.type = TRACE_RT_CLOSE;
264        close_msg.length = 0; 
265       
266        /* Send a close message to the server */
267        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
268                                close_msg.length, 0) != (int)sizeof(rt_header_t)
269                                + close_msg.length) {
270                printf("Failed to send close message to server\n");
271       
272        }
273
274        close(RT_INFO->input_fd);
275        return 0;
276}
277
278static int rt_fin_input(libtrace_t *libtrace) {
279        /* Make sure we clean up any dummy traces that we have been using */
280       
281        if (RT_INFO->dummy_duck)
282                trace_destroy_dead(RT_INFO->dummy_duck);
283
284        if (RT_INFO->dummy_erf) 
285                trace_destroy_dead(RT_INFO->dummy_erf);
286               
287        if (RT_INFO->dummy_pcap)
288                trace_destroy_dead(RT_INFO->dummy_pcap);
289
290        if (RT_INFO->dummy_linux)
291                trace_destroy_dead(RT_INFO->dummy_linux);
292        free(libtrace->format_data);
293        return 0;
294}
295
296
297/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
298 * in any way. This means we have a much larger memory overhead per packet
299 * (which won't be used in the vast majority of cases), so we may want to think
300 * about doing something smarter, e.g. allocate a smaller block of memory and
301 * only increase it as required.
302 *
303 * XXX Capturing off int: can still lead to packets that are larger than 10K,
304 * in instances where the fragmentation is done magically by the NIC. This
305 * is pretty nasty, but also very rare.
306 */
307#define RT_BUF_SIZE 10000U
308
309/* Receives data from an RT server */
310static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block) 
311{
312        int numbytes;
313       
314        assert(len <= RT_BUF_SIZE);
315       
316        if (!RT_INFO->pkt_buffer) {
317                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
318                RT_INFO->buf_current = RT_INFO->pkt_buffer;
319                RT_INFO->buf_filled = 0;
320        }
321
322#ifndef MSG_DONTWAIT
323#define MSG_DONTWAIT 0
324#endif
325
326        if (block)
327                block=0;
328        else
329                block=MSG_DONTWAIT;
330
331        /* If we don't have enough buffer space for the amount we want to
332         * read, move the current buffer contents to the front of the buffer
333         * to make room */
334        if (len > RT_INFO->buf_filled) {
335                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
336                                RT_INFO->buf_filled);
337                RT_INFO->buf_current = RT_INFO->pkt_buffer;
338#ifndef MSG_NOSIGNAL
339#  define MSG_NOSIGNAL 0
340#endif
341                /* Loop as long as we don't have all the data that we were
342                 * asked for */
343                while (len > RT_INFO->buf_filled) {
344                        if ((numbytes = recv(RT_INFO->input_fd,
345                                                RT_INFO->buf_current + 
346                                                RT_INFO->buf_filled,
347                                                RT_BUF_SIZE-RT_INFO->buf_filled,
348                                                MSG_NOSIGNAL|block)) <= 0) {
349                                if (numbytes == 0) {
350                                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
351                                                        "No data received");
352                                        return -1;
353                                }
354                               
355                                if (errno == EINTR) {
356                                        /* ignore EINTR in case
357                                         * a caller is using signals
358                                         */
359                                        continue;
360                                }
361                                if (errno == EAGAIN) {
362                                        /* We asked for non-blocking mode, so
363                                         * we need to return now */
364                                        trace_set_err(libtrace,
365                                                        EAGAIN,
366                                                        "EAGAIN");
367                                        return -1;
368                                }
369                               
370                                perror("recv");
371                                trace_set_err(libtrace, errno,
372                                                "Failed to read data into rt recv buffer");
373                                return -1;
374                        }
375                        RT_INFO->buf_filled+=numbytes;
376                }
377
378        }
379        *buffer = RT_INFO->buf_current;
380        RT_INFO->buf_current += len;
381        RT_INFO->buf_filled -= len;
382        return len;
383}
384
385
386/* Sets the trace format for the packet to match the format it was originally
387 * captured in, rather than the RT format */
388static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
389{
390
391        /* We need to assign the packet to a "dead" trace */
392
393        /* Try to minimize the number of corrupt packets that slip through
394         * while making it easy to identify new pcap DLTs */
395        if (packet->type > TRACE_RT_DATA_DLT && 
396                        packet->type < TRACE_RT_DATA_DLT_END) {
397                if (!RT_INFO->dummy_pcap) {
398                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
399                }
400                packet->trace = RT_INFO->dummy_pcap;
401                return 0;       
402        }
403
404        switch (packet->type) {
405                case TRACE_RT_DUCK_2_4:
406                case TRACE_RT_DUCK_2_5:
407                        if (!RT_INFO->dummy_duck) {
408                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
409                        }
410                        packet->trace = RT_INFO->dummy_duck;
411                        break;
412                case TRACE_RT_DATA_ERF:
413                        if (!RT_INFO->dummy_erf) {
414                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
415                        }
416                        packet->trace = RT_INFO->dummy_erf;
417                        break;
418                case TRACE_RT_DATA_LINUX_NATIVE:
419                        if (!RT_INFO->dummy_linux) {
420                                RT_INFO->dummy_linux = trace_create_dead("int:");
421                        }
422                        packet->trace = RT_INFO->dummy_linux;
423                        break;
424                case TRACE_RT_STATUS:
425                case TRACE_RT_METADATA:
426                        /* Just use the RT trace! */
427                        packet->trace = libtrace;
428                        break;
429                case TRACE_RT_DATA_LEGACY_ETH:
430                case TRACE_RT_DATA_LEGACY_ATM:
431                case TRACE_RT_DATA_LEGACY_POS:
432                        printf("Sending legacy over RT is currently not supported\n");
433                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
434                        return -1;
435                default:
436                        printf("Unrecognised format: %u\n", packet->type);
437                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
438                        return -1;
439        }
440        return 0; /* success */
441}               
442
443/* Sends an RT ACK to the server to acknowledge receipt of packets */
444static int rt_send_ack(libtrace_t *libtrace, 
445                uint32_t seqno)  {
446       
447        static char *ack_buffer = 0;
448        char *buf_ptr;
449        int numbytes = 0;
450        size_t to_write = 0;
451        rt_header_t *hdr;
452        rt_ack_t *ack_hdr;
453       
454        if (!ack_buffer) {
455                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
456                                                        + sizeof(rt_ack_t));
457        }
458       
459        hdr = (rt_header_t *) ack_buffer;
460        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
461       
462        hdr->type = TRACE_RT_ACK;
463        hdr->length = sizeof(rt_ack_t);
464
465        ack_hdr->sequence = seqno;
466       
467        to_write = hdr->length + sizeof(rt_header_t);
468        buf_ptr = ack_buffer;
469
470        /* Keep trying until we write the entire ACK */
471        while (to_write > 0) {
472                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
473                if (numbytes == -1) {
474                        if (errno == EINTR || errno == EAGAIN) {
475                                continue;
476                        }
477                        else {
478                                printf("Error sending ack\n");
479                                perror("send");
480                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
481                                                "Error sending ack");
482                                return -1;
483                        }
484                }
485                to_write = to_write - numbytes;
486                buf_ptr = buf_ptr + to_write;
487               
488        }
489
490        return 1;
491}
492
493/* Shouldn't need to call this too often */
494static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
495                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
496
497        if (packet->buffer != buffer &&
498                        packet->buf_control == TRACE_CTRL_PACKET) {
499                free(packet->buffer);
500        }
501
502        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
503                packet->buf_control = TRACE_CTRL_PACKET;
504        } else
505                packet->buf_control = TRACE_CTRL_EXTERNAL;
506
507
508        packet->buffer = buffer;
509        packet->header = NULL;
510        packet->type = rt_type;
511        packet->payload = buffer;
512
513        if (libtrace->format_data == NULL) {
514                rt_init_format_data(libtrace);
515        }
516
517        return 0;
518}       
519
520/* Reads the body of an RT packet from the network */
521static int rt_read_data_packet(libtrace_t *libtrace,
522                libtrace_packet_t *packet, int blocking) {
523        uint32_t prep_flags = 0;
524
525        prep_flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
526
527        /* The stored RT header will tell us how much data we need to read */
528        if (rt_read(libtrace, &packet->buffer, (size_t)RT_INFO->rt_hdr.length, 
529                                blocking) != RT_INFO->rt_hdr.length) {
530                return -1;
531        }
532
533        /* Send an ACK if required */
534        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
535                if (rt_send_ack(libtrace, RT_INFO->rt_hdr.sequence) == -1)
536                                return -1;
537        }
538       
539        /* Convert to the original capture format */
540        if (rt_set_format(libtrace, packet) < 0) {
541                return -1;
542        }
543               
544        /* Update payload pointers and packet type to match the original
545         * format */
546        if (trace_prepare_packet(packet->trace, packet, packet->buffer,
547                                packet->type, prep_flags)) {
548                return -1;
549        }
550
551        return 0;
552}
553
554/* Reads an RT packet from the network. Will block if the "blocking" flag is
555 * set to 1, otherwise will return if insufficient data is available */
556static int rt_read_packet_versatile(libtrace_t *libtrace,
557                libtrace_packet_t *packet,int blocking) {
558        rt_header_t *pkt_hdr = NULL;
559        void *void_hdr;
560        libtrace_rt_types_t switch_type;
561       
562        if (packet->buf_control == TRACE_CTRL_PACKET) {
563                packet->buf_control = TRACE_CTRL_EXTERNAL;
564                free(packet->buffer);
565                packet->buffer = NULL;
566        }
567
568        /* RT_LAST indicates that we need to read the RT header for the next
569         * packet. This is a touch hax, I admit */
570        if (RT_INFO->rt_hdr.type == TRACE_RT_LAST) {
571                void_hdr = (void *)pkt_hdr;
572                /* FIXME: Better error handling required */
573                if (rt_read(libtrace, &void_hdr, 
574                                sizeof(rt_header_t),blocking) !=
575                                sizeof(rt_header_t)) {
576                        return -1;
577                }
578                pkt_hdr = (rt_header_t *)void_hdr;
579               
580                /* Need to store these in case the next rt_read overwrites
581                 * the buffer they came from! */
582                RT_INFO->rt_hdr.type = pkt_hdr->type;
583                RT_INFO->rt_hdr.length = pkt_hdr->length;
584                RT_INFO->rt_hdr.sequence = pkt_hdr->sequence;
585        }
586        packet->type = RT_INFO->rt_hdr.type;
587       
588        /* All data-bearing packets (as opposed to RT internal messages)
589         * should be treated the same way when it comes to reading the rest
590         * of the packet */
591        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
592                switch_type = TRACE_RT_DATA_SIMPLE;
593        } else {
594                switch_type = packet->type;
595        }
596
597        switch(switch_type) {
598                case TRACE_RT_DATA_SIMPLE:
599                case TRACE_RT_DUCK_2_4:
600                case TRACE_RT_DUCK_2_5:
601                case TRACE_RT_STATUS:
602                case TRACE_RT_METADATA:
603                        if (rt_read_data_packet(libtrace, packet, blocking))
604                                return -1;
605                        break;
606                case TRACE_RT_END_DATA:
607                case TRACE_RT_KEYCHANGE:
608                case TRACE_RT_LOSTCONN:
609                case TRACE_RT_CLIENTDROP:
610                case TRACE_RT_SERVERSTART:
611                        /* All these have no payload */
612                        break;
613                case TRACE_RT_PAUSE_ACK:
614                        /* XXX: Add support for this */
615                        break;
616                case TRACE_RT_OPTION:
617                        /* XXX: Add support for this */
618                        break;
619                default:
620                        printf("Bad rt type for client receipt: %d\n",
621                                        switch_type);
622                        return -1;
623        }
624                               
625                       
626               
627        /* Return the number of bytes read from the stream */
628        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
629        return RT_INFO->rt_hdr.length + sizeof(rt_header_t);
630}
631
632/* Reads the next available packet in a blocking fashion */
633static int rt_read_packet(libtrace_t *libtrace,
634                libtrace_packet_t *packet) {
635        return rt_read_packet_versatile(libtrace,packet,1);
636}
637
638
639/* This should only get called for RT messages - RT-encapsulated data records
640 * should be converted to the appropriate capture format */
641static int rt_get_capture_length(const libtrace_packet_t *packet) {
642        rt_metadata_t *rt_md_hdr;
643        switch (packet->type) {
644                case TRACE_RT_STATUS:
645                        return sizeof(rt_status_t);
646                case TRACE_RT_HELLO:
647                        return sizeof(rt_hello_t);
648                case TRACE_RT_START:
649                        return 0;
650                case TRACE_RT_ACK:
651                        return sizeof(rt_ack_t);
652                case TRACE_RT_END_DATA:
653                        return 0;
654                case TRACE_RT_CLOSE:
655                        return 0;
656                case TRACE_RT_DENY_CONN:
657                        return sizeof(rt_deny_conn_t);
658                case TRACE_RT_PAUSE:
659                        return 0; 
660                case TRACE_RT_PAUSE_ACK:
661                        return 0;
662                case TRACE_RT_OPTION:
663                        return 0; /* FIXME */
664                case TRACE_RT_KEYCHANGE:
665                        return 0;
666                case TRACE_RT_LOSTCONN:
667                        return 0;
668                case TRACE_RT_SERVERSTART:
669                        return 0;
670                case TRACE_RT_CLIENTDROP:
671                        return 0;
672                case TRACE_RT_METADATA:
673                        /* This is a little trickier to work out */
674                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
675                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
676                                sizeof(rt_metadata_t);
677                default:
678                        printf("Unknown type: %d\n", packet->type);
679                       
680        }
681        return 0;
682}
683
684/* RT messages do not have a wire length because they were not captured from
685 * the wire - they were generated by the capture process */
686static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
687        return 0;
688}
689
690/* Although RT messages do contain "framing", this framing is considered to be
691 * stripped as soon as the packet is read by the RT client */                   
692static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
693        return 0;
694}
695
696
697static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
698{
699        /* RT messages don't have a link type */
700        return TRACE_TYPE_NONDATA;
701}
702
703static int rt_get_fd(const libtrace_t *trace) {
704        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
705}
706
707static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
708                                        libtrace_packet_t *packet) 
709{
710        libtrace_eventobj_t event = {0,0,0.0,0};
711        libtrace_err_t read_err;
712
713        assert(trace);
714        assert(packet);
715       
716        if (trace->format->get_fd) {
717                event.fd = trace->format->get_fd(trace);
718        } else {
719                event.fd = 0;
720        }
721
722        do {
723
724                event.size = rt_read_packet_versatile(trace, packet, 0);
725                if (event.size == -1) {
726                        read_err = trace_get_err(trace);
727                        if (read_err.err_num == EAGAIN) {
728                                /* No data available - do an IOWAIT */
729                                event.type = TRACE_EVENT_IOWAIT;
730                        }
731                        else {
732                                trace_perror(trace, "Error doing a non-blocking read from rt");
733                                event.type = TRACE_EVENT_PACKET;
734                                break;
735                        }
736                } else if (event.size == 0) {
737                        /* RT gives us a specific indicator that there will be
738                         * no more packets. */
739                        if (packet->type == TRACE_RT_END_DATA)
740                                event.type = TRACE_EVENT_TERMINATE;
741                        else
742                                /* Since several RT messages can have zero-byte
743                                 * length (once the framing is removed), an
744                                 * event size of zero can still indicate a
745                                 * PACKET event */
746                                event.type = TRACE_EVENT_PACKET;
747
748                }       
749                else {
750                        event.type = TRACE_EVENT_PACKET;
751                }
752
753                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
754                        if (!trace_apply_filter(trace->filter, packet)) {
755                                trace_clear_cache(packet);
756                                continue;
757                        }
758                }
759
760                break; 
761        } while (1);
762
763        return event;
764}
765
766static void rt_help(void) {
767        printf("rt format module\n");
768        printf("Supported input URIs:\n");
769        printf("\trt:hostname:port\n");
770        printf("\trt:hostname (connects on default port)\n");
771        printf("\n");
772        printf("\te.g.: rt:localhost\n");
773        printf("\te.g.: rt:localhost:32500\n");
774        printf("\n");
775
776}
777
778
779static struct libtrace_format_t rt = {
780        "rt",
781        "$Id$",
782        TRACE_FORMAT_RT,
783        NULL,                           /* probe filename */
784        NULL,                           /* probe magic */
785        rt_init_input,                  /* init_input */
786        NULL,                           /* config_input */
787        rt_start_input,                 /* start_input */
788        rt_pause_input,                 /* pause */
789        NULL,                           /* init_output */
790        NULL,                           /* config_output */
791        NULL,                           /* start_output */
792        rt_fin_input,                   /* fin_input */
793        NULL,                           /* fin_output */
794        rt_read_packet,                 /* read_packet */
795        rt_prepare_packet,              /* prepare_packet */
796        NULL,                           /* fin_packet */
797        NULL,                           /* write_packet */
798        rt_get_link_type,               /* get_link_type */
799        NULL,                           /* get_direction */
800        NULL,                           /* set_direction */
801        NULL,                           /* get_erf_timestamp */
802        NULL,                           /* get_timeval */
803        NULL,                           /* get_timespec */
804        NULL,                           /* get_seconds */
805        NULL,                           /* seek_erf */
806        NULL,                           /* seek_timeval */
807        NULL,                           /* seek_seconds */
808        rt_get_capture_length,          /* get_capture_length */
809        rt_get_wire_length,                     /* get_wire_length */
810        rt_get_framing_length,          /* get_framing_length */
811        NULL,                           /* set_capture_length */
812        NULL,                           /* get_received_packets */
813        NULL,                           /* get_filtered_packets */
814        NULL,                           /* get_dropped_packets */
815        NULL,                           /* get_captured_packets */
816        rt_get_fd,                      /* get_fd */
817        trace_event_rt,             /* trace_event */
818        rt_help,                        /* help */
819        NULL                            /* next pointer */
820};
821
822void rt_constructor(void) {
823        register_format(&rt);
824}
Note: See TracBrowser for help on using the repository browser.