source: lib/format_rt.c @ 2725318

develop
Last change on this file since 2725318 was 2725318, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Cleanup some of the assertions

  • Property mode set to 100644
File size: 25.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28
29#include "config.h"
30#include "common.h"
31#include "libtrace.h"
32#include "libtrace_int.h"
33#include "format_helper.h"
34#include "rt_protocol.h"
35
36#include "data-struct/buckets.h"
37
38#include <sys/stat.h>
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45#include <unistd.h>
46
47#ifndef WIN32
48# include <netdb.h>
49#endif
50
51#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
52
53/* Convert the RT denial code into a nice printable and coherent string */
54static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
55{
56        const char *string = 0;
57
58        switch(reason) {
59                case RT_DENY_WRAPPER:
60                        string = "Rejected by TCP Wrappers";
61                        break;
62                case RT_DENY_FULL:
63                        string = "Max connections reached on server";
64                        break;
65                case RT_DENY_AUTH:
66                        string = "Authentication failed";
67                        break;
68                default:
69                        string = "Unknown reason";
70        }
71
72        return string;
73}
74
75
76struct rt_format_data_t {
77        /* Name of the host to connect to */
78        char *hostname;
79        /* Buffer to store received packets into */
80        char *pkt_buffer;
81        /* Pointer to the next packet to be read from the buffer */
82        char *buf_read;
83        /* Pointer to the next unused byte in the buffer */
84        char *buf_write;
85        /* The port to connect to */
86        int port;
87        /* The file descriptor for the RT connection */
88        int input_fd;
89        /* Flag indicating whether the server is doing reliable RT */
90        int reliable;
91
92        int unacked;
93       
94        /* Dummy traces that can be assigned to the received packets to ensure
95         * that the appropriate functions can be used to process them */
96        libtrace_t *dummy_duck;
97        libtrace_t *dummy_erf;
98        libtrace_t *dummy_pcap;
99        libtrace_t *dummy_linux;
100        libtrace_t *dummy_ring;
101        libtrace_t *dummy_bpf;
102
103        /* Bucket structure for storing read packets until the user is
104         * done with them. */
105        libtrace_bucket_t *bucket;
106};
107
108/* Connects to an RT server
109 *
110 * Returns -1 if an error occurs
111 */
112static int rt_connect(libtrace_t *libtrace) {
113        struct hostent *he;
114        struct sockaddr_in remote;
115        rt_header_t connect_msg;
116        rt_deny_conn_t deny_hdr;       
117        rt_hello_t hello_opts;
118        uint8_t reason;
119       
120        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
121                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
122                                "Failed to convert hostname %s to address",
123                                RT_INFO->hostname);
124                return -1;
125        }
126        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
127                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
128                                "Could not create socket");
129                return -1;
130        }
131
132        memset(&remote,0, sizeof(remote));
133        remote.sin_family = AF_INET;
134        remote.sin_port = htons(RT_INFO->port);
135        remote.sin_addr = *((struct in_addr *)he->h_addr);
136
137        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
138                                (socklen_t)sizeof(struct sockaddr)) == -1) {
139                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
140                                "Could not connect to host %s on port %d",
141                                RT_INFO->hostname, RT_INFO->port);
142                return -1;
143        }
144
145        /* We are connected, now receive message from server */
146       
147        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
148                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
149                                "Could not receive connection message from %s",
150                                RT_INFO->hostname);
151                return -1;
152        }
153
154        if (connect_msg.magic != LIBTRACE_RT_MAGIC) {
155                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
156                        "RT version mismatch: magic byte is incorrect");
157                return -1;
158        }
159
160        if (connect_msg.version != LIBTRACE_RT_VERSION) {
161                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
162                        "RT version mismatch: version is incorrect (expected %d, got %d",
163                        LIBTRACE_RT_VERSION, connect_msg.version);
164                return -1;
165        }
166
167               
168       
169        switch (ntohl(connect_msg.type)) {
170                case TRACE_RT_DENY_CONN:
171                        /* Connection was denied */
172                       
173                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
174                                                sizeof(rt_deny_conn_t),
175                                                0) != sizeof(rt_deny_conn_t)) {
176                                reason = 0;
177                        }       
178                        reason = ntohl(deny_hdr.reason);
179                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
180                                "Connection attempt is denied: %s",
181                                rt_deny_reason(reason));       
182                        return -1;
183                case TRACE_RT_HELLO:
184                        /* Hello message - read the options sent to us by the
185                         * server */
186                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
187                                                sizeof(rt_hello_t), 0)
188                                        != sizeof(rt_hello_t)) {
189                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
190                                        "Failed to receive TRACE_RT_HELLO options");
191                                return -1;
192                        }
193                       
194                       
195                        RT_INFO->reliable = hello_opts.reliable;
196                        RT_INFO->unacked = 0;
197                        return 0;
198                default:
199                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
200                                        "Unknown message type received: %d",
201                                        connect_msg.type);
202                        return -1;
203        }
204        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
205                        "Somehow you managed to reach this unreachable code");
206        return -1;
207}
208
209static void rt_init_format_data(libtrace_t *libtrace) {
210        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
211
212        RT_INFO->dummy_duck = NULL;
213        RT_INFO->dummy_erf = NULL;
214        RT_INFO->dummy_pcap = NULL;
215        RT_INFO->dummy_linux = NULL;
216        RT_INFO->dummy_ring = NULL;
217        RT_INFO->dummy_bpf = NULL;
218        RT_INFO->pkt_buffer = NULL;
219        RT_INFO->buf_read = NULL;
220        RT_INFO->buf_write = NULL;
221        RT_INFO->hostname = NULL;
222        RT_INFO->port = 0;
223        RT_INFO->unacked = 0;
224
225        RT_INFO->bucket = libtrace_bucket_init();
226}
227
228static int rt_init_input(libtrace_t *libtrace) {
229        char *scan;
230        char *uridata = libtrace->uridata;
231
232        rt_init_format_data(libtrace);
233
234        /* If the user specifies "rt:" then assume localhost and the default
235         * port */
236        if (strlen(uridata) == 0) {
237                RT_INFO->hostname =
238                        strdup("localhost");
239                RT_INFO->port =
240                        COLLECTOR_PORT;
241        } else {
242                /* If the user does not specify a port, assume the default
243                 * port */
244                if ((scan = strchr(uridata,':')) == NULL) {
245                        RT_INFO->hostname =
246                                strdup(uridata);
247                        RT_INFO->port =
248                                COLLECTOR_PORT;
249                } else {
250                        RT_INFO->hostname =
251                                (char *)strndup(uridata,
252                                                (size_t)(scan - uridata));
253                        RT_INFO->port =
254                                atoi(++scan);
255                }
256        }
257
258        return 0;
259}
260       
261static int rt_start_input(libtrace_t *libtrace) {
262        rt_header_t start_msg;
263
264        start_msg.type = htonl(TRACE_RT_START);
265        start_msg.length = 0;
266        start_msg.sequence = 0;
267        start_msg.version = LIBTRACE_RT_VERSION;
268        start_msg.magic = LIBTRACE_RT_MAGIC; 
269
270        if (rt_connect(libtrace) == -1)
271                return -1;
272       
273        /* Need to send start message to server */
274        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
275                                start_msg.length, 0) != sizeof(rt_header_t)) {
276                printf("Failed to send start message to server\n");
277                return -1;
278        }
279
280        return 0;
281}
282
283static int rt_pause_input(libtrace_t *libtrace) {
284        rt_header_t close_msg;
285
286        close_msg.type = htonl(TRACE_RT_CLOSE);
287        close_msg.length = 0; 
288       
289        /* Send a close message to the server */
290        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
291                                close_msg.length, 0) != (int)sizeof(rt_header_t)
292                                + close_msg.length) {
293                printf("Failed to send close message to server\n");
294       
295        }
296
297        close(RT_INFO->input_fd);
298        return 0;
299}
300
301static int rt_fin_input(libtrace_t *libtrace) {
302        /* Make sure we clean up any dummy traces that we have been using */
303
304        if (RT_INFO->dummy_duck)
305                trace_destroy_dead(RT_INFO->dummy_duck);
306
307        if (RT_INFO->dummy_erf)
308                trace_destroy_dead(RT_INFO->dummy_erf);
309
310        if (RT_INFO->dummy_pcap)
311                trace_destroy_dead(RT_INFO->dummy_pcap);
312
313        if (RT_INFO->dummy_linux)
314                trace_destroy_dead(RT_INFO->dummy_linux);
315
316        if (RT_INFO->dummy_ring)
317                trace_destroy_dead(RT_INFO->dummy_ring);
318
319        if (RT_INFO->dummy_bpf)
320                trace_destroy_dead(RT_INFO->dummy_bpf);
321
322        if (RT_INFO->bucket)
323                libtrace_bucket_destroy(RT_INFO->bucket);
324        free(libtrace->format_data);
325        return 0;
326}
327
328/* Sends an RT ACK to the server to acknowledge receipt of packets */
329static int rt_send_ack(libtrace_t *libtrace, 
330                uint32_t seqno)  {
331       
332        static char *ack_buffer = 0;
333        char *buf_ptr;
334        int numbytes = 0;
335        size_t to_write = 0;
336        rt_header_t *hdr;
337        rt_ack_t *ack_hdr;
338       
339        if (!ack_buffer) {
340                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
341                                                        + sizeof(rt_ack_t));
342        }
343       
344        hdr = (rt_header_t *) ack_buffer;
345        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
346       
347        hdr->type = htonl(TRACE_RT_ACK);
348        hdr->length = htons(sizeof(rt_ack_t));
349
350        ack_hdr->sequence = htonl(seqno);
351       
352        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
353        buf_ptr = ack_buffer;
354
355        /* Keep trying until we write the entire ACK */
356        while (to_write > 0) {
357                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
358                if (numbytes == -1) {
359                        if (errno == EINTR || errno == EAGAIN) {
360                                continue;
361                        }
362                        else {
363                                printf("Error sending ack\n");
364                                perror("send");
365                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
366                                                "Error sending ack");
367                                return -1;
368                        }
369                }
370                to_write = to_write - numbytes;
371                buf_ptr = buf_ptr + to_write;
372               
373        }
374
375        return 1;
376}
377
378/* Sets the trace format for the packet to match the format it was originally
379 * captured in, rather than the RT format */
380static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
381{
382
383        /* We need to assign the packet to a "dead" trace */
384
385        /* Try to minimize the number of corrupt packets that slip through
386         * while making it easy to identify new pcap DLTs */
387        if (packet->type > TRACE_RT_DATA_DLT && 
388                        packet->type < TRACE_RT_DATA_DLT_END) {
389                if (!RT_INFO->dummy_pcap) {
390                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
391                }
392                packet->trace = RT_INFO->dummy_pcap;
393                return 0;       
394        }
395
396        if (packet->type > TRACE_RT_DATA_BPF &&
397                        packet->type < TRACE_RT_DATA_BPF_END) {
398
399                if (!RT_INFO->dummy_bpf) {
400                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
401                        /* This may fail on a non-BSD machine */
402                        if (trace_is_err(RT_INFO->dummy_bpf)) {
403                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
404                                return -1;
405                        }
406                }
407                packet->trace = RT_INFO->dummy_bpf;
408                return 0;
409        }
410
411        switch (packet->type) {
412                case TRACE_RT_DUCK_2_4:
413                case TRACE_RT_DUCK_2_5:
414                case TRACE_RT_DUCK_5_0:
415                        if (!RT_INFO->dummy_duck) {
416                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
417                        }
418                        packet->trace = RT_INFO->dummy_duck;
419                        break;
420                case TRACE_RT_DATA_ERF:
421                        if (!RT_INFO->dummy_erf) {
422                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
423                        }
424                        packet->trace = RT_INFO->dummy_erf;
425                        break;
426                case TRACE_RT_DATA_LINUX_NATIVE:
427                        if (!RT_INFO->dummy_linux) {
428                                RT_INFO->dummy_linux = trace_create_dead("int:");
429                                /* This may fail on a non-Linux machine */
430                                if (trace_is_err(RT_INFO->dummy_linux)) {
431                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
432                                        return -1;
433                                }
434                        }
435                        packet->trace = RT_INFO->dummy_linux;
436                        break;
437                case TRACE_RT_DATA_LINUX_RING:
438                        if (!RT_INFO->dummy_ring) {
439                                RT_INFO->dummy_ring = trace_create_dead("ring:");
440                                /* This may fail on a non-Linux machine */
441                                if (trace_is_err(RT_INFO->dummy_ring)) {
442                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
443                                        return -1;
444                                }
445                        }
446                        packet->trace = RT_INFO->dummy_ring;
447                        break;
448                case TRACE_RT_STATUS:
449                case TRACE_RT_METADATA:
450                        /* Just use the RT trace! */
451                        packet->trace = libtrace;
452                        break;
453                case TRACE_RT_DATA_LEGACY_ETH:
454                case TRACE_RT_DATA_LEGACY_ATM:
455                case TRACE_RT_DATA_LEGACY_POS:
456                        printf("Sending legacy over RT is currently not supported\n");
457                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
458                        return -1;
459                default:
460                        printf("Unrecognised format: %u\n", packet->type);
461                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
462                        return -1;
463        }
464        return 0; /* success */
465}               
466
467
468/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
469 * in any way. This means we have a much larger memory overhead per packet
470 * (which won't be used in the vast majority of cases), so we may want to think
471 * about doing something smarter, e.g. allocate a smaller block of memory and
472 * only increase it as required.
473 *
474 * XXX Capturing off int: can still lead to packets that are larger than 10K,
475 * in instances where the fragmentation is done magically by the NIC. This
476 * is pretty nasty, but also very rare.
477 */
478#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
479
480static int rt_process_data_packet(libtrace_t *libtrace,
481                libtrace_packet_t *packet) {
482
483        uint32_t prep_flags = TRACE_PREP_DO_NOT_OWN_BUFFER;
484        rt_header_t *hdr = (rt_header_t *)packet->header;
485
486        /* Send an ACK if required */
487        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
488                RT_INFO->unacked ++;
489                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
490                        if (rt_send_ack(libtrace, hdr->sequence) == -1)
491                                return -1;
492                        RT_INFO->unacked = 0;
493                }
494        }
495
496        /* Convert to the original capture format */
497        if (rt_set_format(libtrace, packet) < 0) {
498                return -1;
499        }
500
501        /* Update payload pointers and packet type to match the original
502         * format */
503        if (trace_prepare_packet(packet->trace, packet, packet->payload,
504                                packet->type, prep_flags)) {
505                return -1;
506        }
507
508        return 1;
509
510}
511
512/* Receives data from an RT server */
513static int rt_read(libtrace_t *libtrace, int block) {
514        int numbytes;
515
516        if (!RT_INFO->pkt_buffer) {
517                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
518                RT_INFO->buf_write = RT_INFO->pkt_buffer;
519                RT_INFO->buf_read = RT_INFO->pkt_buffer;
520                libtrace_create_new_bucket(RT_INFO->bucket, RT_INFO->pkt_buffer);
521        }
522
523#ifndef MSG_DONTWAIT
524#define MSG_DONTWAIT 0
525#endif
526#ifndef MSG_NOSIGNAL
527#  define MSG_NOSIGNAL 0
528#endif
529
530        if (block)
531                block=0;
532        else
533                block=MSG_DONTWAIT;
534
535        /* If the current buffer has plenty of space left, we can continue to
536         * read into it, otherwise create a new buffer and move anything in
537         * the old buffer over to it */
538        if (RT_INFO->buf_write - RT_INFO->pkt_buffer > RT_BUF_SIZE / 2) {
539                char *newbuf = (char*)malloc((size_t)RT_BUF_SIZE);
540
541                memcpy(newbuf, RT_INFO->buf_read, RT_INFO->buf_write - RT_INFO->buf_read);
542                RT_INFO->buf_write = newbuf + (RT_INFO->buf_write - RT_INFO->buf_read);
543                RT_INFO->buf_read = newbuf;
544                RT_INFO->pkt_buffer = newbuf;
545                libtrace_create_new_bucket(RT_INFO->bucket, newbuf);
546
547        }
548
549        /* Attempt to fill the buffer as much as we can */
550        if ((numbytes = recv(RT_INFO->input_fd, RT_INFO->buf_write,
551                        RT_BUF_SIZE - (RT_INFO->buf_write-RT_INFO->pkt_buffer),
552                        MSG_NOSIGNAL | block)) <= 0) {
553
554                if (numbytes == 0) {
555                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
556                                        "No data received by RT client");
557                        return -1;
558                }
559
560                if (errno == EINTR) {
561                        /* Ignore EINTR in case a caller is using signals */
562                        return 0;
563                }
564
565                if (errno == EAGAIN) {
566                        /* No data available and we are non-blocking */
567                        trace_set_err(libtrace, EAGAIN, "EAGAIN");
568                        return -1;
569                }
570
571                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
572                                "Error reading from RT socket: %s",
573                                strerror(errno));
574                return -1;
575        }
576
577        RT_INFO->buf_write += numbytes;
578        return (RT_INFO->buf_write - RT_INFO->buf_read);
579
580}
581
582
583static int rt_get_next_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
584                int block) {
585
586        rt_header_t *rthdr;
587
588        if (packet->buffer && packet->buf_control == TRACE_CTRL_PACKET)
589                free(packet->buffer);
590
591        while (RT_INFO->buf_write - RT_INFO->buf_read <
592                                (uint32_t)sizeof(rt_header_t)) {
593                if (rt_read(libtrace, block) == -1)
594                        return -1;
595        }
596
597        rthdr = (rt_header_t *)RT_INFO->buf_read;
598
599        /* Check if we have enough payload */
600        while (RT_INFO->buf_write - (RT_INFO->buf_read + sizeof(rt_header_t))
601                        < ntohs(rthdr->length)) {
602                if (rt_read(libtrace, block) == -1)
603                        return -1;
604                rthdr = (rt_header_t *)RT_INFO->buf_read;
605        }
606
607
608        packet->buffer = RT_INFO->buf_read;
609        packet->header = RT_INFO->buf_read;
610        packet->type = ntohl(((rt_header_t *)packet->header)->type);
611        packet->payload = RT_INFO->buf_read + sizeof(rt_header_t);
612        packet->internalid = libtrace_push_into_bucket(RT_INFO->bucket);
613        assert(packet->internalid != 0);
614        packet->srcbucket = RT_INFO->bucket;
615        packet->buf_control = TRACE_CTRL_EXTERNAL;
616
617        RT_INFO->buf_read += ntohs(rthdr->length) + sizeof(rt_header_t);
618
619        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
620                rt_process_data_packet(libtrace, packet);
621        } else {
622                switch(packet->type) {
623                        case TRACE_RT_DUCK_2_4:
624                        case TRACE_RT_DUCK_2_5:
625                        case TRACE_RT_STATUS:
626                        case TRACE_RT_METADATA:
627                                if (rt_process_data_packet(libtrace, packet) < 0)
628                                        return -1;
629                                break;
630                        case TRACE_RT_END_DATA:
631                        case TRACE_RT_KEYCHANGE:
632                        case TRACE_RT_LOSTCONN:
633                        case TRACE_RT_CLIENTDROP:
634                        case TRACE_RT_SERVERSTART:
635                                break;
636                        case TRACE_RT_PAUSE_ACK:
637                        case TRACE_RT_OPTION:
638                                break;
639                        default:
640                                fprintf(stderr, "Bad RT type for client: %d\n",
641                                                packet->type);
642                                return -1;
643                }
644        }
645
646        return ntohs(rthdr->length);
647
648}
649
650/* Shouldn't need to call this too often */
651static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
652                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
653
654        if (packet->buffer != buffer &&
655                        packet->buf_control == TRACE_CTRL_PACKET) {
656                free(packet->buffer);
657        }
658
659        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
660                packet->buf_control = TRACE_CTRL_PACKET;
661        } else
662                packet->buf_control = TRACE_CTRL_EXTERNAL;
663
664
665        packet->buffer = buffer;
666        packet->header = NULL;
667        packet->type = rt_type;
668        packet->payload = buffer;
669
670        if (libtrace->format_data == NULL) {
671                rt_init_format_data(libtrace);
672        }
673
674        return 0;
675}       
676
677/* Reads the next available packet in a blocking fashion */
678static int rt_read_packet(libtrace_t *libtrace,
679                libtrace_packet_t *packet) {
680        return rt_get_next_packet(libtrace,packet,1);
681}
682
683
684/* This should only get called for RT messages - RT-encapsulated data records
685 * should be converted to the appropriate capture format */
686static int rt_get_capture_length(const libtrace_packet_t *packet) {
687        rt_metadata_t *rt_md_hdr;
688        switch (packet->type) {
689                case TRACE_RT_STATUS:
690                        return sizeof(rt_status_t);
691                case TRACE_RT_HELLO:
692                        return sizeof(rt_hello_t);
693                case TRACE_RT_START:
694                        return 0;
695                case TRACE_RT_ACK:
696                        return sizeof(rt_ack_t);
697                case TRACE_RT_END_DATA:
698                        return 0;
699                case TRACE_RT_CLOSE:
700                        return 0;
701                case TRACE_RT_DENY_CONN:
702                        return sizeof(rt_deny_conn_t);
703                case TRACE_RT_PAUSE:
704                        return 0; 
705                case TRACE_RT_PAUSE_ACK:
706                        return 0;
707                case TRACE_RT_OPTION:
708                        return 0; /* FIXME */
709                case TRACE_RT_KEYCHANGE:
710                        return 0;
711                case TRACE_RT_LOSTCONN:
712                        return 0;
713                case TRACE_RT_SERVERSTART:
714                        return 0;
715                case TRACE_RT_CLIENTDROP:
716                        return 0;
717                case TRACE_RT_METADATA:
718                        /* This is a little trickier to work out */
719                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
720                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
721                                sizeof(rt_metadata_t);
722                default:
723                        printf("Unknown type: %d\n", packet->type);
724                       
725        }
726        return 0;
727}
728
729/* RT messages do not have a wire length because they were not captured from
730 * the wire - they were generated by the capture process */
731static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
732        return 0;
733}
734
735/* Although RT messages do contain "framing", this framing is considered to be
736 * stripped as soon as the packet is read by the RT client */                   
737static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
738        return 0;
739}
740
741
742static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
743{
744        /* RT messages don't have a link type */
745        return TRACE_TYPE_NONDATA;
746}
747
748static int rt_get_fd(const libtrace_t *trace) {
749        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
750}
751
752static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
753                                        libtrace_packet_t *packet) 
754{
755        libtrace_eventobj_t event = {0,0,0.0,0};
756        libtrace_err_t read_err;
757
758        assert(trace);
759        assert(packet);
760       
761        if (trace->format->get_fd) {
762                event.fd = trace->format->get_fd(trace);
763        } else {
764                event.fd = 0;
765        }
766
767        do {
768
769                event.size = rt_get_next_packet(trace, packet, 0);
770                if (event.size == -1) {
771                        read_err = trace_get_err(trace);
772                        if (read_err.err_num == EAGAIN) {
773                                /* No data available - do an IOWAIT */
774                                event.type = TRACE_EVENT_IOWAIT;
775                        }
776                        else {
777                                trace_perror(trace, "Error doing a non-blocking read from rt");
778                                event.type = TRACE_EVENT_PACKET;
779                                break;
780                        }
781                } else if (event.size == 0) {
782                        /* RT gives us a specific indicator that there will be
783                         * no more packets. */
784                        if (packet->type == TRACE_RT_END_DATA)
785                                event.type = TRACE_EVENT_TERMINATE;
786                        else {
787                                /* Since several RT messages can have zero-byte
788                                 * length (once the framing is removed), an
789                                 * event size of zero can still indicate a
790                                 * PACKET event */
791                                event.type = TRACE_EVENT_PACKET;
792                                trace->accepted_packets ++;
793                        }
794
795                }       
796                else {
797                        event.type = TRACE_EVENT_PACKET;
798                        trace->accepted_packets ++;
799                }
800
801                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
802                        if (!trace_apply_filter(trace->filter, packet)) {
803                                trace_clear_cache(packet);
804                                trace->filtered_packets ++;
805                                continue;
806                        }
807                }
808
809                break; 
810        } while (1);
811
812        return event;
813}
814
815static void rt_help(void) {
816        printf("rt format module\n");
817        printf("Supported input URIs:\n");
818        printf("\trt:hostname:port\n");
819        printf("\trt:hostname (connects on default port)\n");
820        printf("\n");
821        printf("\te.g.: rt:localhost\n");
822        printf("\te.g.: rt:localhost:32500\n");
823        printf("\n");
824
825}
826
827
828static struct libtrace_format_t rt = {
829        "rt",
830        "$Id$",
831        TRACE_FORMAT_RT,
832        NULL,                           /* probe filename */
833        NULL,                           /* probe magic */
834        rt_init_input,                  /* init_input */
835        NULL,                           /* config_input */
836        rt_start_input,                 /* start_input */
837        rt_pause_input,                 /* pause */
838        NULL,                           /* init_output */
839        NULL,                           /* config_output */
840        NULL,                           /* start_output */
841        rt_fin_input,                   /* fin_input */
842        NULL,                           /* fin_output */
843        rt_read_packet,                 /* read_packet */
844        rt_prepare_packet,              /* prepare_packet */
845        NULL,                           /* fin_packet */
846        NULL,                           /* write_packet */
847        NULL,                           /* flush_output */
848        rt_get_link_type,               /* get_link_type */
849        NULL,                           /* get_direction */
850        NULL,                           /* set_direction */
851        NULL,                           /* get_erf_timestamp */
852        NULL,                           /* get_timeval */
853        NULL,                           /* get_timespec */
854        NULL,                           /* get_seconds */
855        NULL,                           /* seek_erf */
856        NULL,                           /* seek_timeval */
857        NULL,                           /* seek_seconds */
858        rt_get_capture_length,          /* get_capture_length */
859        rt_get_wire_length,                     /* get_wire_length */
860        rt_get_framing_length,          /* get_framing_length */
861        NULL,                           /* set_capture_length */
862        NULL,                           /* get_received_packets */
863        NULL,                           /* get_filtered_packets */
864        NULL,                           /* get_dropped_packets */
865        NULL,                           /* get_statistics */
866        rt_get_fd,                      /* get_fd */
867        trace_event_rt,             /* trace_event */
868        rt_help,                        /* help */
869        NULL,                   /* next pointer */
870        NON_PARALLEL(true) /* This is normally live */
871};
872
873void rt_constructor(void) {
874        register_format(&rt);
875}
Note: See TracBrowser for help on using the repository browser.