source: lib/format_rt.c @ 32ee9b2

cachetimestampsdeveloprc-4.0.4ringdecrementfixringperformance
Last change on this file since 32ee9b2 was 32ee9b2, checked in by Shane Alcock <salcock@…>, 2 years ago

Add new trace_flush_output() to public API

Can be used to force a libtrace output to dump any buffered output
to disk immediately.

Note that if the file is compressed or the output trace format
requires a trailer, the flushed file will still not be properly
readable afterwards as this will not result in any trailers
being written. You'll still have to close the file for that.

Mainly this is useful for ensuring that output file sizes grow
over time in situations where the amount of output is relatively
small, rather than staying stuck at 0 bytes until we either reach
1MB of output or the file is closed. For instance, you could have
a timer that calls trace_flush_output() every 30 seconds so that
the output file size will grow if any packets were written in the
last 30 seconds.

  • Property mode set to 100644
File size: 25.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28
29#include "config.h"
30#include "common.h"
31#include "libtrace.h"
32#include "libtrace_int.h"
33#include "format_helper.h"
34#include "rt_protocol.h"
35
36#include "data-struct/buckets.h"
37
38#include <sys/stat.h>
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45#include <unistd.h>
46
47#ifndef WIN32
48# include <netdb.h>
49#endif
50
51#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
52
53/* Convert the RT denial code into a nice printable and coherent string */
54static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
55{
56        const char *string = 0;
57
58        switch(reason) {
59                case RT_DENY_WRAPPER:
60                        string = "Rejected by TCP Wrappers";
61                        break;
62                case RT_DENY_FULL:
63                        string = "Max connections reached on server";
64                        break;
65                case RT_DENY_AUTH:
66                        string = "Authentication failed";
67                        break;
68                default:
69                        string = "Unknown reason";
70        }
71
72        return string;
73}
74
75
76struct rt_format_data_t {
77        /* Name of the host to connect to */
78        char *hostname;
79        /* Buffer to store received packets into */
80        char *pkt_buffer;
81        /* Pointer to the next packet to be read from the buffer */
82        char *buf_read;
83        /* Pointer to the next unused byte in the buffer */
84        char *buf_write;
85        /* The port to connect to */
86        int port;
87        /* The file descriptor for the RT connection */
88        int input_fd;
89        /* Flag indicating whether the server is doing reliable RT */
90        int reliable;
91
92        int unacked;
93       
94        /* Dummy traces that can be assigned to the received packets to ensure
95         * that the appropriate functions can be used to process them */
96        libtrace_t *dummy_duck;
97        libtrace_t *dummy_erf;
98        libtrace_t *dummy_pcap;
99        libtrace_t *dummy_linux;
100        libtrace_t *dummy_ring;
101        libtrace_t *dummy_bpf;
102
103        /* Bucket structure for storing read packets until the user is
104         * done with them. */
105        libtrace_bucket_t *bucket;
106};
107
108/* Connects to an RT server
109 *
110 * Returns -1 if an error occurs
111 */
112static int rt_connect(libtrace_t *libtrace) {
113        struct hostent *he;
114        struct sockaddr_in remote;
115        rt_header_t connect_msg;
116        rt_deny_conn_t deny_hdr;       
117        rt_hello_t hello_opts;
118        uint8_t reason;
119       
120        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
121                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
122                                "Failed to convert hostname %s to address",
123                                RT_INFO->hostname);
124                return -1;
125        }
126        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
127                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
128                                "Could not create socket");
129                return -1;
130        }
131
132        memset(&remote,0, sizeof(remote));
133        remote.sin_family = AF_INET;
134        remote.sin_port = htons(RT_INFO->port);
135        remote.sin_addr = *((struct in_addr *)he->h_addr);
136
137        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
138                                (socklen_t)sizeof(struct sockaddr)) == -1) {
139                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
140                                "Could not connect to host %s on port %d",
141                                RT_INFO->hostname, RT_INFO->port);
142                return -1;
143        }
144
145        /* We are connected, now receive message from server */
146       
147        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
148                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
149                                "Could not receive connection message from %s",
150                                RT_INFO->hostname);
151                return -1;
152        }
153
154        if (connect_msg.magic != LIBTRACE_RT_MAGIC) {
155                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
156                        "RT version mismatch: magic byte is incorrect");
157                return -1;
158        }
159
160        if (connect_msg.version != LIBTRACE_RT_VERSION) {
161                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
162                        "RT version mismatch: version is incorrect (expected %d, got %d",
163                        LIBTRACE_RT_VERSION, connect_msg.version);
164                return -1;
165        }
166
167               
168       
169        switch (ntohl(connect_msg.type)) {
170                case TRACE_RT_DENY_CONN:
171                        /* Connection was denied */
172                       
173                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
174                                                sizeof(rt_deny_conn_t),
175                                                0) != sizeof(rt_deny_conn_t)) {
176                                reason = 0;
177                        }       
178                        reason = ntohl(deny_hdr.reason);
179                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
180                                "Connection attempt is denied: %s",
181                                rt_deny_reason(reason));       
182                        return -1;
183                case TRACE_RT_HELLO:
184                        /* Hello message - read the options sent to us by the
185                         * server */
186                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
187                                                sizeof(rt_hello_t), 0)
188                                        != sizeof(rt_hello_t)) {
189                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
190                                        "Failed to receive TRACE_RT_HELLO options");
191                                return -1;
192                        }
193                       
194                       
195                        RT_INFO->reliable = hello_opts.reliable;
196                        RT_INFO->unacked = 0;
197                        return 0;
198                default:
199                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
200                                        "Unknown message type received: %d",
201                                        connect_msg.type);
202                        return -1;
203        }
204        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
205                        "Somehow you managed to reach this unreachable code");
206        return -1;
207}
208
209static void rt_init_format_data(libtrace_t *libtrace) {
210        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
211
212        RT_INFO->dummy_duck = NULL;
213        RT_INFO->dummy_erf = NULL;
214        RT_INFO->dummy_pcap = NULL;
215        RT_INFO->dummy_linux = NULL;
216        RT_INFO->dummy_ring = NULL;
217        RT_INFO->dummy_bpf = NULL;
218        RT_INFO->pkt_buffer = NULL;
219        RT_INFO->buf_read = NULL;
220        RT_INFO->buf_write = NULL;
221        RT_INFO->hostname = NULL;
222        RT_INFO->port = 0;
223        RT_INFO->unacked = 0;
224
225        RT_INFO->bucket = libtrace_bucket_init();
226}
227
228static int rt_init_input(libtrace_t *libtrace) {
229        char *scan;
230        char *uridata = libtrace->uridata;
231
232        rt_init_format_data(libtrace);
233
234        /* If the user specifies "rt:" then assume localhost and the default
235         * port */     
236        if (strlen(uridata) == 0) {
237                RT_INFO->hostname =
238                        strdup("localhost");
239                RT_INFO->port =
240                        COLLECTOR_PORT;
241        } else {
242                /* If the user does not specify a port, assume the default
243                 * port */
244                if ((scan = strchr(uridata,':')) == NULL) {
245                        RT_INFO->hostname =
246                                strdup(uridata);
247                        RT_INFO->port =
248                                COLLECTOR_PORT;
249                } else {
250                        RT_INFO->hostname =
251                                (char *)strndup(uridata,
252                                                (size_t)(scan - uridata));
253                        RT_INFO->port =
254                                atoi(++scan);
255                }
256        }
257
258        return 0;
259}
260       
261static int rt_start_input(libtrace_t *libtrace) {
262        rt_header_t start_msg;
263
264        start_msg.type = htonl(TRACE_RT_START);
265        start_msg.length = 0;
266        start_msg.sequence = 0;
267        start_msg.version = LIBTRACE_RT_VERSION;
268        start_msg.magic = LIBTRACE_RT_MAGIC; 
269
270        if (rt_connect(libtrace) == -1)
271                return -1;
272       
273        /* Need to send start message to server */
274        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
275                                start_msg.length, 0) != sizeof(rt_header_t)) {
276                printf("Failed to send start message to server\n");
277                return -1;
278        }
279
280        return 0;
281}
282
283static int rt_pause_input(libtrace_t *libtrace) {
284        rt_header_t close_msg;
285
286        close_msg.type = htonl(TRACE_RT_CLOSE);
287        close_msg.length = 0; 
288       
289        /* Send a close message to the server */
290        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
291                                close_msg.length, 0) != (int)sizeof(rt_header_t)
292                                + close_msg.length) {
293                printf("Failed to send close message to server\n");
294       
295        }
296
297        close(RT_INFO->input_fd);
298        return 0;
299}
300
301static int rt_fin_input(libtrace_t *libtrace) {
302        /* Make sure we clean up any dummy traces that we have been using */
303
304        if (RT_INFO->dummy_duck)
305                trace_destroy_dead(RT_INFO->dummy_duck);
306
307        if (RT_INFO->dummy_erf)
308                trace_destroy_dead(RT_INFO->dummy_erf);
309
310        if (RT_INFO->dummy_pcap)
311                trace_destroy_dead(RT_INFO->dummy_pcap);
312
313        if (RT_INFO->dummy_linux)
314                trace_destroy_dead(RT_INFO->dummy_linux);
315
316        if (RT_INFO->dummy_ring)
317                trace_destroy_dead(RT_INFO->dummy_ring);
318
319        if (RT_INFO->dummy_bpf)
320                trace_destroy_dead(RT_INFO->dummy_bpf);
321
322        if (RT_INFO->bucket)
323                libtrace_bucket_destroy(RT_INFO->bucket);
324        free(libtrace->format_data);
325        return 0;
326}
327
328/* Sends an RT ACK to the server to acknowledge receipt of packets */
329static int rt_send_ack(libtrace_t *libtrace, 
330                uint32_t seqno)  {
331       
332        static char *ack_buffer = 0;
333        char *buf_ptr;
334        int numbytes = 0;
335        size_t to_write = 0;
336        rt_header_t *hdr;
337        rt_ack_t *ack_hdr;
338       
339        if (!ack_buffer) {
340                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
341                                                        + sizeof(rt_ack_t));
342        }
343       
344        hdr = (rt_header_t *) ack_buffer;
345        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
346       
347        hdr->type = htonl(TRACE_RT_ACK);
348        hdr->length = htons(sizeof(rt_ack_t));
349
350        ack_hdr->sequence = htonl(seqno);
351       
352        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
353        buf_ptr = ack_buffer;
354
355        /* Keep trying until we write the entire ACK */
356        while (to_write > 0) {
357                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
358                if (numbytes == -1) {
359                        if (errno == EINTR || errno == EAGAIN) {
360                                continue;
361                        }
362                        else {
363                                printf("Error sending ack\n");
364                                perror("send");
365                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
366                                                "Error sending ack");
367                                return -1;
368                        }
369                }
370                to_write = to_write - numbytes;
371                buf_ptr = buf_ptr + to_write;
372               
373        }
374
375        return 1;
376}
377
378/* Sets the trace format for the packet to match the format it was originally
379 * captured in, rather than the RT format */
380static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
381{
382
383        /* We need to assign the packet to a "dead" trace */
384
385        /* Try to minimize the number of corrupt packets that slip through
386         * while making it easy to identify new pcap DLTs */
387        if (packet->type > TRACE_RT_DATA_DLT && 
388                        packet->type < TRACE_RT_DATA_DLT_END) {
389                if (!RT_INFO->dummy_pcap) {
390                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
391                }
392                packet->trace = RT_INFO->dummy_pcap;
393                return 0;       
394        }
395
396        if (packet->type > TRACE_RT_DATA_BPF &&
397                        packet->type < TRACE_RT_DATA_BPF_END) {
398
399                if (!RT_INFO->dummy_bpf) {
400                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
401                        /* This may fail on a non-BSD machine */
402                        if (trace_is_err(RT_INFO->dummy_bpf)) {
403                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
404                                return -1;
405                        }
406                }
407                packet->trace = RT_INFO->dummy_bpf;
408                return 0;
409        }
410
411        switch (packet->type) {
412                case TRACE_RT_DUCK_2_4:
413                case TRACE_RT_DUCK_2_5:
414                case TRACE_RT_DUCK_5_0:
415                        if (!RT_INFO->dummy_duck) {
416                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
417                        }
418                        packet->trace = RT_INFO->dummy_duck;
419                        break;
420                case TRACE_RT_DATA_ERF:
421                        if (!RT_INFO->dummy_erf) {
422                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
423                        }
424                        packet->trace = RT_INFO->dummy_erf;
425                        break;
426                case TRACE_RT_DATA_LINUX_NATIVE:
427                        if (!RT_INFO->dummy_linux) {
428                                RT_INFO->dummy_linux = trace_create_dead("int:");
429                                /* This may fail on a non-Linux machine */
430                                if (trace_is_err(RT_INFO->dummy_linux)) {
431                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
432                                        return -1;
433                                }
434                        }
435                        packet->trace = RT_INFO->dummy_linux;
436                        break;
437                case TRACE_RT_DATA_LINUX_RING:
438                        if (!RT_INFO->dummy_ring) {
439                                RT_INFO->dummy_ring = trace_create_dead("ring:");
440                                /* This may fail on a non-Linux machine */
441                                if (trace_is_err(RT_INFO->dummy_ring)) {
442                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
443                                        return -1;
444                                }
445                        }
446                        packet->trace = RT_INFO->dummy_ring;
447                        break;
448                case TRACE_RT_STATUS:
449                case TRACE_RT_METADATA:
450                        /* Just use the RT trace! */
451                        packet->trace = libtrace;
452                        break;
453                case TRACE_RT_DATA_LEGACY_ETH:
454                case TRACE_RT_DATA_LEGACY_ATM:
455                case TRACE_RT_DATA_LEGACY_POS:
456                        printf("Sending legacy over RT is currently not supported\n");
457                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
458                        return -1;
459                default:
460                        printf("Unrecognised format: %u\n", packet->type);
461                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
462                        return -1;
463        }
464        return 0; /* success */
465}               
466
467
468/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
469 * in any way. This means we have a much larger memory overhead per packet
470 * (which won't be used in the vast majority of cases), so we may want to think
471 * about doing something smarter, e.g. allocate a smaller block of memory and
472 * only increase it as required.
473 *
474 * XXX Capturing off int: can still lead to packets that are larger than 10K,
475 * in instances where the fragmentation is done magically by the NIC. This
476 * is pretty nasty, but also very rare.
477 */
478#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
479
480static int rt_process_data_packet(libtrace_t *libtrace,
481                libtrace_packet_t *packet) {
482
483        uint32_t prep_flags = TRACE_PREP_DO_NOT_OWN_BUFFER;
484        rt_header_t *hdr = (rt_header_t *)packet->header;
485
486        /* Send an ACK if required */
487        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
488                RT_INFO->unacked ++;
489                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
490                        if (rt_send_ack(libtrace, hdr->sequence) == -1)
491                                return -1;
492                        RT_INFO->unacked = 0;
493                }
494        }
495
496        /* Convert to the original capture format */
497        if (rt_set_format(libtrace, packet) < 0) {
498                return -1;
499        }
500
501        /* Update payload pointers and packet type to match the original
502         * format */
503        if (trace_prepare_packet(packet->trace, packet, packet->payload,
504                                packet->type, prep_flags)) {
505                return -1;
506        }
507
508        return 1;
509
510}
511
512/* Receives data from an RT server */
513static int rt_read(libtrace_t *libtrace, int block) {
514        int numbytes;
515
516        if (!RT_INFO->pkt_buffer) {
517                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
518                RT_INFO->buf_write = RT_INFO->pkt_buffer;
519                RT_INFO->buf_read = RT_INFO->pkt_buffer;
520                libtrace_create_new_bucket(RT_INFO->bucket, RT_INFO->pkt_buffer);
521        }
522
523#ifndef MSG_DONTWAIT
524#define MSG_DONTWAIT 0
525#endif
526#ifndef MSG_NOSIGNAL
527#  define MSG_NOSIGNAL 0
528#endif
529
530        if (block)
531                block=0;
532        else
533                block=MSG_DONTWAIT;
534
535        /* If the current buffer has plenty of space left, we can continue to
536         * read into it, otherwise create a new buffer and move anything in
537         * the old buffer over to it */
538        if (RT_INFO->buf_write - RT_INFO->pkt_buffer > RT_BUF_SIZE / 2) {
539                char *newbuf = (char*)malloc((size_t)RT_BUF_SIZE);
540
541                memcpy(newbuf, RT_INFO->buf_read, RT_INFO->buf_write - RT_INFO->buf_read);
542                RT_INFO->buf_write = newbuf + (RT_INFO->buf_write - RT_INFO->buf_read);
543                RT_INFO->buf_read = newbuf;
544                RT_INFO->pkt_buffer = newbuf;
545                libtrace_create_new_bucket(RT_INFO->bucket, newbuf);
546
547        }
548
549        /* Attempt to fill the buffer as much as we can */
550        if ((numbytes = recv(RT_INFO->input_fd, RT_INFO->buf_write,
551                        RT_BUF_SIZE - (RT_INFO->buf_write-RT_INFO->pkt_buffer),
552                        MSG_NOSIGNAL | block)) <= 0) {
553
554                if (numbytes == 0) {
555                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
556                                        "No data received by RT client");
557                        return -1;
558                }
559
560                if (errno == EINTR) {
561                        /* Ignore EINTR in case a caller is using signals */
562                        return 0;
563                }
564
565                if (errno == EAGAIN) {
566                        /* No data available and we are non-blocking */
567                        trace_set_err(libtrace, EAGAIN, "EAGAIN");
568                        return -1;
569                }
570
571                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
572                                "Error reading from RT socket: %s",
573                                strerror(errno));
574                return -1;
575        }
576
577        RT_INFO->buf_write += numbytes;
578        return (RT_INFO->buf_write - RT_INFO->buf_read);
579
580}
581
582
583static int rt_get_next_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
584                int block) {
585
586        rt_header_t *rthdr;
587
588        if (packet->buffer && packet->buf_control == TRACE_CTRL_PACKET)
589                free(packet->buffer);
590
591        while (RT_INFO->buf_write - RT_INFO->buf_read <
592                                (uint32_t)sizeof(rt_header_t)) {
593                if (rt_read(libtrace, block) == -1)
594                        return -1;
595        }
596
597        rthdr = (rt_header_t *)RT_INFO->buf_read;
598
599        /* Check if we have enough payload */
600        while (RT_INFO->buf_write - (RT_INFO->buf_read + sizeof(rt_header_t))
601                        < ntohs(rthdr->length)) {
602                if (rt_read(libtrace, block) == -1)
603                        return -1;
604                rthdr = (rt_header_t *)RT_INFO->buf_read;
605        }
606
607
608        packet->buffer = RT_INFO->buf_read;
609        packet->header = RT_INFO->buf_read;
610        packet->type = ntohl(((rt_header_t *)packet->header)->type);
611        packet->payload = RT_INFO->buf_read + sizeof(rt_header_t);
612        packet->internalid = libtrace_push_into_bucket(RT_INFO->bucket);
613        assert(packet->internalid != 0);
614        packet->srcbucket = RT_INFO->bucket;
615        packet->buf_control = TRACE_CTRL_EXTERNAL;
616
617        RT_INFO->buf_read += ntohs(rthdr->length) + sizeof(rt_header_t);
618
619        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
620                rt_process_data_packet(libtrace, packet);
621        } else {
622                switch(packet->type) {
623                        case TRACE_RT_DUCK_2_4:
624                        case TRACE_RT_DUCK_2_5:
625                        case TRACE_RT_STATUS:
626                        case TRACE_RT_METADATA:
627                                if (rt_process_data_packet(libtrace, packet) < 0)
628                                        return -1;
629                                break;
630                        case TRACE_RT_END_DATA:
631                        case TRACE_RT_KEYCHANGE:
632                        case TRACE_RT_LOSTCONN:
633                        case TRACE_RT_CLIENTDROP:
634                        case TRACE_RT_SERVERSTART:
635                                break;
636                        case TRACE_RT_PAUSE_ACK:
637                        case TRACE_RT_OPTION:
638                                break;
639                        default:
640                                fprintf(stderr, "Bad RT type for client: %d\n",
641                                                packet->type);
642                                return -1;
643                }
644        }
645
646        return ntohs(rthdr->length);
647
648}
649
650/* Shouldn't need to call this too often */
651static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
652                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
653
654        if (packet->buffer != buffer &&
655                        packet->buf_control == TRACE_CTRL_PACKET) {
656                free(packet->buffer);
657        }
658
659        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
660                packet->buf_control = TRACE_CTRL_PACKET;
661        } else
662                packet->buf_control = TRACE_CTRL_EXTERNAL;
663
664
665        packet->buffer = buffer;
666        packet->header = NULL;
667        packet->type = rt_type;
668        packet->payload = buffer;
669
670        if (libtrace->format_data == NULL) {
671                rt_init_format_data(libtrace);
672        }
673
674        return 0;
675}       
676
677/* Reads the next available packet in a blocking fashion */
678static int rt_read_packet(libtrace_t *libtrace,
679                libtrace_packet_t *packet) {
680        return rt_get_next_packet(libtrace,packet,1);
681}
682
683
684/* This should only get called for RT messages - RT-encapsulated data records
685 * should be converted to the appropriate capture format */
686static int rt_get_capture_length(const libtrace_packet_t *packet) {
687        rt_metadata_t *rt_md_hdr;
688        switch (packet->type) {
689                case TRACE_RT_STATUS:
690                        return sizeof(rt_status_t);
691                case TRACE_RT_HELLO:
692                        return sizeof(rt_hello_t);
693                case TRACE_RT_START:
694                        return 0;
695                case TRACE_RT_ACK:
696                        return sizeof(rt_ack_t);
697                case TRACE_RT_END_DATA:
698                        return 0;
699                case TRACE_RT_CLOSE:
700                        return 0;
701                case TRACE_RT_DENY_CONN:
702                        return sizeof(rt_deny_conn_t);
703                case TRACE_RT_PAUSE:
704                        return 0; 
705                case TRACE_RT_PAUSE_ACK:
706                        return 0;
707                case TRACE_RT_OPTION:
708                        return 0; /* FIXME */
709                case TRACE_RT_KEYCHANGE:
710                        return 0;
711                case TRACE_RT_LOSTCONN:
712                        return 0;
713                case TRACE_RT_SERVERSTART:
714                        return 0;
715                case TRACE_RT_CLIENTDROP:
716                        return 0;
717                case TRACE_RT_METADATA:
718                        /* This is a little trickier to work out */
719                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
720                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
721                                sizeof(rt_metadata_t);
722                default:
723                        printf("Unknown type: %d\n", packet->type);
724                       
725        }
726        return 0;
727}
728
729/* RT messages do not have a wire length because they were not captured from
730 * the wire - they were generated by the capture process */
731static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
732        return 0;
733}
734
735/* Although RT messages do contain "framing", this framing is considered to be
736 * stripped as soon as the packet is read by the RT client */                   
737static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
738        return 0;
739}
740
741
742static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
743{
744        /* RT messages don't have a link type */
745        return TRACE_TYPE_NONDATA;
746}
747
748static int rt_get_fd(const libtrace_t *trace) {
749        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
750}
751
752static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
753                                        libtrace_packet_t *packet) 
754{
755        libtrace_eventobj_t event = {0,0,0.0,0};
756        libtrace_err_t read_err;
757
758        assert(trace);
759        assert(packet);
760       
761        if (trace->format->get_fd) {
762                event.fd = trace->format->get_fd(trace);
763        } else {
764                event.fd = 0;
765        }
766
767        do {
768
769                event.size = rt_get_next_packet(trace, packet, 0);
770                if (event.size == -1) {
771                        read_err = trace_get_err(trace);
772                        if (read_err.err_num == EAGAIN) {
773                                /* No data available - do an IOWAIT */
774                                event.type = TRACE_EVENT_IOWAIT;
775                        }
776                        else {
777                                trace_perror(trace, "Error doing a non-blocking read from rt");
778                                event.type = TRACE_EVENT_PACKET;
779                                break;
780                        }
781                } else if (event.size == 0) {
782                        /* RT gives us a specific indicator that there will be
783                         * no more packets. */
784                        if (packet->type == TRACE_RT_END_DATA)
785                                event.type = TRACE_EVENT_TERMINATE;
786                        else {
787                                /* Since several RT messages can have zero-byte
788                                 * length (once the framing is removed), an
789                                 * event size of zero can still indicate a
790                                 * PACKET event */
791                                event.type = TRACE_EVENT_PACKET;
792                                trace->accepted_packets ++;
793                        }
794
795                }       
796                else {
797                        event.type = TRACE_EVENT_PACKET;
798                        trace->accepted_packets ++;
799                }
800
801                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
802                        if (!trace_apply_filter(trace->filter, packet)) {
803                                trace_clear_cache(packet);
804                                trace->filtered_packets ++;
805                                continue;
806                        }
807                }
808
809                break; 
810        } while (1);
811
812        return event;
813}
814
815static void rt_help(void) {
816        printf("rt format module\n");
817        printf("Supported input URIs:\n");
818        printf("\trt:hostname:port\n");
819        printf("\trt:hostname (connects on default port)\n");
820        printf("\n");
821        printf("\te.g.: rt:localhost\n");
822        printf("\te.g.: rt:localhost:32500\n");
823        printf("\n");
824
825}
826
827
828static struct libtrace_format_t rt = {
829        "rt",
830        "$Id$",
831        TRACE_FORMAT_RT,
832        NULL,                           /* probe filename */
833        NULL,                           /* probe magic */
834        rt_init_input,                  /* init_input */
835        NULL,                           /* config_input */
836        rt_start_input,                 /* start_input */
837        rt_pause_input,                 /* pause */
838        NULL,                           /* init_output */
839        NULL,                           /* config_output */
840        NULL,                           /* start_output */
841        rt_fin_input,                   /* fin_input */
842        NULL,                           /* fin_output */
843        rt_read_packet,                 /* read_packet */
844        rt_prepare_packet,              /* prepare_packet */
845        NULL,                           /* fin_packet */
846        NULL,                           /* write_packet */
847        NULL,                           /* flush_output */
848        rt_get_link_type,               /* get_link_type */
849        NULL,                           /* get_direction */
850        NULL,                           /* set_direction */
851        NULL,                           /* get_erf_timestamp */
852        NULL,                           /* get_timeval */
853        NULL,                           /* get_timespec */
854        NULL,                           /* get_seconds */
855        NULL,                           /* seek_erf */
856        NULL,                           /* seek_timeval */
857        NULL,                           /* seek_seconds */
858        rt_get_capture_length,          /* get_capture_length */
859        rt_get_wire_length,                     /* get_wire_length */
860        rt_get_framing_length,          /* get_framing_length */
861        NULL,                           /* set_capture_length */
862        NULL,                           /* get_received_packets */
863        NULL,                           /* get_filtered_packets */
864        NULL,                           /* get_dropped_packets */
865        NULL,                           /* get_statistics */
866        rt_get_fd,                      /* get_fd */
867        trace_event_rt,             /* trace_event */
868        rt_help,                        /* help */
869        NULL,                   /* next pointer */
870        NON_PARALLEL(true) /* This is normally live */
871};
872
873void rt_constructor(void) {
874        register_format(&rt);
875}
Note: See TracBrowser for help on using the repository browser.