source: lib/format_rt.c @ 322c516

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 322c516 was 32751a2, checked in by Shane Alcock <salcock@…>, 6 years ago

Make RT safe for use as a parallel input

Previously, the RT format used to assume that only one read packet would
be in use at any given time, so subsequent read_packet calls could overwrite
the buffer space that previous packets had occupied. With parallel libtrace,
this is no longer the case.

For now, I've gone with the safe but slow solution of copying every
complete RT packet out of the receive buffer into memory space that has
been allocated to the libtrace_packet_t. It'd be nice to do this without
the copy, but we'd need to start tracking which packets have been returned
to us to be able to do this properly.

  • Property mode set to 100644
File size: 26.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#define _GNU_SOURCE
36
37#include "config.h"
38#include "common.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "rt_protocol.h"
43
44#include <sys/stat.h>
45#include <assert.h>
46#include <errno.h>
47#include <fcntl.h>
48#include <stdio.h>
49#include <string.h>
50#include <stdlib.h>
51#include <unistd.h>
52
53#ifndef WIN32
54# include <netdb.h>
55#endif
56
57#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
58
59/* Convert the RT denial code into a nice printable and coherent string */
60static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
61{
62        const char *string = 0;
63
64        switch(reason) {
65                case RT_DENY_WRAPPER:
66                        string = "Rejected by TCP Wrappers";
67                        break;
68                case RT_DENY_FULL:
69                        string = "Max connections reached on server";
70                        break;
71                case RT_DENY_AUTH:
72                        string = "Authentication failed";
73                        break;
74                default:
75                        string = "Unknown reason";
76        }
77
78        return string;
79}
80
81
82struct rt_format_data_t {
83        /* Name of the host to connect to */
84        char *hostname;
85        /* Buffer to store received packets into */
86        char *pkt_buffer;
87        /* Pointer to the next packet to be read from the buffer */
88        char *buf_current;
89        /* Amount of buffer space used */
90        size_t buf_filled;
91        /* The port to connect to */
92        int port;
93        /* The file descriptor for the RT connection */
94        int input_fd;
95        /* Flag indicating whether the server is doing reliable RT */
96        int reliable;
97
98        /* Header for the packet currently being received */
99        rt_header_t rt_hdr;
100
101        int unacked;
102       
103        /* Dummy traces that can be assigned to the received packets to ensure
104         * that the appropriate functions can be used to process them */
105        libtrace_t *dummy_duck;
106        libtrace_t *dummy_erf;
107        libtrace_t *dummy_pcap;
108        libtrace_t *dummy_linux;
109        libtrace_t *dummy_ring;
110        libtrace_t *dummy_bpf;
111};
112
113/* Connects to an RT server
114 *
115 * Returns -1 if an error occurs
116 */
117static int rt_connect(libtrace_t *libtrace) {
118        struct hostent *he;
119        struct sockaddr_in remote;
120        rt_header_t connect_msg;
121        rt_deny_conn_t deny_hdr;       
122        rt_hello_t hello_opts;
123        uint8_t reason;
124       
125        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
126                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
127                                "Failed to convert hostname %s to address",
128                                RT_INFO->hostname);
129                return -1;
130        }
131        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
132                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
133                                "Could not create socket");
134                return -1;
135        }
136
137        memset(&remote,0, sizeof(remote));
138        remote.sin_family = AF_INET;
139        remote.sin_port = htons(RT_INFO->port);
140        remote.sin_addr = *((struct in_addr *)he->h_addr);
141
142        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
143                                (socklen_t)sizeof(struct sockaddr)) == -1) {
144                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
145                                "Could not connect to host %s on port %d",
146                                RT_INFO->hostname, RT_INFO->port);
147                return -1;
148        }
149
150        /* We are connected, now receive message from server */
151       
152        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
153                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
154                                "Could not receive connection message from %s",
155                                RT_INFO->hostname);
156                return -1;
157        }
158       
159        if (connect_msg.magic != LIBTRACE_RT_MAGIC) {
160                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
161                        "RT version mismatch: magic byte is incorrect");
162                return -1;
163        }
164
165        if (connect_msg.version != LIBTRACE_RT_VERSION) {
166                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
167                        "RT version mismatch: version is incorrect (expected %d, got %d",
168                        LIBTRACE_RT_VERSION, connect_msg.version);
169                return -1;
170        }
171
172               
173       
174        switch (ntohl(connect_msg.type)) {
175                case TRACE_RT_DENY_CONN:
176                        /* Connection was denied */
177                       
178                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
179                                                sizeof(rt_deny_conn_t),
180                                                0) != sizeof(rt_deny_conn_t)) {
181                                reason = 0;
182                        }       
183                        reason = ntohl(deny_hdr.reason);
184                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
185                                "Connection attempt is denied: %s",
186                                rt_deny_reason(reason));       
187                        return -1;
188                case TRACE_RT_HELLO:
189                        /* Hello message - read the options sent to us by the
190                         * server */
191                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
192                                                sizeof(rt_hello_t), 0)
193                                        != sizeof(rt_hello_t)) {
194                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
195                                        "Failed to receive TRACE_RT_HELLO options");
196                                return -1;
197                        }
198                       
199                       
200                        RT_INFO->reliable = hello_opts.reliable;
201                        RT_INFO->unacked = 0;
202                        return 0;
203                default:
204                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
205                                        "Unknown message type received: %d",
206                                        connect_msg.type);
207                        return -1;
208        }
209        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
210                        "Somehow you managed to reach this unreachable code");
211        return -1;
212}
213
214static void rt_init_format_data(libtrace_t *libtrace) {
215        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
216
217        RT_INFO->dummy_duck = NULL;
218        RT_INFO->dummy_erf = NULL;
219        RT_INFO->dummy_pcap = NULL;
220        RT_INFO->dummy_linux = NULL;
221        RT_INFO->dummy_ring = NULL;
222        RT_INFO->dummy_bpf = NULL;
223        RT_INFO->pkt_buffer = NULL;
224        RT_INFO->buf_current = NULL;
225        RT_INFO->buf_filled = 0;
226        RT_INFO->hostname = NULL;
227        RT_INFO->port = 0;
228        RT_INFO->unacked = 0;
229}
230
231static int rt_init_input(libtrace_t *libtrace) {
232        char *scan;
233        char *uridata = libtrace->uridata;
234
235        rt_init_format_data(libtrace);
236
237        /* If the user specifies "rt:" then assume localhost and the default
238         * port */     
239        if (strlen(uridata) == 0) {
240                RT_INFO->hostname =
241                        strdup("localhost");
242                RT_INFO->port =
243                        COLLECTOR_PORT;
244        } else {
245                /* If the user does not specify a port, assume the default
246                 * port */
247                if ((scan = strchr(uridata,':')) == NULL) {
248                        RT_INFO->hostname =
249                                strdup(uridata);
250                        RT_INFO->port =
251                                COLLECTOR_PORT;
252                } else {
253                        RT_INFO->hostname =
254                                (char *)strndup(uridata,
255                                                (size_t)(scan - uridata));
256                        RT_INFO->port =
257                                atoi(++scan);
258                }
259        }
260
261        return 0;
262}
263       
264static int rt_start_input(libtrace_t *libtrace) {
265        rt_header_t start_msg;
266
267        start_msg.type = htonl(TRACE_RT_START);
268        start_msg.length = 0; 
269
270        if (rt_connect(libtrace) == -1)
271                return -1;
272       
273        /* Need to send start message to server */
274        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
275                                start_msg.length, 0) != sizeof(rt_header_t)) {
276                printf("Failed to send start message to server\n");
277                return -1;
278        }
279        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
280
281        return 0;
282}
283
284static int rt_pause_input(libtrace_t *libtrace) {
285        rt_header_t close_msg;
286
287        close_msg.type = htonl(TRACE_RT_CLOSE);
288        close_msg.length = 0; 
289       
290        /* Send a close message to the server */
291        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
292                                close_msg.length, 0) != (int)sizeof(rt_header_t)
293                                + close_msg.length) {
294                printf("Failed to send close message to server\n");
295       
296        }
297
298        close(RT_INFO->input_fd);
299        return 0;
300}
301
302static int rt_fin_input(libtrace_t *libtrace) {
303        /* Make sure we clean up any dummy traces that we have been using */
304       
305        if (RT_INFO->dummy_duck)
306                trace_destroy_dead(RT_INFO->dummy_duck);
307
308        if (RT_INFO->dummy_erf) 
309                trace_destroy_dead(RT_INFO->dummy_erf);
310               
311        if (RT_INFO->dummy_pcap)
312                trace_destroy_dead(RT_INFO->dummy_pcap);
313
314        if (RT_INFO->dummy_linux)
315                trace_destroy_dead(RT_INFO->dummy_linux);
316       
317        if (RT_INFO->dummy_ring)
318                trace_destroy_dead(RT_INFO->dummy_ring);
319
320        if (RT_INFO->dummy_bpf)
321                trace_destroy_dead(RT_INFO->dummy_bpf);
322        free(libtrace->format_data);
323        return 0;
324}
325
326
327/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
328 * in any way. This means we have a much larger memory overhead per packet
329 * (which won't be used in the vast majority of cases), so we may want to think
330 * about doing something smarter, e.g. allocate a smaller block of memory and
331 * only increase it as required.
332 *
333 * XXX Capturing off int: can still lead to packets that are larger than 10K,
334 * in instances where the fragmentation is done magically by the NIC. This
335 * is pretty nasty, but also very rare.
336 */
337#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
338
339/* Receives data from an RT server */
340static int rt_read(libtrace_t *libtrace, void *buffer, size_t len, int block) 
341{
342        int numbytes;
343       
344        assert(len <= RT_BUF_SIZE);
345       
346        if (!RT_INFO->pkt_buffer) {
347                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
348                RT_INFO->buf_current = RT_INFO->pkt_buffer;
349                RT_INFO->buf_filled = 0;
350        }
351
352#ifndef MSG_DONTWAIT
353#define MSG_DONTWAIT 0
354#endif
355
356        if (block)
357                block=0;
358        else
359                block=MSG_DONTWAIT;
360
361        /* If we don't have enough buffer space for the amount we want to
362         * read, move the current buffer contents to the front of the buffer
363         * to make room */
364        if (len > RT_INFO->buf_filled) {
365                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
366                                RT_INFO->buf_filled);
367                RT_INFO->buf_current = RT_INFO->pkt_buffer;
368#ifndef MSG_NOSIGNAL
369#  define MSG_NOSIGNAL 0
370#endif
371                /* Loop as long as we don't have all the data that we were
372                 * asked for */
373                while (len > RT_INFO->buf_filled) {
374                        if ((numbytes = recv(RT_INFO->input_fd,
375                                                RT_INFO->buf_current + 
376                                                RT_INFO->buf_filled,
377                                                RT_BUF_SIZE-RT_INFO->buf_filled,
378                                                MSG_NOSIGNAL|block)) <= 0) {
379                                if (numbytes == 0) {
380                                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
381                                                        "No data received");
382                                        return -1;
383                                }
384                               
385                                if (errno == EINTR) {
386                                        /* ignore EINTR in case
387                                         * a caller is using signals
388                                         */
389                                        continue;
390                                }
391                                if (errno == EAGAIN) {
392                                        /* We asked for non-blocking mode, so
393                                         * we need to return now */
394                                        trace_set_err(libtrace,
395                                                        EAGAIN,
396                                                        "EAGAIN");
397                                        return -1;
398                                }
399                               
400                                perror("recv");
401                                trace_set_err(libtrace, errno,
402                                                "Failed to read data into rt recv buffer");
403                                return -1;
404                        }
405                        RT_INFO->buf_filled+=numbytes;
406                }
407
408        }
409        memcpy(buffer, RT_INFO->buf_current, len);
410        RT_INFO->buf_current += len;
411        RT_INFO->buf_filled -= len;
412        return len;
413}
414
415
416/* Sets the trace format for the packet to match the format it was originally
417 * captured in, rather than the RT format */
418static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
419{
420
421        /* We need to assign the packet to a "dead" trace */
422
423        /* Try to minimize the number of corrupt packets that slip through
424         * while making it easy to identify new pcap DLTs */
425        if (packet->type > TRACE_RT_DATA_DLT && 
426                        packet->type < TRACE_RT_DATA_DLT_END) {
427                if (!RT_INFO->dummy_pcap) {
428                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
429                }
430                packet->trace = RT_INFO->dummy_pcap;
431                return 0;       
432        }
433
434        if (packet->type > TRACE_RT_DATA_BPF &&
435                        packet->type < TRACE_RT_DATA_BPF_END) {
436
437                if (!RT_INFO->dummy_bpf) {
438                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
439                        /* This may fail on a non-BSD machine */
440                        if (trace_is_err(RT_INFO->dummy_bpf)) {
441                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
442                                return -1;
443                        }
444                }
445                packet->trace = RT_INFO->dummy_bpf;
446                return 0;
447        }
448
449        switch (packet->type) {
450                case TRACE_RT_DUCK_2_4:
451                case TRACE_RT_DUCK_2_5:
452                case TRACE_RT_DUCK_5_0:
453                        if (!RT_INFO->dummy_duck) {
454                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
455                        }
456                        packet->trace = RT_INFO->dummy_duck;
457                        break;
458                case TRACE_RT_DATA_ERF:
459                        if (!RT_INFO->dummy_erf) {
460                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
461                        }
462                        packet->trace = RT_INFO->dummy_erf;
463                        break;
464                case TRACE_RT_DATA_LINUX_NATIVE:
465                        if (!RT_INFO->dummy_linux) {
466                                RT_INFO->dummy_linux = trace_create_dead("int:");
467                                /* This may fail on a non-Linux machine */
468                                if (trace_is_err(RT_INFO->dummy_linux)) {
469                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
470                                        return -1;
471                                }
472                        }
473                        packet->trace = RT_INFO->dummy_linux;
474                        break;
475                case TRACE_RT_DATA_LINUX_RING:
476                        if (!RT_INFO->dummy_ring) {
477                                RT_INFO->dummy_ring = trace_create_dead("ring:");
478                                /* This may fail on a non-Linux machine */
479                                if (trace_is_err(RT_INFO->dummy_ring)) {
480                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
481                                        return -1;
482                                }
483                        }
484                        packet->trace = RT_INFO->dummy_ring;
485                        break;
486                case TRACE_RT_STATUS:
487                case TRACE_RT_METADATA:
488                        /* Just use the RT trace! */
489                        packet->trace = libtrace;
490                        break;
491                case TRACE_RT_DATA_LEGACY_ETH:
492                case TRACE_RT_DATA_LEGACY_ATM:
493                case TRACE_RT_DATA_LEGACY_POS:
494                        printf("Sending legacy over RT is currently not supported\n");
495                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
496                        return -1;
497                default:
498                        printf("Unrecognised format: %u\n", packet->type);
499                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
500                        return -1;
501        }
502        return 0; /* success */
503}               
504
505/* Sends an RT ACK to the server to acknowledge receipt of packets */
506static int rt_send_ack(libtrace_t *libtrace, 
507                uint32_t seqno)  {
508       
509        static char *ack_buffer = 0;
510        char *buf_ptr;
511        int numbytes = 0;
512        size_t to_write = 0;
513        rt_header_t *hdr;
514        rt_ack_t *ack_hdr;
515       
516        if (!ack_buffer) {
517                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
518                                                        + sizeof(rt_ack_t));
519        }
520       
521        hdr = (rt_header_t *) ack_buffer;
522        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
523       
524        hdr->type = htonl(TRACE_RT_ACK);
525        hdr->length = htons(sizeof(rt_ack_t));
526
527        ack_hdr->sequence = htonl(seqno);
528       
529        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
530        buf_ptr = ack_buffer;
531
532        /* Keep trying until we write the entire ACK */
533        while (to_write > 0) {
534                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
535                if (numbytes == -1) {
536                        if (errno == EINTR || errno == EAGAIN) {
537                                continue;
538                        }
539                        else {
540                                printf("Error sending ack\n");
541                                perror("send");
542                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
543                                                "Error sending ack");
544                                return -1;
545                        }
546                }
547                to_write = to_write - numbytes;
548                buf_ptr = buf_ptr + to_write;
549               
550        }
551
552        return 1;
553}
554
555/* Shouldn't need to call this too often */
556static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
557                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
558
559        if (packet->buffer != buffer &&
560                        packet->buf_control == TRACE_CTRL_PACKET) {
561                free(packet->buffer);
562        }
563
564        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
565                packet->buf_control = TRACE_CTRL_PACKET;
566        } else
567                packet->buf_control = TRACE_CTRL_EXTERNAL;
568
569
570        packet->buffer = buffer;
571        packet->header = NULL;
572        packet->type = rt_type;
573        packet->payload = buffer;
574
575        if (libtrace->format_data == NULL) {
576                rt_init_format_data(libtrace);
577        }
578
579        return 0;
580}       
581
582/* Reads the body of an RT packet from the network */
583static int rt_read_data_packet(libtrace_t *libtrace,
584                libtrace_packet_t *packet, int blocking) {
585        uint32_t prep_flags = 0;
586
587        prep_flags |= TRACE_PREP_OWN_BUFFER;
588
589        /* The stored RT header will tell us how much data we need to read */
590        if (rt_read(libtrace, packet->buffer, (size_t)RT_INFO->rt_hdr.length, 
591                                blocking) != RT_INFO->rt_hdr.length) {
592                return -1;
593        }
594
595        /* Send an ACK if required */
596        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
597                RT_INFO->unacked ++;
598                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
599                        if (rt_send_ack(libtrace, RT_INFO->rt_hdr.sequence) 
600                                        == -1)
601                                return -1;
602                        RT_INFO->unacked = 0;
603                }
604        }
605       
606        /* Convert to the original capture format */
607        if (rt_set_format(libtrace, packet) < 0) {
608                return -1;
609        }
610               
611        /* Update payload pointers and packet type to match the original
612         * format */
613        if (trace_prepare_packet(packet->trace, packet, packet->buffer,
614                                packet->type, prep_flags)) {
615                return -1;
616        }
617
618        return 0;
619}
620
621/* Reads an RT packet from the network. Will block if the "blocking" flag is
622 * set to 1, otherwise will return if insufficient data is available */
623static int rt_read_packet_versatile(libtrace_t *libtrace,
624                libtrace_packet_t *packet,int blocking) {
625        rt_header_t hdr;
626        rt_header_t *pkt_hdr = NULL;
627        void *void_hdr;
628        libtrace_rt_types_t switch_type;
629       
630        /* RT_LAST indicates that we need to read the RT header for the next
631         * packet. This is a touch hax, I admit */
632        if (RT_INFO->rt_hdr.type == TRACE_RT_LAST) {
633                /* FIXME: Better error handling required */
634                if (rt_read(libtrace, (void *)&hdr, 
635                                sizeof(rt_header_t),blocking) !=
636                                sizeof(rt_header_t)) {
637                        return -1;
638                }
639                /* Need to store these in case the next rt_read overwrites
640                 * the buffer they came from! */
641                RT_INFO->rt_hdr.type = ntohl(hdr.type);
642                RT_INFO->rt_hdr.length = ntohs(hdr.length);
643                RT_INFO->rt_hdr.sequence = ntohl(hdr.sequence);
644        }
645
646        if (packet->buf_control == TRACE_CTRL_PACKET) {
647                if (packet->buffer == NULL) {
648                        packet->buffer = malloc(RT_INFO->rt_hdr.length + sizeof(rt_header_t));
649
650                } else if (RT_INFO->rt_hdr.length > sizeof(packet->buffer)) {
651                        packet->buffer = realloc(packet->buffer, RT_INFO->rt_hdr.length + sizeof(rt_header_t));
652                }
653        }
654       
655        packet->type = RT_INFO->rt_hdr.type;
656        packet->payload = packet->buffer;
657       
658       
659        /* All data-bearing packets (as opposed to RT internal messages)
660         * should be treated the same way when it comes to reading the rest
661         * of the packet */
662        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
663                switch_type = TRACE_RT_DATA_SIMPLE;
664        } else {
665                switch_type = packet->type;
666        }
667
668        switch(switch_type) {
669                case TRACE_RT_DATA_SIMPLE:
670                case TRACE_RT_DUCK_2_4:
671                case TRACE_RT_DUCK_2_5:
672                case TRACE_RT_STATUS:
673                case TRACE_RT_METADATA:
674                        if (rt_read_data_packet(libtrace, packet, blocking))
675                                return -1;
676                        break;
677                case TRACE_RT_END_DATA:
678                case TRACE_RT_KEYCHANGE:
679                case TRACE_RT_LOSTCONN:
680                case TRACE_RT_CLIENTDROP:
681                case TRACE_RT_SERVERSTART:
682                        /* All these have no payload */
683                        packet->header = packet->buffer;
684                        packet->payload = ((char *)packet->buffer + sizeof(rt_header_t));
685                        pkt_hdr = (rt_header_t *)packet->header;
686                        pkt_hdr->type = ntohl(RT_INFO->rt_hdr.type);
687                        pkt_hdr->length = ntohs(RT_INFO->rt_hdr.length);
688                        pkt_hdr->sequence = ntohl(RT_INFO->rt_hdr.sequence);
689
690                        /* XXX Do we need to save the other crap? */
691                        break;
692                case TRACE_RT_PAUSE_ACK:
693                        /* XXX: Add support for this */
694                        break;
695                case TRACE_RT_OPTION:
696                        /* XXX: Add support for this */
697                        break;
698                default:
699                        printf("Bad rt type for client receipt: %d\n",
700                                        switch_type);
701                        return -1;
702        }
703       
704        /* Return the number of bytes read from the stream */
705        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
706        return RT_INFO->rt_hdr.length + sizeof(rt_header_t);
707}
708
709/* Reads the next available packet in a blocking fashion */
710static int rt_read_packet(libtrace_t *libtrace,
711                libtrace_packet_t *packet) {
712        return rt_read_packet_versatile(libtrace,packet,1);
713}
714
715
716/* This should only get called for RT messages - RT-encapsulated data records
717 * should be converted to the appropriate capture format */
718static int rt_get_capture_length(const libtrace_packet_t *packet) {
719        rt_metadata_t *rt_md_hdr;
720        switch (packet->type) {
721                case TRACE_RT_STATUS:
722                        return sizeof(rt_status_t);
723                case TRACE_RT_HELLO:
724                        return sizeof(rt_hello_t);
725                case TRACE_RT_START:
726                        return 0;
727                case TRACE_RT_ACK:
728                        return sizeof(rt_ack_t);
729                case TRACE_RT_END_DATA:
730                        return 0;
731                case TRACE_RT_CLOSE:
732                        return 0;
733                case TRACE_RT_DENY_CONN:
734                        return sizeof(rt_deny_conn_t);
735                case TRACE_RT_PAUSE:
736                        return 0; 
737                case TRACE_RT_PAUSE_ACK:
738                        return 0;
739                case TRACE_RT_OPTION:
740                        return 0; /* FIXME */
741                case TRACE_RT_KEYCHANGE:
742                        return 0;
743                case TRACE_RT_LOSTCONN:
744                        return 0;
745                case TRACE_RT_SERVERSTART:
746                        return 0;
747                case TRACE_RT_CLIENTDROP:
748                        return 0;
749                case TRACE_RT_METADATA:
750                        /* This is a little trickier to work out */
751                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
752                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
753                                sizeof(rt_metadata_t);
754                default:
755                        printf("Unknown type: %d\n", packet->type);
756                       
757        }
758        return 0;
759}
760
761/* RT messages do not have a wire length because they were not captured from
762 * the wire - they were generated by the capture process */
763static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
764        return 0;
765}
766
767/* Although RT messages do contain "framing", this framing is considered to be
768 * stripped as soon as the packet is read by the RT client */                   
769static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
770        return 0;
771}
772
773
774static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
775{
776        /* RT messages don't have a link type */
777        return TRACE_TYPE_NONDATA;
778}
779
780static int rt_get_fd(const libtrace_t *trace) {
781        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
782}
783
784static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
785                                        libtrace_packet_t *packet) 
786{
787        libtrace_eventobj_t event = {0,0,0.0,0};
788        libtrace_err_t read_err;
789
790        assert(trace);
791        assert(packet);
792       
793        if (trace->format->get_fd) {
794                event.fd = trace->format->get_fd(trace);
795        } else {
796                event.fd = 0;
797        }
798
799        do {
800
801                event.size = rt_read_packet_versatile(trace, packet, 0);
802                if (event.size == -1) {
803                        read_err = trace_get_err(trace);
804                        if (read_err.err_num == EAGAIN) {
805                                /* No data available - do an IOWAIT */
806                                event.type = TRACE_EVENT_IOWAIT;
807                        }
808                        else {
809                                trace_perror(trace, "Error doing a non-blocking read from rt");
810                                event.type = TRACE_EVENT_PACKET;
811                                break;
812                        }
813                } else if (event.size == 0) {
814                        /* RT gives us a specific indicator that there will be
815                         * no more packets. */
816                        if (packet->type == TRACE_RT_END_DATA)
817                                event.type = TRACE_EVENT_TERMINATE;
818                        else {
819                                /* Since several RT messages can have zero-byte
820                                 * length (once the framing is removed), an
821                                 * event size of zero can still indicate a
822                                 * PACKET event */
823                                event.type = TRACE_EVENT_PACKET;
824                                trace->accepted_packets ++;
825                        }
826
827                }       
828                else {
829                        event.type = TRACE_EVENT_PACKET;
830                        trace->accepted_packets ++;
831                }
832
833                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
834                        if (!trace_apply_filter(trace->filter, packet)) {
835                                trace_clear_cache(packet);
836                                trace->filtered_packets ++;
837                                continue;
838                        }
839                }
840
841                break; 
842        } while (1);
843
844        return event;
845}
846
847static void rt_help(void) {
848        printf("rt format module\n");
849        printf("Supported input URIs:\n");
850        printf("\trt:hostname:port\n");
851        printf("\trt:hostname (connects on default port)\n");
852        printf("\n");
853        printf("\te.g.: rt:localhost\n");
854        printf("\te.g.: rt:localhost:32500\n");
855        printf("\n");
856
857}
858
859
860static struct libtrace_format_t rt = {
861        "rt",
862        "$Id$",
863        TRACE_FORMAT_RT,
864        NULL,                           /* probe filename */
865        NULL,                           /* probe magic */
866        rt_init_input,                  /* init_input */
867        NULL,                           /* config_input */
868        rt_start_input,                 /* start_input */
869        rt_pause_input,                 /* pause */
870        NULL,                           /* init_output */
871        NULL,                           /* config_output */
872        NULL,                           /* start_output */
873        rt_fin_input,                   /* fin_input */
874        NULL,                           /* fin_output */
875        rt_read_packet,                 /* read_packet */
876        rt_prepare_packet,              /* prepare_packet */
877        NULL,                           /* fin_packet */
878        NULL,                           /* write_packet */
879        rt_get_link_type,               /* get_link_type */
880        NULL,                           /* get_direction */
881        NULL,                           /* set_direction */
882        NULL,                           /* get_erf_timestamp */
883        NULL,                           /* get_timeval */
884        NULL,                           /* get_timespec */
885        NULL,                           /* get_seconds */
886        NULL,                           /* seek_erf */
887        NULL,                           /* seek_timeval */
888        NULL,                           /* seek_seconds */
889        rt_get_capture_length,          /* get_capture_length */
890        rt_get_wire_length,                     /* get_wire_length */
891        rt_get_framing_length,          /* get_framing_length */
892        NULL,                           /* set_capture_length */
893        NULL,                           /* get_received_packets */
894        NULL,                           /* get_filtered_packets */
895        NULL,                           /* get_dropped_packets */
896        NULL,                           /* get_statistics */
897        rt_get_fd,                      /* get_fd */
898        trace_event_rt,             /* trace_event */
899        rt_help,                        /* help */
900        NULL,                   /* next pointer */
901        NON_PARALLEL(true) /* This is normally live */
902};
903
904void rt_constructor(void) {
905        register_format(&rt);
906}
Note: See TracBrowser for help on using the repository browser.