source: lib/format_rt.c @ 5ab626a

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5ab626a was 5ab626a, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Deprecate trace_get_filtered/accepted/recevied/dropped() in favour of a single function

Adds the single trace_get_statistics function. This allows the structure to be filled
at a point in time, rather than making multiple calls to the library during which state
might have changed.

This has been designed such that the structure can be added to in the future without
breaking old code.

The old internal get_captured_packets was removed from the formats as it was never used.
Eventually we should completely remove get_filtered and received from the formats and replace
them with get_statistics.

In additon some extra fields have added, such as error and captured and the pre-existing
fields are better defined.

The linux formats have been updated to use this new API, which combined with reading
/proc/net/dev returns a full set of statistics.

  • Property mode set to 100644
File size: 24.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#define _GNU_SOURCE
36
37#include "config.h"
38#include "common.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "rt_protocol.h"
43
44#include <sys/stat.h>
45#include <assert.h>
46#include <errno.h>
47#include <fcntl.h>
48#include <stdio.h>
49#include <string.h>
50#include <stdlib.h>
51#include <unistd.h>
52
53#ifndef WIN32
54# include <netdb.h>
55#endif
56
57#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
58
59/* Convert the RT denial code into a nice printable and coherent string */
60static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
61{
62        const char *string = 0;
63
64        switch(reason) {
65                case RT_DENY_WRAPPER:
66                        string = "Rejected by TCP Wrappers";
67                        break;
68                case RT_DENY_FULL:
69                        string = "Max connections reached on server";
70                        break;
71                case RT_DENY_AUTH:
72                        string = "Authentication failed";
73                        break;
74                default:
75                        string = "Unknown reason";
76        }
77
78        return string;
79}
80
81
82struct rt_format_data_t {
83        /* Name of the host to connect to */
84        char *hostname;
85        /* Buffer to store received packets into */
86        char *pkt_buffer;
87        /* Pointer to the next packet to be read from the buffer */
88        char *buf_current;
89        /* Amount of buffer space used */
90        size_t buf_filled;
91        /* The port to connect to */
92        int port;
93        /* The file descriptor for the RT connection */
94        int input_fd;
95        /* Flag indicating whether the server is doing reliable RT */
96        int reliable;
97
98        /* Header for the packet currently being received */
99        rt_header_t rt_hdr;
100       
101        /* Dummy traces that can be assigned to the received packets to ensure
102         * that the appropriate functions can be used to process them */
103        libtrace_t *dummy_duck;
104        libtrace_t *dummy_erf;
105        libtrace_t *dummy_pcap;
106        libtrace_t *dummy_linux;
107        libtrace_t *dummy_ring;
108        libtrace_t *dummy_bpf;
109};
110
111/* Connects to an RT server
112 *
113 * Returns -1 if an error occurs
114 */
115static int rt_connect(libtrace_t *libtrace) {
116        struct hostent *he;
117        struct sockaddr_in remote;
118        rt_header_t connect_msg;
119        rt_deny_conn_t deny_hdr;       
120        rt_hello_t hello_opts;
121        uint8_t reason;
122       
123        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
124                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
125                                "Failed to convert hostname %s to address",
126                                RT_INFO->hostname);
127                return -1;
128        }
129        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
130                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
131                                "Could not create socket");
132                return -1;
133        }
134
135        memset(&remote,0, sizeof(remote));
136        remote.sin_family = AF_INET;
137        remote.sin_port = htons(RT_INFO->port);
138        remote.sin_addr = *((struct in_addr *)he->h_addr);
139
140        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
141                                (socklen_t)sizeof(struct sockaddr)) == -1) {
142                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
143                                "Could not connect to host %s on port %d",
144                                RT_INFO->hostname, RT_INFO->port);
145                return -1;
146        }
147
148        /* We are connected, now receive message from server */
149       
150        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
151                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
152                                "Could not receive connection message from %s",
153                                RT_INFO->hostname);
154                return -1;
155        }
156       
157        switch (connect_msg.type) {
158                case TRACE_RT_DENY_CONN:
159                        /* Connection was denied */
160                       
161                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
162                                                sizeof(rt_deny_conn_t),
163                                                0) != sizeof(rt_deny_conn_t)) {
164                                reason = 0;
165                        }       
166                        reason = deny_hdr.reason;
167                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
168                                "Connection attempt is denied: %s",
169                                rt_deny_reason(reason));       
170                        return -1;
171                case TRACE_RT_HELLO:
172                        /* Hello message - read the options sent to us by the
173                         * server */
174                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
175                                                sizeof(rt_hello_t), 0)
176                                        != sizeof(rt_hello_t)) {
177                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
178                                        "Failed to receive TRACE_RT_HELLO options");
179                                return -1;
180                        }
181                        RT_INFO->reliable = hello_opts.reliable;
182                       
183                        return 0;
184                default:
185                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
186                                        "Unknown message type received: %d",
187                                        connect_msg.type);
188                        return -1;
189        }
190        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
191                        "Somehow you managed to reach this unreachable code");
192        return -1;
193}
194
195static void rt_init_format_data(libtrace_t *libtrace) {
196        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
197
198        RT_INFO->dummy_duck = NULL;
199        RT_INFO->dummy_erf = NULL;
200        RT_INFO->dummy_pcap = NULL;
201        RT_INFO->dummy_linux = NULL;
202        RT_INFO->dummy_ring = NULL;
203        RT_INFO->dummy_bpf = NULL;
204        RT_INFO->pkt_buffer = NULL;
205        RT_INFO->buf_current = NULL;
206        RT_INFO->buf_filled = 0;
207        RT_INFO->hostname = NULL;
208        RT_INFO->port = 0;
209}
210
211static int rt_init_input(libtrace_t *libtrace) {
212        char *scan;
213        char *uridata = libtrace->uridata;
214
215        rt_init_format_data(libtrace);
216
217        /* If the user specifies "rt:" then assume localhost and the default
218         * port */     
219        if (strlen(uridata) == 0) {
220                RT_INFO->hostname =
221                        strdup("localhost");
222                RT_INFO->port =
223                        COLLECTOR_PORT;
224        } else {
225                /* If the user does not specify a port, assume the default
226                 * port */
227                if ((scan = strchr(uridata,':')) == NULL) {
228                        RT_INFO->hostname =
229                                strdup(uridata);
230                        RT_INFO->port =
231                                COLLECTOR_PORT;
232                } else {
233                        RT_INFO->hostname =
234                                (char *)strndup(uridata,
235                                                (size_t)(scan - uridata));
236                        RT_INFO->port =
237                                atoi(++scan);
238                }
239        }
240
241        return 0;
242}
243       
244static int rt_start_input(libtrace_t *libtrace) {
245        rt_header_t start_msg;
246
247        start_msg.type = TRACE_RT_START;
248        start_msg.length = 0; 
249
250        if (rt_connect(libtrace) == -1)
251                return -1;
252       
253        /* Need to send start message to server */
254        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
255                                start_msg.length, 0) != sizeof(rt_header_t)) {
256                printf("Failed to send start message to server\n");
257                return -1;
258        }
259        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
260
261        return 0;
262}
263
264static int rt_pause_input(libtrace_t *libtrace) {
265        rt_header_t close_msg;
266
267        close_msg.type = TRACE_RT_CLOSE;
268        close_msg.length = 0; 
269       
270        /* Send a close message to the server */
271        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
272                                close_msg.length, 0) != (int)sizeof(rt_header_t)
273                                + close_msg.length) {
274                printf("Failed to send close message to server\n");
275       
276        }
277
278        close(RT_INFO->input_fd);
279        return 0;
280}
281
282static int rt_fin_input(libtrace_t *libtrace) {
283        /* Make sure we clean up any dummy traces that we have been using */
284       
285        if (RT_INFO->dummy_duck)
286                trace_destroy_dead(RT_INFO->dummy_duck);
287
288        if (RT_INFO->dummy_erf) 
289                trace_destroy_dead(RT_INFO->dummy_erf);
290               
291        if (RT_INFO->dummy_pcap)
292                trace_destroy_dead(RT_INFO->dummy_pcap);
293
294        if (RT_INFO->dummy_linux)
295                trace_destroy_dead(RT_INFO->dummy_linux);
296       
297        if (RT_INFO->dummy_ring)
298                trace_destroy_dead(RT_INFO->dummy_ring);
299
300        if (RT_INFO->dummy_bpf)
301                trace_destroy_dead(RT_INFO->dummy_bpf);
302        free(libtrace->format_data);
303        return 0;
304}
305
306
307/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
308 * in any way. This means we have a much larger memory overhead per packet
309 * (which won't be used in the vast majority of cases), so we may want to think
310 * about doing something smarter, e.g. allocate a smaller block of memory and
311 * only increase it as required.
312 *
313 * XXX Capturing off int: can still lead to packets that are larger than 10K,
314 * in instances where the fragmentation is done magically by the NIC. This
315 * is pretty nasty, but also very rare.
316 */
317#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
318
319/* Receives data from an RT server */
320static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block) 
321{
322        int numbytes;
323       
324        assert(len <= RT_BUF_SIZE);
325       
326        if (!RT_INFO->pkt_buffer) {
327                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
328                RT_INFO->buf_current = RT_INFO->pkt_buffer;
329                RT_INFO->buf_filled = 0;
330        }
331
332#ifndef MSG_DONTWAIT
333#define MSG_DONTWAIT 0
334#endif
335
336        if (block)
337                block=0;
338        else
339                block=MSG_DONTWAIT;
340
341        /* If we don't have enough buffer space for the amount we want to
342         * read, move the current buffer contents to the front of the buffer
343         * to make room */
344        if (len > RT_INFO->buf_filled) {
345                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
346                                RT_INFO->buf_filled);
347                RT_INFO->buf_current = RT_INFO->pkt_buffer;
348#ifndef MSG_NOSIGNAL
349#  define MSG_NOSIGNAL 0
350#endif
351                /* Loop as long as we don't have all the data that we were
352                 * asked for */
353                while (len > RT_INFO->buf_filled) {
354                        if ((numbytes = recv(RT_INFO->input_fd,
355                                                RT_INFO->buf_current + 
356                                                RT_INFO->buf_filled,
357                                                RT_BUF_SIZE-RT_INFO->buf_filled,
358                                                MSG_NOSIGNAL|block)) <= 0) {
359                                if (numbytes == 0) {
360                                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
361                                                        "No data received");
362                                        return -1;
363                                }
364                               
365                                if (errno == EINTR) {
366                                        /* ignore EINTR in case
367                                         * a caller is using signals
368                                         */
369                                        continue;
370                                }
371                                if (errno == EAGAIN) {
372                                        /* We asked for non-blocking mode, so
373                                         * we need to return now */
374                                        trace_set_err(libtrace,
375                                                        EAGAIN,
376                                                        "EAGAIN");
377                                        return -1;
378                                }
379                               
380                                perror("recv");
381                                trace_set_err(libtrace, errno,
382                                                "Failed to read data into rt recv buffer");
383                                return -1;
384                        }
385                        RT_INFO->buf_filled+=numbytes;
386                }
387
388        }
389        *buffer = RT_INFO->buf_current;
390        RT_INFO->buf_current += len;
391        RT_INFO->buf_filled -= len;
392        return len;
393}
394
395
396/* Sets the trace format for the packet to match the format it was originally
397 * captured in, rather than the RT format */
398static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
399{
400
401        /* We need to assign the packet to a "dead" trace */
402
403        /* Try to minimize the number of corrupt packets that slip through
404         * while making it easy to identify new pcap DLTs */
405        if (packet->type > TRACE_RT_DATA_DLT && 
406                        packet->type < TRACE_RT_DATA_DLT_END) {
407                if (!RT_INFO->dummy_pcap) {
408                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
409                }
410                packet->trace = RT_INFO->dummy_pcap;
411                return 0;       
412        }
413
414        if (packet->type > TRACE_RT_DATA_BPF &&
415                        packet->type < TRACE_RT_DATA_BPF_END) {
416
417                if (!RT_INFO->dummy_bpf) {
418                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
419                        /* This may fail on a non-BSD machine */
420                        if (trace_is_err(RT_INFO->dummy_bpf)) {
421                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
422                                return -1;
423                        }
424                }
425                packet->trace = RT_INFO->dummy_bpf;
426                return 0;
427        }
428
429        switch (packet->type) {
430                case TRACE_RT_DUCK_2_4:
431                case TRACE_RT_DUCK_2_5:
432                case TRACE_RT_DUCK_5_0:
433                        if (!RT_INFO->dummy_duck) {
434                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
435                        }
436                        packet->trace = RT_INFO->dummy_duck;
437                        break;
438                case TRACE_RT_DATA_ERF:
439                        if (!RT_INFO->dummy_erf) {
440                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
441                        }
442                        packet->trace = RT_INFO->dummy_erf;
443                        break;
444                case TRACE_RT_DATA_LINUX_NATIVE:
445                        if (!RT_INFO->dummy_linux) {
446                                RT_INFO->dummy_linux = trace_create_dead("int:");
447                                /* This may fail on a non-Linux machine */
448                                if (trace_is_err(RT_INFO->dummy_linux)) {
449                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
450                                        return -1;
451                                }
452                        }
453                        packet->trace = RT_INFO->dummy_linux;
454                        break;
455                case TRACE_RT_DATA_LINUX_RING:
456                        if (!RT_INFO->dummy_ring) {
457                                RT_INFO->dummy_ring = trace_create_dead("ring:");
458                                /* This may fail on a non-Linux machine */
459                                if (trace_is_err(RT_INFO->dummy_ring)) {
460                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
461                                        return -1;
462                                }
463                        }
464                        packet->trace = RT_INFO->dummy_ring;
465                        break;
466                case TRACE_RT_STATUS:
467                case TRACE_RT_METADATA:
468                        /* Just use the RT trace! */
469                        packet->trace = libtrace;
470                        break;
471                case TRACE_RT_DATA_LEGACY_ETH:
472                case TRACE_RT_DATA_LEGACY_ATM:
473                case TRACE_RT_DATA_LEGACY_POS:
474                        printf("Sending legacy over RT is currently not supported\n");
475                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
476                        return -1;
477                default:
478                        printf("Unrecognised format: %u\n", packet->type);
479                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
480                        return -1;
481        }
482        return 0; /* success */
483}               
484
485/* Sends an RT ACK to the server to acknowledge receipt of packets */
486static int rt_send_ack(libtrace_t *libtrace, 
487                uint32_t seqno)  {
488       
489        static char *ack_buffer = 0;
490        char *buf_ptr;
491        int numbytes = 0;
492        size_t to_write = 0;
493        rt_header_t *hdr;
494        rt_ack_t *ack_hdr;
495       
496        if (!ack_buffer) {
497                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
498                                                        + sizeof(rt_ack_t));
499        }
500       
501        hdr = (rt_header_t *) ack_buffer;
502        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
503       
504        hdr->type = TRACE_RT_ACK;
505        hdr->length = sizeof(rt_ack_t);
506
507        ack_hdr->sequence = seqno;
508       
509        to_write = hdr->length + sizeof(rt_header_t);
510        buf_ptr = ack_buffer;
511
512        /* Keep trying until we write the entire ACK */
513        while (to_write > 0) {
514                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
515                if (numbytes == -1) {
516                        if (errno == EINTR || errno == EAGAIN) {
517                                continue;
518                        }
519                        else {
520                                printf("Error sending ack\n");
521                                perror("send");
522                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
523                                                "Error sending ack");
524                                return -1;
525                        }
526                }
527                to_write = to_write - numbytes;
528                buf_ptr = buf_ptr + to_write;
529               
530        }
531
532        return 1;
533}
534
535/* Shouldn't need to call this too often */
536static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
537                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
538
539        if (packet->buffer != buffer &&
540                        packet->buf_control == TRACE_CTRL_PACKET) {
541                free(packet->buffer);
542        }
543
544        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
545                packet->buf_control = TRACE_CTRL_PACKET;
546        } else
547                packet->buf_control = TRACE_CTRL_EXTERNAL;
548
549
550        packet->buffer = buffer;
551        packet->header = NULL;
552        packet->type = rt_type;
553        packet->payload = buffer;
554
555        if (libtrace->format_data == NULL) {
556                rt_init_format_data(libtrace);
557        }
558
559        return 0;
560}       
561
562/* Reads the body of an RT packet from the network */
563static int rt_read_data_packet(libtrace_t *libtrace,
564                libtrace_packet_t *packet, int blocking) {
565        uint32_t prep_flags = 0;
566
567        prep_flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
568
569        /* The stored RT header will tell us how much data we need to read */
570        if (rt_read(libtrace, &packet->buffer, (size_t)RT_INFO->rt_hdr.length, 
571                                blocking) != RT_INFO->rt_hdr.length) {
572                return -1;
573        }
574
575        /* Send an ACK if required */
576        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
577                if (rt_send_ack(libtrace, RT_INFO->rt_hdr.sequence) == -1)
578                                return -1;
579        }
580       
581        /* Convert to the original capture format */
582        if (rt_set_format(libtrace, packet) < 0) {
583                return -1;
584        }
585               
586        /* Update payload pointers and packet type to match the original
587         * format */
588        if (trace_prepare_packet(packet->trace, packet, packet->buffer,
589                                packet->type, prep_flags)) {
590                return -1;
591        }
592
593        return 0;
594}
595
596/* Reads an RT packet from the network. Will block if the "blocking" flag is
597 * set to 1, otherwise will return if insufficient data is available */
598static int rt_read_packet_versatile(libtrace_t *libtrace,
599                libtrace_packet_t *packet,int blocking) {
600        rt_header_t *pkt_hdr = NULL;
601        void *void_hdr;
602        libtrace_rt_types_t switch_type;
603       
604        if (packet->buf_control == TRACE_CTRL_PACKET) {
605                packet->buf_control = TRACE_CTRL_EXTERNAL;
606                free(packet->buffer);
607                packet->buffer = NULL;
608        }
609
610        /* RT_LAST indicates that we need to read the RT header for the next
611         * packet. This is a touch hax, I admit */
612        if (RT_INFO->rt_hdr.type == TRACE_RT_LAST) {
613                void_hdr = (void *)pkt_hdr;
614                /* FIXME: Better error handling required */
615                if (rt_read(libtrace, &void_hdr, 
616                                sizeof(rt_header_t),blocking) !=
617                                sizeof(rt_header_t)) {
618                        return -1;
619                }
620                pkt_hdr = (rt_header_t *)void_hdr;
621               
622                /* Need to store these in case the next rt_read overwrites
623                 * the buffer they came from! */
624                RT_INFO->rt_hdr.type = pkt_hdr->type;
625                RT_INFO->rt_hdr.length = pkt_hdr->length;
626                RT_INFO->rt_hdr.sequence = pkt_hdr->sequence;
627        }
628        packet->type = RT_INFO->rt_hdr.type;
629       
630        /* All data-bearing packets (as opposed to RT internal messages)
631         * should be treated the same way when it comes to reading the rest
632         * of the packet */
633        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
634                switch_type = TRACE_RT_DATA_SIMPLE;
635        } else {
636                switch_type = packet->type;
637        }
638
639        switch(switch_type) {
640                case TRACE_RT_DATA_SIMPLE:
641                case TRACE_RT_DUCK_2_4:
642                case TRACE_RT_DUCK_2_5:
643                case TRACE_RT_STATUS:
644                case TRACE_RT_METADATA:
645                        if (rt_read_data_packet(libtrace, packet, blocking))
646                                return -1;
647                        break;
648                case TRACE_RT_END_DATA:
649                case TRACE_RT_KEYCHANGE:
650                case TRACE_RT_LOSTCONN:
651                case TRACE_RT_CLIENTDROP:
652                case TRACE_RT_SERVERSTART:
653                        /* All these have no payload */
654                        break;
655                case TRACE_RT_PAUSE_ACK:
656                        /* XXX: Add support for this */
657                        break;
658                case TRACE_RT_OPTION:
659                        /* XXX: Add support for this */
660                        break;
661                default:
662                        printf("Bad rt type for client receipt: %d\n",
663                                        switch_type);
664                        return -1;
665        }
666                               
667                       
668               
669        /* Return the number of bytes read from the stream */
670        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
671        return RT_INFO->rt_hdr.length + sizeof(rt_header_t);
672}
673
674/* Reads the next available packet in a blocking fashion */
675static int rt_read_packet(libtrace_t *libtrace,
676                libtrace_packet_t *packet) {
677        return rt_read_packet_versatile(libtrace,packet,1);
678}
679
680
681/* This should only get called for RT messages - RT-encapsulated data records
682 * should be converted to the appropriate capture format */
683static int rt_get_capture_length(const libtrace_packet_t *packet) {
684        rt_metadata_t *rt_md_hdr;
685        switch (packet->type) {
686                case TRACE_RT_STATUS:
687                        return sizeof(rt_status_t);
688                case TRACE_RT_HELLO:
689                        return sizeof(rt_hello_t);
690                case TRACE_RT_START:
691                        return 0;
692                case TRACE_RT_ACK:
693                        return sizeof(rt_ack_t);
694                case TRACE_RT_END_DATA:
695                        return 0;
696                case TRACE_RT_CLOSE:
697                        return 0;
698                case TRACE_RT_DENY_CONN:
699                        return sizeof(rt_deny_conn_t);
700                case TRACE_RT_PAUSE:
701                        return 0; 
702                case TRACE_RT_PAUSE_ACK:
703                        return 0;
704                case TRACE_RT_OPTION:
705                        return 0; /* FIXME */
706                case TRACE_RT_KEYCHANGE:
707                        return 0;
708                case TRACE_RT_LOSTCONN:
709                        return 0;
710                case TRACE_RT_SERVERSTART:
711                        return 0;
712                case TRACE_RT_CLIENTDROP:
713                        return 0;
714                case TRACE_RT_METADATA:
715                        /* This is a little trickier to work out */
716                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
717                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
718                                sizeof(rt_metadata_t);
719                default:
720                        printf("Unknown type: %d\n", packet->type);
721                       
722        }
723        return 0;
724}
725
726/* RT messages do not have a wire length because they were not captured from
727 * the wire - they were generated by the capture process */
728static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
729        return 0;
730}
731
732/* Although RT messages do contain "framing", this framing is considered to be
733 * stripped as soon as the packet is read by the RT client */                   
734static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
735        return 0;
736}
737
738
739static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
740{
741        /* RT messages don't have a link type */
742        return TRACE_TYPE_NONDATA;
743}
744
745static int rt_get_fd(const libtrace_t *trace) {
746        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
747}
748
749static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
750                                        libtrace_packet_t *packet) 
751{
752        libtrace_eventobj_t event = {0,0,0.0,0};
753        libtrace_err_t read_err;
754
755        assert(trace);
756        assert(packet);
757       
758        if (trace->format->get_fd) {
759                event.fd = trace->format->get_fd(trace);
760        } else {
761                event.fd = 0;
762        }
763
764        do {
765
766                event.size = rt_read_packet_versatile(trace, packet, 0);
767                if (event.size == -1) {
768                        read_err = trace_get_err(trace);
769                        if (read_err.err_num == EAGAIN) {
770                                /* No data available - do an IOWAIT */
771                                event.type = TRACE_EVENT_IOWAIT;
772                        }
773                        else {
774                                trace_perror(trace, "Error doing a non-blocking read from rt");
775                                event.type = TRACE_EVENT_PACKET;
776                                break;
777                        }
778                } else if (event.size == 0) {
779                        /* RT gives us a specific indicator that there will be
780                         * no more packets. */
781                        if (packet->type == TRACE_RT_END_DATA)
782                                event.type = TRACE_EVENT_TERMINATE;
783                        else {
784                                /* Since several RT messages can have zero-byte
785                                 * length (once the framing is removed), an
786                                 * event size of zero can still indicate a
787                                 * PACKET event */
788                                event.type = TRACE_EVENT_PACKET;
789                                trace->accepted_packets ++;
790                        }
791
792                }       
793                else {
794                        event.type = TRACE_EVENT_PACKET;
795                        trace->accepted_packets ++;
796                }
797
798                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
799                        if (!trace_apply_filter(trace->filter, packet)) {
800                                trace_clear_cache(packet);
801                                trace->filtered_packets ++;
802                                continue;
803                        }
804                }
805
806                break; 
807        } while (1);
808
809        return event;
810}
811
812static void rt_help(void) {
813        printf("rt format module\n");
814        printf("Supported input URIs:\n");
815        printf("\trt:hostname:port\n");
816        printf("\trt:hostname (connects on default port)\n");
817        printf("\n");
818        printf("\te.g.: rt:localhost\n");
819        printf("\te.g.: rt:localhost:32500\n");
820        printf("\n");
821
822}
823
824
825static struct libtrace_format_t rt = {
826        "rt",
827        "$Id$",
828        TRACE_FORMAT_RT,
829        NULL,                           /* probe filename */
830        NULL,                           /* probe magic */
831        rt_init_input,                  /* init_input */
832        NULL,                           /* config_input */
833        rt_start_input,                 /* start_input */
834        rt_pause_input,                 /* pause */
835        NULL,                           /* init_output */
836        NULL,                           /* config_output */
837        NULL,                           /* start_output */
838        rt_fin_input,                   /* fin_input */
839        NULL,                           /* fin_output */
840        rt_read_packet,                 /* read_packet */
841        rt_prepare_packet,              /* prepare_packet */
842        NULL,                           /* fin_packet */
843        NULL,                           /* write_packet */
844        rt_get_link_type,               /* get_link_type */
845        NULL,                           /* get_direction */
846        NULL,                           /* set_direction */
847        NULL,                           /* get_erf_timestamp */
848        NULL,                           /* get_timeval */
849        NULL,                           /* get_timespec */
850        NULL,                           /* get_seconds */
851        NULL,                           /* seek_erf */
852        NULL,                           /* seek_timeval */
853        NULL,                           /* seek_seconds */
854        rt_get_capture_length,          /* get_capture_length */
855        rt_get_wire_length,                     /* get_wire_length */
856        rt_get_framing_length,          /* get_framing_length */
857        NULL,                           /* set_capture_length */
858        NULL,                           /* get_received_packets */
859        NULL,                           /* get_filtered_packets */
860        NULL,                           /* get_dropped_packets */
861        NULL,                           /* get_statistics */
862        rt_get_fd,                      /* get_fd */
863        trace_event_rt,             /* trace_event */
864        rt_help,                        /* help */
865        NULL,                   /* next pointer */
866        NON_PARALLEL(true) /* This is normally live */
867};
868
869void rt_constructor(void) {
870        register_format(&rt);
871}
Note: See TracBrowser for help on using the repository browser.