source: lib/format_rt.c @ d391ce0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since d391ce0 was d391ce0, checked in by Shane Alcock <salcock@…>, 6 years ago

Updated format_rt to use new bucket structure

This should save us from having to memcpy every packet from the read
buffer into the packet buffer.

Added an internalid and srcbucket reference to the libtrace_packet_t
structure, so that trace_fin_packet() can release packets from any buckets
that they are owned by.

Fix race condition on trace->last_packet.

  • Property mode set to 100644
File size: 25.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#define _GNU_SOURCE
36
37#include "config.h"
38#include "common.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "rt_protocol.h"
43
44#include "data-struct/buckets.h"
45
46#include <sys/stat.h>
47#include <assert.h>
48#include <errno.h>
49#include <fcntl.h>
50#include <stdio.h>
51#include <string.h>
52#include <stdlib.h>
53#include <unistd.h>
54
55#ifndef WIN32
56# include <netdb.h>
57#endif
58
59#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
60
61/* Convert the RT denial code into a nice printable and coherent string */
62static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
63{
64        const char *string = 0;
65
66        switch(reason) {
67                case RT_DENY_WRAPPER:
68                        string = "Rejected by TCP Wrappers";
69                        break;
70                case RT_DENY_FULL:
71                        string = "Max connections reached on server";
72                        break;
73                case RT_DENY_AUTH:
74                        string = "Authentication failed";
75                        break;
76                default:
77                        string = "Unknown reason";
78        }
79
80        return string;
81}
82
83
84struct rt_format_data_t {
85        /* Name of the host to connect to */
86        char *hostname;
87        /* Buffer to store received packets into */
88        char *pkt_buffer;
89        /* Pointer to the next packet to be read from the buffer */
90        char *buf_read;
91        /* Pointer to the next unused byte in the buffer */
92        char *buf_write;
93        /* The port to connect to */
94        int port;
95        /* The file descriptor for the RT connection */
96        int input_fd;
97        /* Flag indicating whether the server is doing reliable RT */
98        int reliable;
99
100        int unacked;
101       
102        /* Dummy traces that can be assigned to the received packets to ensure
103         * that the appropriate functions can be used to process them */
104        libtrace_t *dummy_duck;
105        libtrace_t *dummy_erf;
106        libtrace_t *dummy_pcap;
107        libtrace_t *dummy_linux;
108        libtrace_t *dummy_ring;
109        libtrace_t *dummy_bpf;
110
111        /* Bucket structure for storing read packets until the user is
112         * done with them. */
113        libtrace_bucket_t *bucket;
114};
115
116/* Connects to an RT server
117 *
118 * Returns -1 if an error occurs
119 */
120static int rt_connect(libtrace_t *libtrace) {
121        struct hostent *he;
122        struct sockaddr_in remote;
123        rt_header_t connect_msg;
124        rt_deny_conn_t deny_hdr;       
125        rt_hello_t hello_opts;
126        uint8_t reason;
127       
128        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
129                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
130                                "Failed to convert hostname %s to address",
131                                RT_INFO->hostname);
132                return -1;
133        }
134        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
135                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
136                                "Could not create socket");
137                return -1;
138        }
139
140        memset(&remote,0, sizeof(remote));
141        remote.sin_family = AF_INET;
142        remote.sin_port = htons(RT_INFO->port);
143        remote.sin_addr = *((struct in_addr *)he->h_addr);
144
145        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
146                                (socklen_t)sizeof(struct sockaddr)) == -1) {
147                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
148                                "Could not connect to host %s on port %d",
149                                RT_INFO->hostname, RT_INFO->port);
150                return -1;
151        }
152
153        /* We are connected, now receive message from server */
154       
155        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
156                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
157                                "Could not receive connection message from %s",
158                                RT_INFO->hostname);
159                return -1;
160        }
161       
162        if (connect_msg.magic != LIBTRACE_RT_MAGIC) {
163                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
164                        "RT version mismatch: magic byte is incorrect");
165                return -1;
166        }
167
168        if (connect_msg.version != LIBTRACE_RT_VERSION) {
169                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
170                        "RT version mismatch: version is incorrect (expected %d, got %d",
171                        LIBTRACE_RT_VERSION, connect_msg.version);
172                return -1;
173        }
174
175               
176       
177        switch (ntohl(connect_msg.type)) {
178                case TRACE_RT_DENY_CONN:
179                        /* Connection was denied */
180                       
181                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
182                                                sizeof(rt_deny_conn_t),
183                                                0) != sizeof(rt_deny_conn_t)) {
184                                reason = 0;
185                        }       
186                        reason = ntohl(deny_hdr.reason);
187                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
188                                "Connection attempt is denied: %s",
189                                rt_deny_reason(reason));       
190                        return -1;
191                case TRACE_RT_HELLO:
192                        /* Hello message - read the options sent to us by the
193                         * server */
194                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
195                                                sizeof(rt_hello_t), 0)
196                                        != sizeof(rt_hello_t)) {
197                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
198                                        "Failed to receive TRACE_RT_HELLO options");
199                                return -1;
200                        }
201                       
202                       
203                        RT_INFO->reliable = hello_opts.reliable;
204                        RT_INFO->unacked = 0;
205                        return 0;
206                default:
207                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
208                                        "Unknown message type received: %d",
209                                        connect_msg.type);
210                        return -1;
211        }
212        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
213                        "Somehow you managed to reach this unreachable code");
214        return -1;
215}
216
217static void rt_init_format_data(libtrace_t *libtrace) {
218        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
219
220        RT_INFO->dummy_duck = NULL;
221        RT_INFO->dummy_erf = NULL;
222        RT_INFO->dummy_pcap = NULL;
223        RT_INFO->dummy_linux = NULL;
224        RT_INFO->dummy_ring = NULL;
225        RT_INFO->dummy_bpf = NULL;
226        RT_INFO->pkt_buffer = NULL;
227        RT_INFO->buf_read = NULL;
228        RT_INFO->buf_write = NULL;
229        RT_INFO->hostname = NULL;
230        RT_INFO->port = 0;
231        RT_INFO->unacked = 0;
232
233        RT_INFO->bucket = libtrace_bucket_init();
234}
235
236static int rt_init_input(libtrace_t *libtrace) {
237        char *scan;
238        char *uridata = libtrace->uridata;
239
240        rt_init_format_data(libtrace);
241
242        /* If the user specifies "rt:" then assume localhost and the default
243         * port */     
244        if (strlen(uridata) == 0) {
245                RT_INFO->hostname =
246                        strdup("localhost");
247                RT_INFO->port =
248                        COLLECTOR_PORT;
249        } else {
250                /* If the user does not specify a port, assume the default
251                 * port */
252                if ((scan = strchr(uridata,':')) == NULL) {
253                        RT_INFO->hostname =
254                                strdup(uridata);
255                        RT_INFO->port =
256                                COLLECTOR_PORT;
257                } else {
258                        RT_INFO->hostname =
259                                (char *)strndup(uridata,
260                                                (size_t)(scan - uridata));
261                        RT_INFO->port =
262                                atoi(++scan);
263                }
264        }
265
266        return 0;
267}
268       
269static int rt_start_input(libtrace_t *libtrace) {
270        rt_header_t start_msg;
271
272        start_msg.type = htonl(TRACE_RT_START);
273        start_msg.length = 0;
274        start_msg.sequence = 0;
275        start_msg.version = LIBTRACE_RT_VERSION;
276        start_msg.magic = LIBTRACE_RT_MAGIC; 
277
278        if (rt_connect(libtrace) == -1)
279                return -1;
280       
281        /* Need to send start message to server */
282        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
283                                start_msg.length, 0) != sizeof(rt_header_t)) {
284                printf("Failed to send start message to server\n");
285                return -1;
286        }
287
288        return 0;
289}
290
291static int rt_pause_input(libtrace_t *libtrace) {
292        rt_header_t close_msg;
293
294        close_msg.type = htonl(TRACE_RT_CLOSE);
295        close_msg.length = 0; 
296       
297        /* Send a close message to the server */
298        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
299                                close_msg.length, 0) != (int)sizeof(rt_header_t)
300                                + close_msg.length) {
301                printf("Failed to send close message to server\n");
302       
303        }
304
305        close(RT_INFO->input_fd);
306        return 0;
307}
308
309static int rt_fin_input(libtrace_t *libtrace) {
310        /* Make sure we clean up any dummy traces that we have been using */
311
312        if (RT_INFO->dummy_duck)
313                trace_destroy_dead(RT_INFO->dummy_duck);
314
315        if (RT_INFO->dummy_erf)
316                trace_destroy_dead(RT_INFO->dummy_erf);
317
318        if (RT_INFO->dummy_pcap)
319                trace_destroy_dead(RT_INFO->dummy_pcap);
320
321        if (RT_INFO->dummy_linux)
322                trace_destroy_dead(RT_INFO->dummy_linux);
323
324        if (RT_INFO->dummy_ring)
325                trace_destroy_dead(RT_INFO->dummy_ring);
326
327        if (RT_INFO->dummy_bpf)
328                trace_destroy_dead(RT_INFO->dummy_bpf);
329
330        if (RT_INFO->bucket)
331                libtrace_bucket_destroy(RT_INFO->bucket);
332        free(libtrace->format_data);
333        return 0;
334}
335
336/* Sends an RT ACK to the server to acknowledge receipt of packets */
337static int rt_send_ack(libtrace_t *libtrace, 
338                uint32_t seqno)  {
339       
340        static char *ack_buffer = 0;
341        char *buf_ptr;
342        int numbytes = 0;
343        size_t to_write = 0;
344        rt_header_t *hdr;
345        rt_ack_t *ack_hdr;
346       
347        if (!ack_buffer) {
348                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
349                                                        + sizeof(rt_ack_t));
350        }
351       
352        hdr = (rt_header_t *) ack_buffer;
353        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
354       
355        hdr->type = htonl(TRACE_RT_ACK);
356        hdr->length = htons(sizeof(rt_ack_t));
357
358        ack_hdr->sequence = htonl(seqno);
359       
360        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
361        buf_ptr = ack_buffer;
362
363        /* Keep trying until we write the entire ACK */
364        while (to_write > 0) {
365                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
366                if (numbytes == -1) {
367                        if (errno == EINTR || errno == EAGAIN) {
368                                continue;
369                        }
370                        else {
371                                printf("Error sending ack\n");
372                                perror("send");
373                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
374                                                "Error sending ack");
375                                return -1;
376                        }
377                }
378                to_write = to_write - numbytes;
379                buf_ptr = buf_ptr + to_write;
380               
381        }
382
383        return 1;
384}
385
386/* Sets the trace format for the packet to match the format it was originally
387 * captured in, rather than the RT format */
388static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
389{
390
391        /* We need to assign the packet to a "dead" trace */
392
393        /* Try to minimize the number of corrupt packets that slip through
394         * while making it easy to identify new pcap DLTs */
395        if (packet->type > TRACE_RT_DATA_DLT && 
396                        packet->type < TRACE_RT_DATA_DLT_END) {
397                if (!RT_INFO->dummy_pcap) {
398                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
399                }
400                packet->trace = RT_INFO->dummy_pcap;
401                return 0;       
402        }
403
404        if (packet->type > TRACE_RT_DATA_BPF &&
405                        packet->type < TRACE_RT_DATA_BPF_END) {
406
407                if (!RT_INFO->dummy_bpf) {
408                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
409                        /* This may fail on a non-BSD machine */
410                        if (trace_is_err(RT_INFO->dummy_bpf)) {
411                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
412                                return -1;
413                        }
414                }
415                packet->trace = RT_INFO->dummy_bpf;
416                return 0;
417        }
418
419        switch (packet->type) {
420                case TRACE_RT_DUCK_2_4:
421                case TRACE_RT_DUCK_2_5:
422                case TRACE_RT_DUCK_5_0:
423                        if (!RT_INFO->dummy_duck) {
424                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
425                        }
426                        packet->trace = RT_INFO->dummy_duck;
427                        break;
428                case TRACE_RT_DATA_ERF:
429                        if (!RT_INFO->dummy_erf) {
430                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
431                        }
432                        packet->trace = RT_INFO->dummy_erf;
433                        break;
434                case TRACE_RT_DATA_LINUX_NATIVE:
435                        if (!RT_INFO->dummy_linux) {
436                                RT_INFO->dummy_linux = trace_create_dead("int:");
437                                /* This may fail on a non-Linux machine */
438                                if (trace_is_err(RT_INFO->dummy_linux)) {
439                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
440                                        return -1;
441                                }
442                        }
443                        packet->trace = RT_INFO->dummy_linux;
444                        break;
445                case TRACE_RT_DATA_LINUX_RING:
446                        if (!RT_INFO->dummy_ring) {
447                                RT_INFO->dummy_ring = trace_create_dead("ring:");
448                                /* This may fail on a non-Linux machine */
449                                if (trace_is_err(RT_INFO->dummy_ring)) {
450                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
451                                        return -1;
452                                }
453                        }
454                        packet->trace = RT_INFO->dummy_ring;
455                        break;
456                case TRACE_RT_STATUS:
457                case TRACE_RT_METADATA:
458                        /* Just use the RT trace! */
459                        packet->trace = libtrace;
460                        break;
461                case TRACE_RT_DATA_LEGACY_ETH:
462                case TRACE_RT_DATA_LEGACY_ATM:
463                case TRACE_RT_DATA_LEGACY_POS:
464                        printf("Sending legacy over RT is currently not supported\n");
465                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
466                        return -1;
467                default:
468                        printf("Unrecognised format: %u\n", packet->type);
469                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
470                        return -1;
471        }
472        return 0; /* success */
473}               
474
475
476/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
477 * in any way. This means we have a much larger memory overhead per packet
478 * (which won't be used in the vast majority of cases), so we may want to think
479 * about doing something smarter, e.g. allocate a smaller block of memory and
480 * only increase it as required.
481 *
482 * XXX Capturing off int: can still lead to packets that are larger than 10K,
483 * in instances where the fragmentation is done magically by the NIC. This
484 * is pretty nasty, but also very rare.
485 */
486#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
487
488static int rt_process_data_packet(libtrace_t *libtrace,
489                libtrace_packet_t *packet) {
490
491        uint32_t prep_flags = TRACE_PREP_DO_NOT_OWN_BUFFER;
492        rt_header_t *hdr = (rt_header_t *)packet->header;
493
494        /* Send an ACK if required */
495        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
496                RT_INFO->unacked ++;
497                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
498                        if (rt_send_ack(libtrace, hdr->sequence) == -1)
499                                return -1;
500                        RT_INFO->unacked = 0;
501                }
502        }
503
504        /* Convert to the original capture format */
505        if (rt_set_format(libtrace, packet) < 0) {
506                return -1;
507        }
508
509        /* Update payload pointers and packet type to match the original
510         * format */
511        if (trace_prepare_packet(packet->trace, packet, packet->payload,
512                                packet->type, prep_flags)) {
513                return -1;
514        }
515
516        return 1;
517
518}
519
520/* Receives data from an RT server */
521static int rt_read(libtrace_t *libtrace, int block) {
522        int numbytes;
523
524        if (!RT_INFO->pkt_buffer) {
525                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
526                RT_INFO->buf_write = RT_INFO->pkt_buffer;
527                RT_INFO->buf_read = RT_INFO->pkt_buffer;
528                libtrace_create_new_bucket(RT_INFO->bucket, RT_INFO->pkt_buffer);
529        }
530
531#ifndef MSG_DONTWAIT
532#define MSG_DONTWAIT 0
533#endif
534#ifndef MSG_NOSIGNAL
535#  define MSG_NOSIGNAL 0
536#endif
537
538        if (block)
539                block=0;
540        else
541                block=MSG_DONTWAIT;
542
543        /* If the current buffer has plenty of space left, we can continue to
544         * read into it, otherwise create a new buffer and move anything in
545         * the old buffer over to it */
546        if (RT_INFO->buf_write - RT_INFO->pkt_buffer > RT_BUF_SIZE / 2) {
547                char *newbuf = (char*)malloc((size_t)RT_BUF_SIZE);
548
549                memcpy(newbuf, RT_INFO->buf_read, RT_INFO->buf_write - RT_INFO->buf_read);
550                RT_INFO->buf_write = newbuf + (RT_INFO->buf_write - RT_INFO->buf_read);
551                RT_INFO->buf_read = newbuf;
552                RT_INFO->pkt_buffer = newbuf;
553                libtrace_create_new_bucket(RT_INFO->bucket, newbuf);
554
555        }
556
557        /* Attempt to fill the buffer as much as we can */
558        if ((numbytes = recv(RT_INFO->input_fd, RT_INFO->buf_write,
559                        RT_BUF_SIZE - (RT_INFO->buf_write-RT_INFO->pkt_buffer),
560                        MSG_NOSIGNAL | block)) <= 0) {
561
562                if (numbytes == 0) {
563                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
564                                        "No data received by RT client");
565                        return -1;
566                }
567
568                if (errno == EINTR) {
569                        /* Ignore EINTR in case a caller is using signals */
570                        return 0;
571                }
572
573                if (errno == EAGAIN) {
574                        /* No data available and we are non-blocking */
575                        trace_set_err(libtrace, EAGAIN, "EAGAIN");
576                        return -1;
577                }
578
579                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
580                                "Error reading from RT socket: %s",
581                                strerror(errno));
582                return -1;
583        }
584
585        RT_INFO->buf_write += numbytes;
586        return (RT_INFO->buf_write - RT_INFO->buf_read);
587
588}
589
590
591static int rt_get_next_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
592                int block) {
593
594        rt_header_t *rthdr;
595
596        if (packet->buffer && packet->buf_control == TRACE_CTRL_PACKET)
597                free(packet->buffer);
598
599        while (RT_INFO->buf_write - RT_INFO->buf_read <
600                                (uint32_t)sizeof(rt_header_t)) {
601                if (rt_read(libtrace, block) == -1)
602                        return -1;
603        }
604
605        rthdr = (rt_header_t *)RT_INFO->buf_read;
606
607        /* Check if we have enough payload */
608        while (RT_INFO->buf_write - (RT_INFO->buf_read + sizeof(rt_header_t))
609                        < ntohs(rthdr->length)) {
610                if (rt_read(libtrace, block) == -1)
611                        return -1;
612                rthdr = (rt_header_t *)RT_INFO->buf_read;
613        }
614
615
616        packet->buffer = RT_INFO->buf_read;
617        packet->header = RT_INFO->buf_read;
618        packet->type = ntohl(((rt_header_t *)packet->header)->type);
619        packet->payload = RT_INFO->buf_read + sizeof(rt_header_t);
620        packet->internalid = libtrace_push_into_bucket(RT_INFO->bucket);
621        assert(packet->internalid != 0);
622        packet->srcbucket = RT_INFO->bucket;
623        packet->buf_control = TRACE_CTRL_EXTERNAL;
624
625        RT_INFO->buf_read += ntohs(rthdr->length) + sizeof(rt_header_t);
626
627        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
628                rt_process_data_packet(libtrace, packet);
629        } else {
630                switch(packet->type) {
631                        case TRACE_RT_DUCK_2_4:
632                        case TRACE_RT_DUCK_2_5:
633                        case TRACE_RT_STATUS:
634                        case TRACE_RT_METADATA:
635                                if (rt_process_data_packet(libtrace, packet) < 0)
636                                        return -1;
637                                break;
638                        case TRACE_RT_END_DATA:
639                        case TRACE_RT_KEYCHANGE:
640                        case TRACE_RT_LOSTCONN:
641                        case TRACE_RT_CLIENTDROP:
642                        case TRACE_RT_SERVERSTART:
643                                break;
644                        case TRACE_RT_PAUSE_ACK:
645                        case TRACE_RT_OPTION:
646                                break;
647                        default:
648                                fprintf(stderr, "Bad RT type for client: %d\n",
649                                                packet->type);
650                                return -1;
651                }
652        }
653
654        return ntohs(rthdr->length);
655
656}
657
658/* Shouldn't need to call this too often */
659static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
660                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
661
662        if (packet->buffer != buffer &&
663                        packet->buf_control == TRACE_CTRL_PACKET) {
664                free(packet->buffer);
665        }
666
667        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
668                packet->buf_control = TRACE_CTRL_PACKET;
669        } else
670                packet->buf_control = TRACE_CTRL_EXTERNAL;
671
672
673        packet->buffer = buffer;
674        packet->header = NULL;
675        packet->type = rt_type;
676        packet->payload = buffer;
677
678        if (libtrace->format_data == NULL) {
679                rt_init_format_data(libtrace);
680        }
681
682        return 0;
683}       
684
685/* Reads the next available packet in a blocking fashion */
686static int rt_read_packet(libtrace_t *libtrace,
687                libtrace_packet_t *packet) {
688        return rt_get_next_packet(libtrace,packet,1);
689}
690
691
692/* This should only get called for RT messages - RT-encapsulated data records
693 * should be converted to the appropriate capture format */
694static int rt_get_capture_length(const libtrace_packet_t *packet) {
695        rt_metadata_t *rt_md_hdr;
696        switch (packet->type) {
697                case TRACE_RT_STATUS:
698                        return sizeof(rt_status_t);
699                case TRACE_RT_HELLO:
700                        return sizeof(rt_hello_t);
701                case TRACE_RT_START:
702                        return 0;
703                case TRACE_RT_ACK:
704                        return sizeof(rt_ack_t);
705                case TRACE_RT_END_DATA:
706                        return 0;
707                case TRACE_RT_CLOSE:
708                        return 0;
709                case TRACE_RT_DENY_CONN:
710                        return sizeof(rt_deny_conn_t);
711                case TRACE_RT_PAUSE:
712                        return 0; 
713                case TRACE_RT_PAUSE_ACK:
714                        return 0;
715                case TRACE_RT_OPTION:
716                        return 0; /* FIXME */
717                case TRACE_RT_KEYCHANGE:
718                        return 0;
719                case TRACE_RT_LOSTCONN:
720                        return 0;
721                case TRACE_RT_SERVERSTART:
722                        return 0;
723                case TRACE_RT_CLIENTDROP:
724                        return 0;
725                case TRACE_RT_METADATA:
726                        /* This is a little trickier to work out */
727                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
728                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
729                                sizeof(rt_metadata_t);
730                default:
731                        printf("Unknown type: %d\n", packet->type);
732                       
733        }
734        return 0;
735}
736
737/* RT messages do not have a wire length because they were not captured from
738 * the wire - they were generated by the capture process */
739static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
740        return 0;
741}
742
743/* Although RT messages do contain "framing", this framing is considered to be
744 * stripped as soon as the packet is read by the RT client */                   
745static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
746        return 0;
747}
748
749
750static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
751{
752        /* RT messages don't have a link type */
753        return TRACE_TYPE_NONDATA;
754}
755
756static int rt_get_fd(const libtrace_t *trace) {
757        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
758}
759
760static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
761                                        libtrace_packet_t *packet) 
762{
763        libtrace_eventobj_t event = {0,0,0.0,0};
764        libtrace_err_t read_err;
765
766        assert(trace);
767        assert(packet);
768       
769        if (trace->format->get_fd) {
770                event.fd = trace->format->get_fd(trace);
771        } else {
772                event.fd = 0;
773        }
774
775        do {
776
777                event.size = rt_get_next_packet(trace, packet, 0);
778                if (event.size == -1) {
779                        read_err = trace_get_err(trace);
780                        if (read_err.err_num == EAGAIN) {
781                                /* No data available - do an IOWAIT */
782                                event.type = TRACE_EVENT_IOWAIT;
783                        }
784                        else {
785                                trace_perror(trace, "Error doing a non-blocking read from rt");
786                                event.type = TRACE_EVENT_PACKET;
787                                break;
788                        }
789                } else if (event.size == 0) {
790                        /* RT gives us a specific indicator that there will be
791                         * no more packets. */
792                        if (packet->type == TRACE_RT_END_DATA)
793                                event.type = TRACE_EVENT_TERMINATE;
794                        else {
795                                /* Since several RT messages can have zero-byte
796                                 * length (once the framing is removed), an
797                                 * event size of zero can still indicate a
798                                 * PACKET event */
799                                event.type = TRACE_EVENT_PACKET;
800                                trace->accepted_packets ++;
801                        }
802
803                }       
804                else {
805                        event.type = TRACE_EVENT_PACKET;
806                        trace->accepted_packets ++;
807                }
808
809                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
810                        if (!trace_apply_filter(trace->filter, packet)) {
811                                trace_clear_cache(packet);
812                                trace->filtered_packets ++;
813                                continue;
814                        }
815                }
816
817                break; 
818        } while (1);
819
820        return event;
821}
822
823static void rt_help(void) {
824        printf("rt format module\n");
825        printf("Supported input URIs:\n");
826        printf("\trt:hostname:port\n");
827        printf("\trt:hostname (connects on default port)\n");
828        printf("\n");
829        printf("\te.g.: rt:localhost\n");
830        printf("\te.g.: rt:localhost:32500\n");
831        printf("\n");
832
833}
834
835
836static struct libtrace_format_t rt = {
837        "rt",
838        "$Id$",
839        TRACE_FORMAT_RT,
840        NULL,                           /* probe filename */
841        NULL,                           /* probe magic */
842        rt_init_input,                  /* init_input */
843        NULL,                           /* config_input */
844        rt_start_input,                 /* start_input */
845        rt_pause_input,                 /* pause */
846        NULL,                           /* init_output */
847        NULL,                           /* config_output */
848        NULL,                           /* start_output */
849        rt_fin_input,                   /* fin_input */
850        NULL,                           /* fin_output */
851        rt_read_packet,                 /* read_packet */
852        rt_prepare_packet,              /* prepare_packet */
853        NULL,                           /* fin_packet */
854        NULL,                           /* write_packet */
855        rt_get_link_type,               /* get_link_type */
856        NULL,                           /* get_direction */
857        NULL,                           /* set_direction */
858        NULL,                           /* get_erf_timestamp */
859        NULL,                           /* get_timeval */
860        NULL,                           /* get_timespec */
861        NULL,                           /* get_seconds */
862        NULL,                           /* seek_erf */
863        NULL,                           /* seek_timeval */
864        NULL,                           /* seek_seconds */
865        rt_get_capture_length,          /* get_capture_length */
866        rt_get_wire_length,                     /* get_wire_length */
867        rt_get_framing_length,          /* get_framing_length */
868        NULL,                           /* set_capture_length */
869        NULL,                           /* get_received_packets */
870        NULL,                           /* get_filtered_packets */
871        NULL,                           /* get_dropped_packets */
872        NULL,                           /* get_statistics */
873        rt_get_fd,                      /* get_fd */
874        trace_event_rt,             /* trace_event */
875        rt_help,                        /* help */
876        NULL,                   /* next pointer */
877        NON_PARALLEL(true) /* This is normally live */
878};
879
880void rt_constructor(void) {
881        register_format(&rt);
882}
Note: See TracBrowser for help on using the repository browser.