source: lib/format_rt.c @ ef5ba20

develop
Last change on this file since ef5ba20 was ef5ba20, checked in by Jacob Van Walraven <jcv9@…>, 22 months ago

add abilty to get custom option from meta packets, add abilty to get entire section from meta packet, meta api now returns libtrace_meta_t structure

  • Property mode set to 100644
File size: 26.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28
29#include "config.h"
30#include "common.h"
31#include "libtrace.h"
32#include "libtrace_int.h"
33#include "format_helper.h"
34#include "rt_protocol.h"
35
36#include "data-struct/buckets.h"
37
38#include <sys/stat.h>
39#include <errno.h>
40#include <fcntl.h>
41#include <stdio.h>
42#include <string.h>
43#include <stdlib.h>
44#include <unistd.h>
45
46#ifndef WIN32
47# include <netdb.h>
48#endif
49
50#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
51
52/* Convert the RT denial code into a nice printable and coherent string */
53static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
54{
55        const char *string = 0;
56
57        switch(reason) {
58                case RT_DENY_WRAPPER:
59                        string = "Rejected by TCP Wrappers";
60                        break;
61                case RT_DENY_FULL:
62                        string = "Max connections reached on server";
63                        break;
64                case RT_DENY_AUTH:
65                        string = "Authentication failed";
66                        break;
67                default:
68                        string = "Unknown reason";
69        }
70
71        return string;
72}
73
74
75struct rt_format_data_t {
76        /* Name of the host to connect to */
77        char *hostname;
78        /* Buffer to store received packets into */
79        char *pkt_buffer;
80        /* Pointer to the next packet to be read from the buffer */
81        char *buf_read;
82        /* Pointer to the next unused byte in the buffer */
83        char *buf_write;
84        /* The port to connect to */
85        int port;
86        /* The file descriptor for the RT connection */
87        int input_fd;
88        /* Flag indicating whether the server is doing reliable RT */
89        int reliable;
90
91        int unacked;
92       
93        /* Dummy traces that can be assigned to the received packets to ensure
94         * that the appropriate functions can be used to process them */
95        libtrace_t *dummy_duck;
96        libtrace_t *dummy_erf;
97        libtrace_t *dummy_pcap;
98        libtrace_t *dummy_linux;
99        libtrace_t *dummy_ring;
100        libtrace_t *dummy_bpf;
101
102        /* Bucket structure for storing read packets until the user is
103         * done with them. */
104        libtrace_bucket_t *bucket;
105};
106
107/* Connects to an RT server
108 *
109 * Returns -1 if an error occurs
110 */
111static int rt_connect(libtrace_t *libtrace) {
112        struct hostent *he;
113        struct sockaddr_in remote;
114        rt_header_t connect_msg;
115        rt_deny_conn_t deny_hdr;       
116        rt_hello_t hello_opts;
117        uint8_t reason;
118       
119        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
120                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
121                                "Failed to convert hostname %s to address",
122                                RT_INFO->hostname);
123                return -1;
124        }
125        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
126                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
127                                "Could not create socket");
128                return -1;
129        }
130
131        memset(&remote,0, sizeof(remote));
132        remote.sin_family = AF_INET;
133        remote.sin_port = htons(RT_INFO->port);
134        remote.sin_addr = *((struct in_addr *)he->h_addr);
135
136        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
137                                (socklen_t)sizeof(struct sockaddr)) == -1) {
138                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
139                                "Could not connect to host %s on port %d",
140                                RT_INFO->hostname, RT_INFO->port);
141                return -1;
142        }
143
144        /* We are connected, now receive message from server */
145       
146        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
147                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
148                                "Could not receive connection message from %s",
149                                RT_INFO->hostname);
150                return -1;
151        }
152
153        if (connect_msg.magic != LIBTRACE_RT_MAGIC) {
154                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
155                        "RT version mismatch: magic byte is incorrect");
156                return -1;
157        }
158
159        if (connect_msg.version != LIBTRACE_RT_VERSION) {
160                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
161                        "RT version mismatch: version is incorrect (expected %d, got %d",
162                        LIBTRACE_RT_VERSION, connect_msg.version);
163                return -1;
164        }
165
166               
167       
168        switch (ntohl(connect_msg.type)) {
169                case TRACE_RT_DENY_CONN:
170                        /* Connection was denied */
171                       
172                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
173                                                sizeof(rt_deny_conn_t),
174                                                0) != sizeof(rt_deny_conn_t)) {
175                                reason = 0;
176                        }       
177                        reason = ntohl(deny_hdr.reason);
178                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
179                                "Connection attempt is denied: %s",
180                                rt_deny_reason(reason));       
181                        return -1;
182                case TRACE_RT_HELLO:
183                        /* Hello message - read the options sent to us by the
184                         * server */
185                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
186                                                sizeof(rt_hello_t), 0)
187                                        != sizeof(rt_hello_t)) {
188                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
189                                        "Failed to receive TRACE_RT_HELLO options");
190                                return -1;
191                        }
192                       
193                       
194                        RT_INFO->reliable = hello_opts.reliable;
195                        RT_INFO->unacked = 0;
196                        return 0;
197                default:
198                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
199                                        "Unknown message type received: %d",
200                                        connect_msg.type);
201                        return -1;
202        }
203        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
204                        "Somehow you managed to reach this unreachable code");
205        return -1;
206}
207
208static void rt_init_format_data(libtrace_t *libtrace) {
209        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
210
211        RT_INFO->dummy_duck = NULL;
212        RT_INFO->dummy_erf = NULL;
213        RT_INFO->dummy_pcap = NULL;
214        RT_INFO->dummy_linux = NULL;
215        RT_INFO->dummy_ring = NULL;
216        RT_INFO->dummy_bpf = NULL;
217        RT_INFO->pkt_buffer = NULL;
218        RT_INFO->buf_read = NULL;
219        RT_INFO->buf_write = NULL;
220        RT_INFO->hostname = NULL;
221        RT_INFO->port = 0;
222        RT_INFO->unacked = 0;
223
224        RT_INFO->bucket = libtrace_bucket_init();
225}
226
227static int rt_init_input(libtrace_t *libtrace) {
228        char *scan;
229        char *uridata = libtrace->uridata;
230
231        rt_init_format_data(libtrace);
232
233        /* If the user specifies "rt:" then assume localhost and the default
234         * port */
235        if (strlen(uridata) == 0) {
236                RT_INFO->hostname =
237                        strdup("localhost");
238                RT_INFO->port =
239                        COLLECTOR_PORT;
240        } else {
241                /* If the user does not specify a port, assume the default
242                 * port */
243                if ((scan = strchr(uridata,':')) == NULL) {
244                        RT_INFO->hostname =
245                                strdup(uridata);
246                        RT_INFO->port =
247                                COLLECTOR_PORT;
248                } else {
249                        RT_INFO->hostname =
250                                (char *)strndup(uridata,
251                                                (size_t)(scan - uridata));
252                        RT_INFO->port =
253                                atoi(++scan);
254                }
255        }
256
257        return 0;
258}
259       
260static int rt_start_input(libtrace_t *libtrace) {
261        rt_header_t start_msg;
262
263        start_msg.type = htonl(TRACE_RT_START);
264        start_msg.length = 0;
265        start_msg.sequence = 0;
266        start_msg.version = LIBTRACE_RT_VERSION;
267        start_msg.magic = LIBTRACE_RT_MAGIC; 
268
269        if (rt_connect(libtrace) == -1)
270                return -1;
271       
272        /* Need to send start message to server */
273        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
274                                start_msg.length, 0) != sizeof(rt_header_t)) {
275                printf("Failed to send start message to server\n");
276                return -1;
277        }
278
279        return 0;
280}
281
282static int rt_pause_input(libtrace_t *libtrace) {
283        rt_header_t close_msg;
284
285        close_msg.type = htonl(TRACE_RT_CLOSE);
286        close_msg.length = 0; 
287       
288        /* Send a close message to the server */
289        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
290                                close_msg.length, 0) != (int)sizeof(rt_header_t)
291                                + close_msg.length) {
292                printf("Failed to send close message to server\n");
293       
294        }
295
296        close(RT_INFO->input_fd);
297        return 0;
298}
299
300static int rt_fin_input(libtrace_t *libtrace) {
301        /* Make sure we clean up any dummy traces that we have been using */
302
303        if (RT_INFO->dummy_duck)
304                trace_destroy_dead(RT_INFO->dummy_duck);
305
306        if (RT_INFO->dummy_erf)
307                trace_destroy_dead(RT_INFO->dummy_erf);
308
309        if (RT_INFO->dummy_pcap)
310                trace_destroy_dead(RT_INFO->dummy_pcap);
311
312        if (RT_INFO->dummy_linux)
313                trace_destroy_dead(RT_INFO->dummy_linux);
314
315        if (RT_INFO->dummy_ring)
316                trace_destroy_dead(RT_INFO->dummy_ring);
317
318        if (RT_INFO->dummy_bpf)
319                trace_destroy_dead(RT_INFO->dummy_bpf);
320
321        if (RT_INFO->bucket)
322                libtrace_bucket_destroy(RT_INFO->bucket);
323        free(libtrace->format_data);
324        return 0;
325}
326
327/* Sends an RT ACK to the server to acknowledge receipt of packets */
328static int rt_send_ack(libtrace_t *libtrace, 
329                uint32_t seqno)  {
330       
331        static char *ack_buffer = 0;
332        char *buf_ptr;
333        int numbytes = 0;
334        size_t to_write = 0;
335        rt_header_t *hdr;
336        rt_ack_t *ack_hdr;
337       
338        if (!ack_buffer) {
339                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
340                                                        + sizeof(rt_ack_t));
341        }
342       
343        hdr = (rt_header_t *) ack_buffer;
344        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
345       
346        hdr->type = htonl(TRACE_RT_ACK);
347        hdr->length = htons(sizeof(rt_ack_t));
348
349        ack_hdr->sequence = htonl(seqno);
350       
351        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
352        buf_ptr = ack_buffer;
353
354        /* Keep trying until we write the entire ACK */
355        while (to_write > 0) {
356                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
357                if (numbytes == -1) {
358                        if (errno == EINTR || errno == EAGAIN) {
359                                continue;
360                        }
361                        else {
362                                printf("Error sending ack\n");
363                                perror("send");
364                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
365                                                "Error sending ack");
366                                return -1;
367                        }
368                }
369                to_write = to_write - numbytes;
370                buf_ptr = buf_ptr + to_write;
371               
372        }
373
374        return 1;
375}
376
377/* Sets the trace format for the packet to match the format it was originally
378 * captured in, rather than the RT format */
379static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
380{
381
382        /* We need to assign the packet to a "dead" trace */
383
384        /* Try to minimize the number of corrupt packets that slip through
385         * while making it easy to identify new pcap DLTs */
386        if (packet->type > TRACE_RT_DATA_DLT && 
387                        packet->type < TRACE_RT_DATA_DLT_END) {
388                if (!RT_INFO->dummy_pcap) {
389                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
390                }
391                packet->trace = RT_INFO->dummy_pcap;
392                return 0;       
393        }
394
395        if (packet->type > TRACE_RT_DATA_BPF &&
396                        packet->type < TRACE_RT_DATA_BPF_END) {
397
398                if (!RT_INFO->dummy_bpf) {
399                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
400                        /* This may fail on a non-BSD machine */
401                        if (trace_is_err(RT_INFO->dummy_bpf)) {
402                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
403                                return -1;
404                        }
405                }
406                packet->trace = RT_INFO->dummy_bpf;
407                return 0;
408        }
409
410        switch (packet->type) {
411                case TRACE_RT_DUCK_2_4:
412                case TRACE_RT_DUCK_2_5:
413                case TRACE_RT_DUCK_5_0:
414                        if (!RT_INFO->dummy_duck) {
415                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
416                        }
417                        packet->trace = RT_INFO->dummy_duck;
418                        break;
419                case TRACE_RT_DATA_ERF:
420                        if (!RT_INFO->dummy_erf) {
421                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
422                        }
423                        packet->trace = RT_INFO->dummy_erf;
424                        break;
425                case TRACE_RT_DATA_LINUX_NATIVE:
426                        if (!RT_INFO->dummy_linux) {
427                                RT_INFO->dummy_linux = trace_create_dead("int:");
428                                /* This may fail on a non-Linux machine */
429                                if (trace_is_err(RT_INFO->dummy_linux)) {
430                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
431                                        return -1;
432                                }
433                        }
434                        packet->trace = RT_INFO->dummy_linux;
435                        break;
436                case TRACE_RT_DATA_LINUX_RING:
437                        if (!RT_INFO->dummy_ring) {
438                                RT_INFO->dummy_ring = trace_create_dead("ring:");
439                                /* This may fail on a non-Linux machine */
440                                if (trace_is_err(RT_INFO->dummy_ring)) {
441                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
442                                        return -1;
443                                }
444                        }
445                        packet->trace = RT_INFO->dummy_ring;
446                        break;
447                case TRACE_RT_STATUS:
448                case TRACE_RT_METADATA:
449                        /* Just use the RT trace! */
450                        packet->trace = libtrace;
451                        break;
452                case TRACE_RT_DATA_LEGACY_ETH:
453                case TRACE_RT_DATA_LEGACY_ATM:
454                case TRACE_RT_DATA_LEGACY_POS:
455                        printf("Sending legacy over RT is currently not supported\n");
456                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
457                        return -1;
458                default:
459                        printf("Unrecognised format: %u\n", packet->type);
460                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
461                        return -1;
462        }
463        return 0; /* success */
464}               
465
466
467/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
468 * in any way. This means we have a much larger memory overhead per packet
469 * (which won't be used in the vast majority of cases), so we may want to think
470 * about doing something smarter, e.g. allocate a smaller block of memory and
471 * only increase it as required.
472 *
473 * XXX Capturing off int: can still lead to packets that are larger than 10K,
474 * in instances where the fragmentation is done magically by the NIC. This
475 * is pretty nasty, but also very rare.
476 */
477#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
478
479static int rt_process_data_packet(libtrace_t *libtrace,
480                libtrace_packet_t *packet) {
481
482        uint32_t prep_flags = TRACE_PREP_DO_NOT_OWN_BUFFER;
483        rt_header_t *hdr = (rt_header_t *)packet->header;
484
485        /* Send an ACK if required */
486        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
487                RT_INFO->unacked ++;
488                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
489                        if (rt_send_ack(libtrace, hdr->sequence) == -1)
490                                return -1;
491                        RT_INFO->unacked = 0;
492                }
493        }
494
495        /* Convert to the original capture format */
496        if (rt_set_format(libtrace, packet) < 0) {
497                return -1;
498        }
499
500        /* Update payload pointers and packet type to match the original
501         * format */
502        if (trace_prepare_packet(packet->trace, packet, packet->payload,
503                                packet->type, prep_flags)) {
504                return -1;
505        }
506
507        return 1;
508
509}
510
511/* Receives data from an RT server */
512static int rt_read(libtrace_t *libtrace, int block) {
513        int numbytes;
514
515        if (!RT_INFO->pkt_buffer) {
516                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
517                RT_INFO->buf_write = RT_INFO->pkt_buffer;
518                RT_INFO->buf_read = RT_INFO->pkt_buffer;
519                libtrace_create_new_bucket(RT_INFO->bucket, RT_INFO->pkt_buffer);
520        }
521
522#ifndef MSG_DONTWAIT
523#define MSG_DONTWAIT 0
524#endif
525#ifndef MSG_NOSIGNAL
526#  define MSG_NOSIGNAL 0
527#endif
528
529        if (block)
530                block=0;
531        else
532                block=MSG_DONTWAIT;
533
534        /* If the current buffer has plenty of space left, we can continue to
535         * read into it, otherwise create a new buffer and move anything in
536         * the old buffer over to it */
537        if (RT_INFO->buf_write - RT_INFO->pkt_buffer > RT_BUF_SIZE / 2) {
538                char *newbuf = (char*)malloc((size_t)RT_BUF_SIZE);
539
540                memcpy(newbuf, RT_INFO->buf_read, RT_INFO->buf_write - RT_INFO->buf_read);
541                RT_INFO->buf_write = newbuf + (RT_INFO->buf_write - RT_INFO->buf_read);
542                RT_INFO->buf_read = newbuf;
543                RT_INFO->pkt_buffer = newbuf;
544                libtrace_create_new_bucket(RT_INFO->bucket, newbuf);
545
546        }
547
548        /* Attempt to fill the buffer as much as we can */
549        if ((numbytes = recv(RT_INFO->input_fd, RT_INFO->buf_write,
550                        RT_BUF_SIZE - (RT_INFO->buf_write-RT_INFO->pkt_buffer),
551                        MSG_NOSIGNAL | block)) <= 0) {
552
553                if (numbytes == 0) {
554                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
555                                        "No data received by RT client");
556                        return -1;
557                }
558
559                if (errno == EINTR) {
560                        /* Ignore EINTR in case a caller is using signals */
561                        return 0;
562                }
563
564                if (errno == EAGAIN) {
565                        /* No data available and we are non-blocking */
566                        trace_set_err(libtrace, EAGAIN, "EAGAIN");
567                        return -1;
568                }
569
570                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
571                                "Error reading from RT socket: %s",
572                                strerror(errno));
573                return -1;
574        }
575
576        RT_INFO->buf_write += numbytes;
577        return (RT_INFO->buf_write - RT_INFO->buf_read);
578
579}
580
581
582static int rt_get_next_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
583                int block) {
584
585        rt_header_t *rthdr;
586
587        if (packet->buffer && packet->buf_control == TRACE_CTRL_PACKET)
588                free(packet->buffer);
589
590        while (RT_INFO->buf_write - RT_INFO->buf_read <
591                                (uint32_t)sizeof(rt_header_t)) {
592                if (rt_read(libtrace, block) == -1)
593                        return -1;
594        }
595
596        rthdr = (rt_header_t *)RT_INFO->buf_read;
597
598        /* Check if we have enough payload */
599        while (RT_INFO->buf_write - (RT_INFO->buf_read + sizeof(rt_header_t))
600                        < ntohs(rthdr->length)) {
601                if (rt_read(libtrace, block) == -1)
602                        return -1;
603                rthdr = (rt_header_t *)RT_INFO->buf_read;
604        }
605
606
607        packet->buffer = RT_INFO->buf_read;
608        packet->header = RT_INFO->buf_read;
609        packet->type = ntohl(((rt_header_t *)packet->header)->type);
610        packet->payload = RT_INFO->buf_read + sizeof(rt_header_t);
611        packet->internalid = libtrace_push_into_bucket(RT_INFO->bucket);
612        if (!packet->internalid) {
613                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, "packet->internalid is 0 in rt_get_next_packet()");
614                return -1;
615        }
616        packet->srcbucket = RT_INFO->bucket;
617        packet->buf_control = TRACE_CTRL_EXTERNAL;
618
619        RT_INFO->buf_read += ntohs(rthdr->length) + sizeof(rt_header_t);
620
621        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
622                rt_process_data_packet(libtrace, packet);
623        } else {
624                switch(packet->type) {
625                        case TRACE_RT_DUCK_2_4:
626                        case TRACE_RT_DUCK_2_5:
627                        case TRACE_RT_STATUS:
628                        case TRACE_RT_METADATA:
629                                if (rt_process_data_packet(libtrace, packet) < 0)
630                                        return -1;
631                                break;
632                        case TRACE_RT_END_DATA:
633                        case TRACE_RT_KEYCHANGE:
634                        case TRACE_RT_LOSTCONN:
635                        case TRACE_RT_CLIENTDROP:
636                        case TRACE_RT_SERVERSTART:
637                                break;
638                        case TRACE_RT_PAUSE_ACK:
639                        case TRACE_RT_OPTION:
640                                break;
641                        default:
642                                fprintf(stderr, "Bad RT type for client: %d\n",
643                                                packet->type);
644                                return -1;
645                }
646        }
647
648        return ntohs(rthdr->length);
649
650}
651
652/* Shouldn't need to call this too often */
653static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
654                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
655
656        if (packet->buffer != buffer &&
657                        packet->buf_control == TRACE_CTRL_PACKET) {
658                free(packet->buffer);
659        }
660
661        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
662                packet->buf_control = TRACE_CTRL_PACKET;
663        } else
664                packet->buf_control = TRACE_CTRL_EXTERNAL;
665
666
667        packet->buffer = buffer;
668        packet->header = NULL;
669        packet->type = rt_type;
670        packet->payload = buffer;
671
672        if (libtrace->format_data == NULL) {
673                rt_init_format_data(libtrace);
674        }
675
676        return 0;
677}       
678
679/* Reads the next available packet in a blocking fashion */
680static int rt_read_packet(libtrace_t *libtrace,
681                libtrace_packet_t *packet) {
682        return rt_get_next_packet(libtrace,packet,1);
683}
684
685
686/* This should only get called for RT messages - RT-encapsulated data records
687 * should be converted to the appropriate capture format */
688static int rt_get_capture_length(const libtrace_packet_t *packet) {
689        rt_metadata_t *rt_md_hdr;
690        switch (packet->type) {
691                case TRACE_RT_STATUS:
692                        return sizeof(rt_status_t);
693                case TRACE_RT_HELLO:
694                        return sizeof(rt_hello_t);
695                case TRACE_RT_START:
696                        return 0;
697                case TRACE_RT_ACK:
698                        return sizeof(rt_ack_t);
699                case TRACE_RT_END_DATA:
700                        return 0;
701                case TRACE_RT_CLOSE:
702                        return 0;
703                case TRACE_RT_DENY_CONN:
704                        return sizeof(rt_deny_conn_t);
705                case TRACE_RT_PAUSE:
706                        return 0; 
707                case TRACE_RT_PAUSE_ACK:
708                        return 0;
709                case TRACE_RT_OPTION:
710                        return 0; /* FIXME */
711                case TRACE_RT_KEYCHANGE:
712                        return 0;
713                case TRACE_RT_LOSTCONN:
714                        return 0;
715                case TRACE_RT_SERVERSTART:
716                        return 0;
717                case TRACE_RT_CLIENTDROP:
718                        return 0;
719                case TRACE_RT_METADATA:
720                        /* This is a little trickier to work out */
721                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
722                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
723                                sizeof(rt_metadata_t);
724                default:
725                        printf("Unknown type: %d\n", packet->type);
726                       
727        }
728        return 0;
729}
730
731/* RT messages do not have a wire length because they were not captured from
732 * the wire - they were generated by the capture process */
733static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
734        return 0;
735}
736
737/* Although RT messages do contain "framing", this framing is considered to be
738 * stripped as soon as the packet is read by the RT client */                   
739static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
740        return 0;
741}
742
743
744static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
745{
746        /* RT messages don't have a link type */
747        return TRACE_TYPE_NONDATA;
748}
749
750static int rt_get_fd(const libtrace_t *trace) {
751        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
752}
753
754static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
755                                        libtrace_packet_t *packet) 
756{
757        libtrace_eventobj_t event = {0,0,0.0,0};
758        libtrace_err_t read_err;
759
760        if (!trace) {
761                fprintf(stderr, "NULL trace passed into trace_event_rt()\n");
762                /* Return empty event on error? */
763                return event;
764        }
765        if (!packet) {
766                trace_set_err(trace, TRACE_ERR_NULL_PACKET, "NULL packet passed into trace_event_rt()");
767                /* Return empty event on error? */
768                return event;
769        }
770
771        if (trace->format->get_fd) {
772                event.fd = trace->format->get_fd(trace);
773        } else {
774                event.fd = 0;
775        }
776
777        do {
778
779                event.size = rt_get_next_packet(trace, packet, 0);
780                if (event.size == -1) {
781                        read_err = trace_get_err(trace);
782                        if (read_err.err_num == EAGAIN) {
783                                /* No data available - do an IOWAIT */
784                                event.type = TRACE_EVENT_IOWAIT;
785                        }
786                        else {
787                                trace_perror(trace, "Error doing a non-blocking read from rt");
788                                event.type = TRACE_EVENT_PACKET;
789                                break;
790                        }
791                } else if (event.size == 0) {
792                        /* RT gives us a specific indicator that there will be
793                         * no more packets. */
794                        if (packet->type == TRACE_RT_END_DATA)
795                                event.type = TRACE_EVENT_TERMINATE;
796                        else {
797                                /* Since several RT messages can have zero-byte
798                                 * length (once the framing is removed), an
799                                 * event size of zero can still indicate a
800                                 * PACKET event */
801                                event.type = TRACE_EVENT_PACKET;
802                                trace->accepted_packets ++;
803                        }
804
805                }       
806                else {
807                        event.type = TRACE_EVENT_PACKET;
808                        trace->accepted_packets ++;
809                }
810
811                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
812                        if (!trace_apply_filter(trace->filter, packet)) {
813                                trace_clear_cache(packet);
814                                trace->filtered_packets ++;
815                                continue;
816                        }
817                }
818
819                break; 
820        } while (1);
821
822        return event;
823}
824
825static void rt_help(void) {
826        printf("rt format module\n");
827        printf("Supported input URIs:\n");
828        printf("\trt:hostname:port\n");
829        printf("\trt:hostname (connects on default port)\n");
830        printf("\n");
831        printf("\te.g.: rt:localhost\n");
832        printf("\te.g.: rt:localhost:32500\n");
833        printf("\n");
834
835}
836
837
838static struct libtrace_format_t rt = {
839        "rt",
840        "$Id$",
841        TRACE_FORMAT_RT,
842        NULL,                           /* probe filename */
843        NULL,                           /* probe magic */
844        rt_init_input,                  /* init_input */
845        NULL,                           /* config_input */
846        rt_start_input,                 /* start_input */
847        rt_pause_input,                 /* pause */
848        NULL,                           /* init_output */
849        NULL,                           /* config_output */
850        NULL,                           /* start_output */
851        rt_fin_input,                   /* fin_input */
852        NULL,                           /* fin_output */
853        rt_read_packet,                 /* read_packet */
854        rt_prepare_packet,              /* prepare_packet */
855        NULL,                           /* fin_packet */
856        NULL,                           /* write_packet */
857        NULL,                           /* flush_output */
858        rt_get_link_type,               /* get_link_type */
859        NULL,                           /* get_direction */
860        NULL,                           /* set_direction */
861        NULL,                           /* get_erf_timestamp */
862        NULL,                           /* get_timeval */
863        NULL,                           /* get_timespec */
864        NULL,                           /* get_seconds */
865        NULL,                           /* get_meta_section */
866        NULL,                           /* get_meta_section_item */
867        NULL,                           /* seek_erf */
868        NULL,                           /* seek_timeval */
869        NULL,                           /* seek_seconds */
870        rt_get_capture_length,          /* get_capture_length */
871        rt_get_wire_length,                     /* get_wire_length */
872        rt_get_framing_length,          /* get_framing_length */
873        NULL,                           /* set_capture_length */
874        NULL,                           /* get_received_packets */
875        NULL,                           /* get_filtered_packets */
876        NULL,                           /* get_dropped_packets */
877        NULL,                           /* get_statistics */
878        rt_get_fd,                      /* get_fd */
879        trace_event_rt,             /* trace_event */
880        rt_help,                        /* help */
881        NULL,                   /* next pointer */
882        NON_PARALLEL(true) /* This is normally live */
883};
884
885void rt_constructor(void) {
886        register_format(&rt);
887}
Note: See TracBrowser for help on using the repository browser.