source: lib/format_rt.c @ 60e8e86

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 60e8e86 was 29bbef0, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

My work from over summer, with a few things tidied up and updated to include recent commits/patches to bring this up to date. Still very much work in progress.

  • Property mode set to 100644
File size: 24.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#define _GNU_SOURCE
36
37#include "config.h"
38#include "common.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "rt_protocol.h"
43
44#include <sys/stat.h>
45#include <assert.h>
46#include <errno.h>
47#include <fcntl.h>
48#include <stdio.h>
49#include <string.h>
50#include <stdlib.h>
51#include <unistd.h>
52
53#ifndef WIN32
54# include <netdb.h>
55#endif
56
57#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
58
59/* Convert the RT denial code into a nice printable and coherent string */
60static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
61{
62        const char *string = 0;
63
64        switch(reason) {
65                case RT_DENY_WRAPPER:
66                        string = "Rejected by TCP Wrappers";
67                        break;
68                case RT_DENY_FULL:
69                        string = "Max connections reached on server";
70                        break;
71                case RT_DENY_AUTH:
72                        string = "Authentication failed";
73                        break;
74                default:
75                        string = "Unknown reason";
76        }
77
78        return string;
79}
80
81
82struct rt_format_data_t {
83        /* Name of the host to connect to */
84        char *hostname;
85        /* Buffer to store received packets into */
86        char *pkt_buffer;
87        /* Pointer to the next packet to be read from the buffer */
88        char *buf_current;
89        /* Amount of buffer space used */
90        size_t buf_filled;
91        /* The port to connect to */
92        int port;
93        /* The file descriptor for the RT connection */
94        int input_fd;
95        /* Flag indicating whether the server is doing reliable RT */
96        int reliable;
97
98        /* Header for the packet currently being received */
99        rt_header_t rt_hdr;
100       
101        /* Dummy traces that can be assigned to the received packets to ensure
102         * that the appropriate functions can be used to process them */
103        libtrace_t *dummy_duck;
104        libtrace_t *dummy_erf;
105        libtrace_t *dummy_pcap;
106        libtrace_t *dummy_linux;
107        libtrace_t *dummy_ring;
108        libtrace_t *dummy_bpf;
109};
110
111/* Connects to an RT server
112 *
113 * Returns -1 if an error occurs
114 */
115static int rt_connect(libtrace_t *libtrace) {
116        struct hostent *he;
117        struct sockaddr_in remote;
118        rt_header_t connect_msg;
119        rt_deny_conn_t deny_hdr;       
120        rt_hello_t hello_opts;
121        uint8_t reason;
122       
123        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
124                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
125                                "Failed to convert hostname %s to address",
126                                RT_INFO->hostname);
127                return -1;
128        }
129        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
130                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
131                                "Could not create socket");
132                return -1;
133        }
134
135        memset(&remote,0, sizeof(remote));
136        remote.sin_family = AF_INET;
137        remote.sin_port = htons(RT_INFO->port);
138        remote.sin_addr = *((struct in_addr *)he->h_addr);
139
140        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
141                                (socklen_t)sizeof(struct sockaddr)) == -1) {
142                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
143                                "Could not connect to host %s on port %d",
144                                RT_INFO->hostname, RT_INFO->port);
145                return -1;
146        }
147
148        /* We are connected, now receive message from server */
149       
150        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
151                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
152                                "Could not receive connection message from %s",
153                                RT_INFO->hostname);
154                return -1;
155        }
156       
157        switch (connect_msg.type) {
158                case TRACE_RT_DENY_CONN:
159                        /* Connection was denied */
160                       
161                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
162                                                sizeof(rt_deny_conn_t),
163                                                0) != sizeof(rt_deny_conn_t)) {
164                                reason = 0;
165                        }       
166                        reason = deny_hdr.reason;
167                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
168                                "Connection attempt is denied: %s",
169                                rt_deny_reason(reason));       
170                        return -1;
171                case TRACE_RT_HELLO:
172                        /* Hello message - read the options sent to us by the
173                         * server */
174                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
175                                                sizeof(rt_hello_t), 0)
176                                        != sizeof(rt_hello_t)) {
177                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
178                                        "Failed to receive TRACE_RT_HELLO options");
179                                return -1;
180                        }
181                        RT_INFO->reliable = hello_opts.reliable;
182                       
183                        return 0;
184                default:
185                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
186                                        "Unknown message type received: %d",
187                                        connect_msg.type);
188                        return -1;
189        }
190        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
191                        "Somehow you managed to reach this unreachable code");
192        return -1;
193}
194
195static void rt_init_format_data(libtrace_t *libtrace) {
196        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
197
198        RT_INFO->dummy_duck = NULL;
199        RT_INFO->dummy_erf = NULL;
200        RT_INFO->dummy_pcap = NULL;
201        RT_INFO->dummy_linux = NULL;
202        RT_INFO->dummy_ring = NULL;
203        RT_INFO->dummy_bpf = NULL;
204        RT_INFO->pkt_buffer = NULL;
205        RT_INFO->buf_current = NULL;
206        RT_INFO->buf_filled = 0;
207        RT_INFO->hostname = NULL;
208        RT_INFO->port = 0;
209}
210
211static int rt_init_input(libtrace_t *libtrace) {
212        char *scan;
213        char *uridata = libtrace->uridata;
214
215        rt_init_format_data(libtrace);
216
217        /* If the user specifies "rt:" then assume localhost and the default
218         * port */     
219        if (strlen(uridata) == 0) {
220                RT_INFO->hostname =
221                        strdup("localhost");
222                RT_INFO->port =
223                        COLLECTOR_PORT;
224        } else {
225                /* If the user does not specify a port, assume the default
226                 * port */
227                if ((scan = strchr(uridata,':')) == NULL) {
228                        RT_INFO->hostname =
229                                strdup(uridata);
230                        RT_INFO->port =
231                                COLLECTOR_PORT;
232                } else {
233                        RT_INFO->hostname =
234                                (char *)strndup(uridata,
235                                                (size_t)(scan - uridata));
236                        RT_INFO->port =
237                                atoi(++scan);
238                }
239        }
240
241        return 0;
242}
243       
244static int rt_start_input(libtrace_t *libtrace) {
245        rt_header_t start_msg;
246
247        start_msg.type = TRACE_RT_START;
248        start_msg.length = 0; 
249
250        if (rt_connect(libtrace) == -1)
251                return -1;
252       
253        /* Need to send start message to server */
254        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
255                                start_msg.length, 0) != sizeof(rt_header_t)) {
256                printf("Failed to send start message to server\n");
257                return -1;
258        }
259        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
260
261        return 0;
262}
263
264static int rt_pause_input(libtrace_t *libtrace) {
265        rt_header_t close_msg;
266
267        close_msg.type = TRACE_RT_CLOSE;
268        close_msg.length = 0; 
269       
270        /* Send a close message to the server */
271        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
272                                close_msg.length, 0) != (int)sizeof(rt_header_t)
273                                + close_msg.length) {
274                printf("Failed to send close message to server\n");
275       
276        }
277
278        close(RT_INFO->input_fd);
279        return 0;
280}
281
282static int rt_fin_input(libtrace_t *libtrace) {
283        /* Make sure we clean up any dummy traces that we have been using */
284       
285        if (RT_INFO->dummy_duck)
286                trace_destroy_dead(RT_INFO->dummy_duck);
287
288        if (RT_INFO->dummy_erf) 
289                trace_destroy_dead(RT_INFO->dummy_erf);
290               
291        if (RT_INFO->dummy_pcap)
292                trace_destroy_dead(RT_INFO->dummy_pcap);
293
294        if (RT_INFO->dummy_linux)
295                trace_destroy_dead(RT_INFO->dummy_linux);
296       
297        if (RT_INFO->dummy_ring)
298                trace_destroy_dead(RT_INFO->dummy_ring);
299
300        if (RT_INFO->dummy_bpf)
301                trace_destroy_dead(RT_INFO->dummy_bpf);
302        free(libtrace->format_data);
303        return 0;
304}
305
306
307/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
308 * in any way. This means we have a much larger memory overhead per packet
309 * (which won't be used in the vast majority of cases), so we may want to think
310 * about doing something smarter, e.g. allocate a smaller block of memory and
311 * only increase it as required.
312 *
313 * XXX Capturing off int: can still lead to packets that are larger than 10K,
314 * in instances where the fragmentation is done magically by the NIC. This
315 * is pretty nasty, but also very rare.
316 */
317#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
318
319/* Receives data from an RT server */
320static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block) 
321{
322        int numbytes;
323       
324        assert(len <= RT_BUF_SIZE);
325       
326        if (!RT_INFO->pkt_buffer) {
327                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
328                RT_INFO->buf_current = RT_INFO->pkt_buffer;
329                RT_INFO->buf_filled = 0;
330        }
331
332#ifndef MSG_DONTWAIT
333#define MSG_DONTWAIT 0
334#endif
335
336        if (block)
337                block=0;
338        else
339                block=MSG_DONTWAIT;
340
341        /* If we don't have enough buffer space for the amount we want to
342         * read, move the current buffer contents to the front of the buffer
343         * to make room */
344        if (len > RT_INFO->buf_filled) {
345                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
346                                RT_INFO->buf_filled);
347                RT_INFO->buf_current = RT_INFO->pkt_buffer;
348#ifndef MSG_NOSIGNAL
349#  define MSG_NOSIGNAL 0
350#endif
351                /* Loop as long as we don't have all the data that we were
352                 * asked for */
353                while (len > RT_INFO->buf_filled) {
354                        if ((numbytes = recv(RT_INFO->input_fd,
355                                                RT_INFO->buf_current + 
356                                                RT_INFO->buf_filled,
357                                                RT_BUF_SIZE-RT_INFO->buf_filled,
358                                                MSG_NOSIGNAL|block)) <= 0) {
359                                if (numbytes == 0) {
360                                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
361                                                        "No data received");
362                                        return -1;
363                                }
364                               
365                                if (errno == EINTR) {
366                                        /* ignore EINTR in case
367                                         * a caller is using signals
368                                         */
369                                        continue;
370                                }
371                                if (errno == EAGAIN) {
372                                        /* We asked for non-blocking mode, so
373                                         * we need to return now */
374                                        trace_set_err(libtrace,
375                                                        EAGAIN,
376                                                        "EAGAIN");
377                                        return -1;
378                                }
379                               
380                                perror("recv");
381                                trace_set_err(libtrace, errno,
382                                                "Failed to read data into rt recv buffer");
383                                return -1;
384                        }
385                        RT_INFO->buf_filled+=numbytes;
386                }
387
388        }
389        *buffer = RT_INFO->buf_current;
390        RT_INFO->buf_current += len;
391        RT_INFO->buf_filled -= len;
392        return len;
393}
394
395
396/* Sets the trace format for the packet to match the format it was originally
397 * captured in, rather than the RT format */
398static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
399{
400
401        /* We need to assign the packet to a "dead" trace */
402
403        /* Try to minimize the number of corrupt packets that slip through
404         * while making it easy to identify new pcap DLTs */
405        if (packet->type > TRACE_RT_DATA_DLT && 
406                        packet->type < TRACE_RT_DATA_DLT_END) {
407                if (!RT_INFO->dummy_pcap) {
408                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
409                }
410                packet->trace = RT_INFO->dummy_pcap;
411                return 0;       
412        }
413
414        if (packet->type > TRACE_RT_DATA_BPF &&
415                        packet->type < TRACE_RT_DATA_BPF_END) {
416
417                if (!RT_INFO->dummy_bpf) {
418                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
419                        /* This may fail on a non-BSD machine */
420                        if (trace_is_err(RT_INFO->dummy_bpf)) {
421                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
422                                return -1;
423                        }
424                }
425                packet->trace = RT_INFO->dummy_bpf;
426                return 0;
427        }
428
429        switch (packet->type) {
430                case TRACE_RT_DUCK_2_4:
431                case TRACE_RT_DUCK_2_5:
432                        if (!RT_INFO->dummy_duck) {
433                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
434                        }
435                        packet->trace = RT_INFO->dummy_duck;
436                        break;
437                case TRACE_RT_DATA_ERF:
438                        if (!RT_INFO->dummy_erf) {
439                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
440                        }
441                        packet->trace = RT_INFO->dummy_erf;
442                        break;
443                case TRACE_RT_DATA_LINUX_NATIVE:
444                        if (!RT_INFO->dummy_linux) {
445                                RT_INFO->dummy_linux = trace_create_dead("int:");
446                                /* This may fail on a non-Linux machine */
447                                if (trace_is_err(RT_INFO->dummy_linux)) {
448                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
449                                        return -1;
450                                }
451                        }
452                        packet->trace = RT_INFO->dummy_linux;
453                        break;
454                case TRACE_RT_DATA_LINUX_RING:
455                        if (!RT_INFO->dummy_ring) {
456                                RT_INFO->dummy_ring = trace_create_dead("ring:");
457                                /* This may fail on a non-Linux machine */
458                                if (trace_is_err(RT_INFO->dummy_ring)) {
459                                        trace_perror(RT_INFO->dummy_ring, "Creating dead int trace");
460                                        return -1;
461                                }
462                        }
463                        packet->trace = RT_INFO->dummy_ring;
464                        break;
465                case TRACE_RT_STATUS:
466                case TRACE_RT_METADATA:
467                        /* Just use the RT trace! */
468                        packet->trace = libtrace;
469                        break;
470                case TRACE_RT_DATA_LEGACY_ETH:
471                case TRACE_RT_DATA_LEGACY_ATM:
472                case TRACE_RT_DATA_LEGACY_POS:
473                        printf("Sending legacy over RT is currently not supported\n");
474                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
475                        return -1;
476                default:
477                        printf("Unrecognised format: %u\n", packet->type);
478                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
479                        return -1;
480        }
481        return 0; /* success */
482}               
483
484/* Sends an RT ACK to the server to acknowledge receipt of packets */
485static int rt_send_ack(libtrace_t *libtrace, 
486                uint32_t seqno)  {
487       
488        static char *ack_buffer = 0;
489        char *buf_ptr;
490        int numbytes = 0;
491        size_t to_write = 0;
492        rt_header_t *hdr;
493        rt_ack_t *ack_hdr;
494       
495        if (!ack_buffer) {
496                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
497                                                        + sizeof(rt_ack_t));
498        }
499       
500        hdr = (rt_header_t *) ack_buffer;
501        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
502       
503        hdr->type = TRACE_RT_ACK;
504        hdr->length = sizeof(rt_ack_t);
505
506        ack_hdr->sequence = seqno;
507       
508        to_write = hdr->length + sizeof(rt_header_t);
509        buf_ptr = ack_buffer;
510
511        /* Keep trying until we write the entire ACK */
512        while (to_write > 0) {
513                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
514                if (numbytes == -1) {
515                        if (errno == EINTR || errno == EAGAIN) {
516                                continue;
517                        }
518                        else {
519                                printf("Error sending ack\n");
520                                perror("send");
521                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
522                                                "Error sending ack");
523                                return -1;
524                        }
525                }
526                to_write = to_write - numbytes;
527                buf_ptr = buf_ptr + to_write;
528               
529        }
530
531        return 1;
532}
533
534/* Shouldn't need to call this too often */
535static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
536                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
537
538        if (packet->buffer != buffer &&
539                        packet->buf_control == TRACE_CTRL_PACKET) {
540                free(packet->buffer);
541        }
542
543        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
544                packet->buf_control = TRACE_CTRL_PACKET;
545        } else
546                packet->buf_control = TRACE_CTRL_EXTERNAL;
547
548
549        packet->buffer = buffer;
550        packet->header = NULL;
551        packet->type = rt_type;
552        packet->payload = buffer;
553
554        if (libtrace->format_data == NULL) {
555                rt_init_format_data(libtrace);
556        }
557
558        return 0;
559}       
560
561/* Reads the body of an RT packet from the network */
562static int rt_read_data_packet(libtrace_t *libtrace,
563                libtrace_packet_t *packet, int blocking) {
564        uint32_t prep_flags = 0;
565
566        prep_flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
567
568        /* The stored RT header will tell us how much data we need to read */
569        if (rt_read(libtrace, &packet->buffer, (size_t)RT_INFO->rt_hdr.length, 
570                                blocking) != RT_INFO->rt_hdr.length) {
571                return -1;
572        }
573
574        /* Send an ACK if required */
575        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
576                if (rt_send_ack(libtrace, RT_INFO->rt_hdr.sequence) == -1)
577                                return -1;
578        }
579       
580        /* Convert to the original capture format */
581        if (rt_set_format(libtrace, packet) < 0) {
582                return -1;
583        }
584               
585        /* Update payload pointers and packet type to match the original
586         * format */
587        if (trace_prepare_packet(packet->trace, packet, packet->buffer,
588                                packet->type, prep_flags)) {
589                return -1;
590        }
591
592        return 0;
593}
594
595/* Reads an RT packet from the network. Will block if the "blocking" flag is
596 * set to 1, otherwise will return if insufficient data is available */
597static int rt_read_packet_versatile(libtrace_t *libtrace,
598                libtrace_packet_t *packet,int blocking) {
599        rt_header_t *pkt_hdr = NULL;
600        void *void_hdr;
601        libtrace_rt_types_t switch_type;
602       
603        if (packet->buf_control == TRACE_CTRL_PACKET) {
604                packet->buf_control = TRACE_CTRL_EXTERNAL;
605                free(packet->buffer);
606                packet->buffer = NULL;
607        }
608
609        /* RT_LAST indicates that we need to read the RT header for the next
610         * packet. This is a touch hax, I admit */
611        if (RT_INFO->rt_hdr.type == TRACE_RT_LAST) {
612                void_hdr = (void *)pkt_hdr;
613                /* FIXME: Better error handling required */
614                if (rt_read(libtrace, &void_hdr, 
615                                sizeof(rt_header_t),blocking) !=
616                                sizeof(rt_header_t)) {
617                        return -1;
618                }
619                pkt_hdr = (rt_header_t *)void_hdr;
620               
621                /* Need to store these in case the next rt_read overwrites
622                 * the buffer they came from! */
623                RT_INFO->rt_hdr.type = pkt_hdr->type;
624                RT_INFO->rt_hdr.length = pkt_hdr->length;
625                RT_INFO->rt_hdr.sequence = pkt_hdr->sequence;
626        }
627        packet->type = RT_INFO->rt_hdr.type;
628       
629        /* All data-bearing packets (as opposed to RT internal messages)
630         * should be treated the same way when it comes to reading the rest
631         * of the packet */
632        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
633                switch_type = TRACE_RT_DATA_SIMPLE;
634        } else {
635                switch_type = packet->type;
636        }
637
638        switch(switch_type) {
639                case TRACE_RT_DATA_SIMPLE:
640                case TRACE_RT_DUCK_2_4:
641                case TRACE_RT_DUCK_2_5:
642                case TRACE_RT_STATUS:
643                case TRACE_RT_METADATA:
644                        if (rt_read_data_packet(libtrace, packet, blocking))
645                                return -1;
646                        break;
647                case TRACE_RT_END_DATA:
648                case TRACE_RT_KEYCHANGE:
649                case TRACE_RT_LOSTCONN:
650                case TRACE_RT_CLIENTDROP:
651                case TRACE_RT_SERVERSTART:
652                        /* All these have no payload */
653                        break;
654                case TRACE_RT_PAUSE_ACK:
655                        /* XXX: Add support for this */
656                        break;
657                case TRACE_RT_OPTION:
658                        /* XXX: Add support for this */
659                        break;
660                default:
661                        printf("Bad rt type for client receipt: %d\n",
662                                        switch_type);
663                        return -1;
664        }
665                               
666                       
667               
668        /* Return the number of bytes read from the stream */
669        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
670        return RT_INFO->rt_hdr.length + sizeof(rt_header_t);
671}
672
673/* Reads the next available packet in a blocking fashion */
674static int rt_read_packet(libtrace_t *libtrace,
675                libtrace_packet_t *packet) {
676        return rt_read_packet_versatile(libtrace,packet,1);
677}
678
679
680/* This should only get called for RT messages - RT-encapsulated data records
681 * should be converted to the appropriate capture format */
682static int rt_get_capture_length(const libtrace_packet_t *packet) {
683        rt_metadata_t *rt_md_hdr;
684        switch (packet->type) {
685                case TRACE_RT_STATUS:
686                        return sizeof(rt_status_t);
687                case TRACE_RT_HELLO:
688                        return sizeof(rt_hello_t);
689                case TRACE_RT_START:
690                        return 0;
691                case TRACE_RT_ACK:
692                        return sizeof(rt_ack_t);
693                case TRACE_RT_END_DATA:
694                        return 0;
695                case TRACE_RT_CLOSE:
696                        return 0;
697                case TRACE_RT_DENY_CONN:
698                        return sizeof(rt_deny_conn_t);
699                case TRACE_RT_PAUSE:
700                        return 0; 
701                case TRACE_RT_PAUSE_ACK:
702                        return 0;
703                case TRACE_RT_OPTION:
704                        return 0; /* FIXME */
705                case TRACE_RT_KEYCHANGE:
706                        return 0;
707                case TRACE_RT_LOSTCONN:
708                        return 0;
709                case TRACE_RT_SERVERSTART:
710                        return 0;
711                case TRACE_RT_CLIENTDROP:
712                        return 0;
713                case TRACE_RT_METADATA:
714                        /* This is a little trickier to work out */
715                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
716                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
717                                sizeof(rt_metadata_t);
718                default:
719                        printf("Unknown type: %d\n", packet->type);
720                       
721        }
722        return 0;
723}
724
725/* RT messages do not have a wire length because they were not captured from
726 * the wire - they were generated by the capture process */
727static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
728        return 0;
729}
730
731/* Although RT messages do contain "framing", this framing is considered to be
732 * stripped as soon as the packet is read by the RT client */                   
733static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
734        return 0;
735}
736
737
738static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
739{
740        /* RT messages don't have a link type */
741        return TRACE_TYPE_NONDATA;
742}
743
744static int rt_get_fd(const libtrace_t *trace) {
745        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
746}
747
748static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
749                                        libtrace_packet_t *packet) 
750{
751        libtrace_eventobj_t event = {0,0,0.0,0};
752        libtrace_err_t read_err;
753
754        assert(trace);
755        assert(packet);
756       
757        if (trace->format->get_fd) {
758                event.fd = trace->format->get_fd(trace);
759        } else {
760                event.fd = 0;
761        }
762
763        do {
764
765                event.size = rt_read_packet_versatile(trace, packet, 0);
766                if (event.size == -1) {
767                        read_err = trace_get_err(trace);
768                        if (read_err.err_num == EAGAIN) {
769                                /* No data available - do an IOWAIT */
770                                event.type = TRACE_EVENT_IOWAIT;
771                        }
772                        else {
773                                trace_perror(trace, "Error doing a non-blocking read from rt");
774                                event.type = TRACE_EVENT_PACKET;
775                                break;
776                        }
777                } else if (event.size == 0) {
778                        /* RT gives us a specific indicator that there will be
779                         * no more packets. */
780                        if (packet->type == TRACE_RT_END_DATA)
781                                event.type = TRACE_EVENT_TERMINATE;
782                        else
783                                /* Since several RT messages can have zero-byte
784                                 * length (once the framing is removed), an
785                                 * event size of zero can still indicate a
786                                 * PACKET event */
787                                event.type = TRACE_EVENT_PACKET;
788
789                }       
790                else {
791                        event.type = TRACE_EVENT_PACKET;
792                }
793
794                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
795                        if (!trace_apply_filter(trace->filter, packet)) {
796                                trace_clear_cache(packet);
797                                continue;
798                        }
799                }
800
801                break; 
802        } while (1);
803
804        return event;
805}
806
807static void rt_help(void) {
808        printf("rt format module\n");
809        printf("Supported input URIs:\n");
810        printf("\trt:hostname:port\n");
811        printf("\trt:hostname (connects on default port)\n");
812        printf("\n");
813        printf("\te.g.: rt:localhost\n");
814        printf("\te.g.: rt:localhost:32500\n");
815        printf("\n");
816
817}
818
819
820static struct libtrace_format_t rt = {
821        "rt",
822        "$Id$",
823        TRACE_FORMAT_RT,
824        NULL,                           /* probe filename */
825        NULL,                           /* probe magic */
826        rt_init_input,                  /* init_input */
827        NULL,                           /* config_input */
828        rt_start_input,                 /* start_input */
829        rt_pause_input,                 /* pause */
830        NULL,                           /* init_output */
831        NULL,                           /* config_output */
832        NULL,                           /* start_output */
833        rt_fin_input,                   /* fin_input */
834        NULL,                           /* fin_output */
835        rt_read_packet,                 /* read_packet */
836        rt_prepare_packet,              /* prepare_packet */
837        NULL,                           /* fin_packet */
838        NULL,                           /* write_packet */
839        rt_get_link_type,               /* get_link_type */
840        NULL,                           /* get_direction */
841        NULL,                           /* set_direction */
842        NULL,                           /* get_erf_timestamp */
843        NULL,                           /* get_timeval */
844        NULL,                           /* get_timespec */
845        NULL,                           /* get_seconds */
846        NULL,                           /* seek_erf */
847        NULL,                           /* seek_timeval */
848        NULL,                           /* seek_seconds */
849        rt_get_capture_length,          /* get_capture_length */
850        rt_get_wire_length,                     /* get_wire_length */
851        rt_get_framing_length,          /* get_framing_length */
852        NULL,                           /* set_capture_length */
853        NULL,                           /* get_received_packets */
854        NULL,                           /* get_filtered_packets */
855        NULL,                           /* get_dropped_packets */
856        NULL,                           /* get_captured_packets */
857        rt_get_fd,                      /* get_fd */
858        trace_event_rt,             /* trace_event */
859        rt_help,                        /* help */
860        NULL, /* pstart_input */
861        NULL, /* pread_packet */
862        NULL, /* ppause_input */
863        NULL, /* pfin_input */
864        NULL, /* pconfig_input */
865        NULL                            /* next pointer */
866};
867
868void rt_constructor(void) {
869        register_format(&rt);
870}
Note: See TracBrowser for help on using the repository browser.