source: lib/format_rt.c @ 756b8f9

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 756b8f9 was a984307, checked in by Shane Alcock <salcock@…>, 5 years ago

Merge remote-tracking branch 'origin/develop' into libtrace4

Conflicts:

INSTALL
README
lib/format_dpdk.c
lib/trace.c
tools/tracesplit/tracesplit.c

  • Property mode set to 100644
File size: 25.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#define _GNU_SOURCE
36
37#include "config.h"
38#include "common.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "rt_protocol.h"
43
44#include "data-struct/buckets.h"
45
46#include <sys/stat.h>
47#include <assert.h>
48#include <errno.h>
49#include <fcntl.h>
50#include <stdio.h>
51#include <string.h>
52#include <stdlib.h>
53#include <unistd.h>
54
55#ifndef WIN32
56# include <netdb.h>
57#endif
58
59#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
60
61/* Convert the RT denial code into a nice printable and coherent string */
62static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
63{
64        const char *string = 0;
65
66        switch(reason) {
67                case RT_DENY_WRAPPER:
68                        string = "Rejected by TCP Wrappers";
69                        break;
70                case RT_DENY_FULL:
71                        string = "Max connections reached on server";
72                        break;
73                case RT_DENY_AUTH:
74                        string = "Authentication failed";
75                        break;
76                default:
77                        string = "Unknown reason";
78        }
79
80        return string;
81}
82
83
84struct rt_format_data_t {
85        /* Name of the host to connect to */
86        char *hostname;
87        /* Buffer to store received packets into */
88        char *pkt_buffer;
89        /* Pointer to the next packet to be read from the buffer */
90        char *buf_read;
91        /* Pointer to the next unused byte in the buffer */
92        char *buf_write;
93        /* The port to connect to */
94        int port;
95        /* The file descriptor for the RT connection */
96        int input_fd;
97        /* Flag indicating whether the server is doing reliable RT */
98        int reliable;
99
100        int unacked;
101       
102        /* Dummy traces that can be assigned to the received packets to ensure
103         * that the appropriate functions can be used to process them */
104        libtrace_t *dummy_duck;
105        libtrace_t *dummy_erf;
106        libtrace_t *dummy_pcap;
107        libtrace_t *dummy_linux;
108        libtrace_t *dummy_ring;
109        libtrace_t *dummy_bpf;
110
111        /* Bucket structure for storing read packets until the user is
112         * done with them. */
113        libtrace_bucket_t *bucket;
114};
115
116/* Connects to an RT server
117 *
118 * Returns -1 if an error occurs
119 */
120static int rt_connect(libtrace_t *libtrace) {
121        struct hostent *he;
122        struct sockaddr_in remote;
123        rt_header_t connect_msg;
124        rt_deny_conn_t deny_hdr;       
125        rt_hello_t hello_opts;
126        uint8_t reason;
127       
128        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
129                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
130                                "Failed to convert hostname %s to address",
131                                RT_INFO->hostname);
132                return -1;
133        }
134        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
135                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
136                                "Could not create socket");
137                return -1;
138        }
139
140        memset(&remote,0, sizeof(remote));
141        remote.sin_family = AF_INET;
142        remote.sin_port = htons(RT_INFO->port);
143        remote.sin_addr = *((struct in_addr *)he->h_addr);
144
145        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
146                                (socklen_t)sizeof(struct sockaddr)) == -1) {
147                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
148                                "Could not connect to host %s on port %d",
149                                RT_INFO->hostname, RT_INFO->port);
150                return -1;
151        }
152
153        /* We are connected, now receive message from server */
154       
155        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
156                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
157                                "Could not receive connection message from %s",
158                                RT_INFO->hostname);
159                return -1;
160        }
161
162        if (connect_msg.magic != LIBTRACE_RT_MAGIC) {
163                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
164                        "RT version mismatch: magic byte is incorrect");
165                return -1;
166        }
167
168        if (connect_msg.version != LIBTRACE_RT_VERSION) {
169                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
170                        "RT version mismatch: version is incorrect (expected %d, got %d",
171                        LIBTRACE_RT_VERSION, connect_msg.version);
172                return -1;
173        }
174
175               
176       
177        switch (ntohl(connect_msg.type)) {
178                case TRACE_RT_DENY_CONN:
179                        /* Connection was denied */
180                       
181                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
182                                                sizeof(rt_deny_conn_t),
183                                                0) != sizeof(rt_deny_conn_t)) {
184                                reason = 0;
185                        }       
186                        reason = ntohl(deny_hdr.reason);
187                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
188                                "Connection attempt is denied: %s",
189                                rt_deny_reason(reason));       
190                        return -1;
191                case TRACE_RT_HELLO:
192                        /* Hello message - read the options sent to us by the
193                         * server */
194                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
195                                                sizeof(rt_hello_t), 0)
196                                        != sizeof(rt_hello_t)) {
197                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
198                                        "Failed to receive TRACE_RT_HELLO options");
199                                return -1;
200                        }
201                       
202                       
203                        RT_INFO->reliable = hello_opts.reliable;
204                        RT_INFO->unacked = 0;
205                        return 0;
206                default:
207                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
208                                        "Unknown message type received: %d",
209                                        connect_msg.type);
210                        return -1;
211        }
212        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
213                        "Somehow you managed to reach this unreachable code");
214        return -1;
215}
216
217static void rt_init_format_data(libtrace_t *libtrace) {
218        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
219
220        RT_INFO->dummy_duck = NULL;
221        RT_INFO->dummy_erf = NULL;
222        RT_INFO->dummy_pcap = NULL;
223        RT_INFO->dummy_linux = NULL;
224        RT_INFO->dummy_ring = NULL;
225        RT_INFO->dummy_bpf = NULL;
226        RT_INFO->pkt_buffer = NULL;
227        RT_INFO->buf_read = NULL;
228        RT_INFO->buf_write = NULL;
229        RT_INFO->hostname = NULL;
230        RT_INFO->port = 0;
231        RT_INFO->unacked = 0;
232
233        RT_INFO->bucket = libtrace_bucket_init();
234}
235
236static int rt_init_input(libtrace_t *libtrace) {
237        char *scan;
238        char *uridata = libtrace->uridata;
239
240        rt_init_format_data(libtrace);
241
242        /* If the user specifies "rt:" then assume localhost and the default
243         * port */     
244        if (strlen(uridata) == 0) {
245                RT_INFO->hostname =
246                        strdup("localhost");
247                RT_INFO->port =
248                        COLLECTOR_PORT;
249        } else {
250                /* If the user does not specify a port, assume the default
251                 * port */
252                if ((scan = strchr(uridata,':')) == NULL) {
253                        RT_INFO->hostname =
254                                strdup(uridata);
255                        RT_INFO->port =
256                                COLLECTOR_PORT;
257                } else {
258                        RT_INFO->hostname =
259                                (char *)strndup(uridata,
260                                                (size_t)(scan - uridata));
261                        RT_INFO->port =
262                                atoi(++scan);
263                }
264        }
265
266        return 0;
267}
268       
269static int rt_start_input(libtrace_t *libtrace) {
270        rt_header_t start_msg;
271
272        start_msg.type = htonl(TRACE_RT_START);
273        start_msg.length = 0;
274        start_msg.sequence = 0;
275        start_msg.version = LIBTRACE_RT_VERSION;
276        start_msg.magic = LIBTRACE_RT_MAGIC; 
277
278        if (rt_connect(libtrace) == -1)
279                return -1;
280       
281        /* Need to send start message to server */
282        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
283                                start_msg.length, 0) != sizeof(rt_header_t)) {
284                printf("Failed to send start message to server\n");
285                return -1;
286        }
287
288        return 0;
289}
290
291static int rt_pause_input(libtrace_t *libtrace) {
292        rt_header_t close_msg;
293
294        close_msg.type = htonl(TRACE_RT_CLOSE);
295        close_msg.length = 0; 
296       
297        /* Send a close message to the server */
298        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
299                                close_msg.length, 0) != (int)sizeof(rt_header_t)
300                                + close_msg.length) {
301                printf("Failed to send close message to server\n");
302       
303        }
304
305        close(RT_INFO->input_fd);
306        return 0;
307}
308
309static int rt_fin_input(libtrace_t *libtrace) {
310        /* Make sure we clean up any dummy traces that we have been using */
311
312        if (RT_INFO->dummy_duck)
313                trace_destroy_dead(RT_INFO->dummy_duck);
314
315        if (RT_INFO->dummy_erf)
316                trace_destroy_dead(RT_INFO->dummy_erf);
317
318        if (RT_INFO->dummy_pcap)
319                trace_destroy_dead(RT_INFO->dummy_pcap);
320
321        if (RT_INFO->dummy_linux)
322                trace_destroy_dead(RT_INFO->dummy_linux);
323
324        if (RT_INFO->dummy_ring)
325                trace_destroy_dead(RT_INFO->dummy_ring);
326
327        if (RT_INFO->dummy_bpf)
328                trace_destroy_dead(RT_INFO->dummy_bpf);
329
330        if (RT_INFO->bucket)
331                libtrace_bucket_destroy(RT_INFO->bucket);
332        free(libtrace->format_data);
333        return 0;
334}
335
336/* Sends an RT ACK to the server to acknowledge receipt of packets */
337static int rt_send_ack(libtrace_t *libtrace, 
338                uint32_t seqno)  {
339       
340        static char *ack_buffer = 0;
341        char *buf_ptr;
342        int numbytes = 0;
343        size_t to_write = 0;
344        rt_header_t *hdr;
345        rt_ack_t *ack_hdr;
346       
347        if (!ack_buffer) {
348                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
349                                                        + sizeof(rt_ack_t));
350        }
351       
352        hdr = (rt_header_t *) ack_buffer;
353        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
354       
355        hdr->type = htonl(TRACE_RT_ACK);
356        hdr->length = htons(sizeof(rt_ack_t));
357
358        ack_hdr->sequence = htonl(seqno);
359       
360        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
361        buf_ptr = ack_buffer;
362
363        /* Keep trying until we write the entire ACK */
364        while (to_write > 0) {
365                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
366                if (numbytes == -1) {
367                        if (errno == EINTR || errno == EAGAIN) {
368                                continue;
369                        }
370                        else {
371                                printf("Error sending ack\n");
372                                perror("send");
373                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
374                                                "Error sending ack");
375                                return -1;
376                        }
377                }
378                to_write = to_write - numbytes;
379                buf_ptr = buf_ptr + to_write;
380               
381        }
382
383        return 1;
384}
385
386/* Sets the trace format for the packet to match the format it was originally
387 * captured in, rather than the RT format */
388static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
389{
390
391        /* We need to assign the packet to a "dead" trace */
392
393        /* Try to minimize the number of corrupt packets that slip through
394         * while making it easy to identify new pcap DLTs */
395        if (packet->type > TRACE_RT_DATA_DLT && 
396                        packet->type < TRACE_RT_DATA_DLT_END) {
397                if (!RT_INFO->dummy_pcap) {
398                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
399                }
400                packet->trace = RT_INFO->dummy_pcap;
401                return 0;       
402        }
403
404        if (packet->type > TRACE_RT_DATA_BPF &&
405                        packet->type < TRACE_RT_DATA_BPF_END) {
406
407                if (!RT_INFO->dummy_bpf) {
408                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
409                        /* This may fail on a non-BSD machine */
410                        if (trace_is_err(RT_INFO->dummy_bpf)) {
411                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
412                                return -1;
413                        }
414                }
415                packet->trace = RT_INFO->dummy_bpf;
416                return 0;
417        }
418
419        switch (packet->type) {
420                case TRACE_RT_DUCK_2_4:
421                case TRACE_RT_DUCK_2_5:
422                case TRACE_RT_DUCK_5_0:
423                        if (!RT_INFO->dummy_duck) {
424                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
425                        }
426                        packet->trace = RT_INFO->dummy_duck;
427                        break;
428                case TRACE_RT_DATA_ERF:
429                        if (!RT_INFO->dummy_erf) {
430                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
431                        }
432                        packet->trace = RT_INFO->dummy_erf;
433                        break;
434                case TRACE_RT_DATA_LINUX_NATIVE:
435                        if (!RT_INFO->dummy_linux) {
436                                RT_INFO->dummy_linux = trace_create_dead("int:");
437                                /* This may fail on a non-Linux machine */
438                                if (trace_is_err(RT_INFO->dummy_linux)) {
439                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
440                                        return -1;
441                                }
442                        }
443                        packet->trace = RT_INFO->dummy_linux;
444                        break;
445                case TRACE_RT_DATA_LINUX_RING:
446                        if (!RT_INFO->dummy_ring) {
447                                RT_INFO->dummy_ring = trace_create_dead("ring:");
448                                /* This may fail on a non-Linux machine */
449                                if (trace_is_err(RT_INFO->dummy_ring)) {
450                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
451                                        return -1;
452                                }
453                        }
454                        packet->trace = RT_INFO->dummy_ring;
455                        break;
456                case TRACE_RT_STATUS:
457                case TRACE_RT_METADATA:
458                        /* Just use the RT trace! */
459                        packet->trace = libtrace;
460                        break;
461                case TRACE_RT_DATA_LEGACY_ETH:
462                case TRACE_RT_DATA_LEGACY_ATM:
463                case TRACE_RT_DATA_LEGACY_POS:
464                        printf("Sending legacy over RT is currently not supported\n");
465                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
466                        return -1;
467                default:
468                        printf("Unrecognised format: %u\n", packet->type);
469                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
470                        return -1;
471        }
472        return 0; /* success */
473}               
474
475
476/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
477 * in any way. This means we have a much larger memory overhead per packet
478 * (which won't be used in the vast majority of cases), so we may want to think
479 * about doing something smarter, e.g. allocate a smaller block of memory and
480 * only increase it as required.
481 *
482 * XXX Capturing off int: can still lead to packets that are larger than 10K,
483 * in instances where the fragmentation is done magically by the NIC. This
484 * is pretty nasty, but also very rare.
485 */
486#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
487
488static int rt_process_data_packet(libtrace_t *libtrace,
489                libtrace_packet_t *packet) {
490
491        uint32_t prep_flags = TRACE_PREP_DO_NOT_OWN_BUFFER;
492        rt_header_t *hdr = (rt_header_t *)packet->header;
493
494        /* Send an ACK if required */
495        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
496                RT_INFO->unacked ++;
497                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
498                        if (rt_send_ack(libtrace, hdr->sequence) == -1)
499                                return -1;
500                        RT_INFO->unacked = 0;
501                }
502        }
503
504        /* Convert to the original capture format */
505        if (rt_set_format(libtrace, packet) < 0) {
506                return -1;
507        }
508
509        /* Update payload pointers and packet type to match the original
510         * format */
511        if (trace_prepare_packet(packet->trace, packet, packet->payload,
512                                packet->type, prep_flags)) {
513                return -1;
514        }
515
516        return 1;
517
518}
519
520/* Receives data from an RT server */
521static int rt_read(libtrace_t *libtrace, int block) {
522        int numbytes;
523
524        if (!RT_INFO->pkt_buffer) {
525                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
526                RT_INFO->buf_write = RT_INFO->pkt_buffer;
527                RT_INFO->buf_read = RT_INFO->pkt_buffer;
528                libtrace_create_new_bucket(RT_INFO->bucket, RT_INFO->pkt_buffer);
529        }
530
531#ifndef MSG_DONTWAIT
532#define MSG_DONTWAIT 0
533#endif
534#ifndef MSG_NOSIGNAL
535#  define MSG_NOSIGNAL 0
536#endif
537
538        if (block)
539                block=0;
540        else
541                block=MSG_DONTWAIT;
542
543        /* If the current buffer has plenty of space left, we can continue to
544         * read into it, otherwise create a new buffer and move anything in
545         * the old buffer over to it */
546        if (RT_INFO->buf_write - RT_INFO->pkt_buffer > RT_BUF_SIZE / 2) {
547                char *newbuf = (char*)malloc((size_t)RT_BUF_SIZE);
548
549                memcpy(newbuf, RT_INFO->buf_read, RT_INFO->buf_write - RT_INFO->buf_read);
550                RT_INFO->buf_write = newbuf + (RT_INFO->buf_write - RT_INFO->buf_read);
551                RT_INFO->buf_read = newbuf;
552                RT_INFO->pkt_buffer = newbuf;
553                libtrace_create_new_bucket(RT_INFO->bucket, newbuf);
554
555        }
556
557        /* Attempt to fill the buffer as much as we can */
558        if ((numbytes = recv(RT_INFO->input_fd, RT_INFO->buf_write,
559                        RT_BUF_SIZE - (RT_INFO->buf_write-RT_INFO->pkt_buffer),
560                        MSG_NOSIGNAL | block)) <= 0) {
561
562                if (numbytes == 0) {
563                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
564                                        "No data received by RT client");
565                        return -1;
566                }
567
568                if (errno == EINTR) {
569                        /* Ignore EINTR in case a caller is using signals */
570                        return 0;
571                }
572
573                if (errno == EAGAIN) {
574                        /* No data available and we are non-blocking */
575                        trace_set_err(libtrace, EAGAIN, "EAGAIN");
576                        return -1;
577                }
578
579                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
580                                "Error reading from RT socket: %s",
581                                strerror(errno));
582                return -1;
583        }
584
585        RT_INFO->buf_write += numbytes;
586        return (RT_INFO->buf_write - RT_INFO->buf_read);
587
588}
589
590
591static int rt_get_next_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
592                int block) {
593
594        rt_header_t *rthdr;
595
596        if (packet->buffer && packet->buf_control == TRACE_CTRL_PACKET)
597                free(packet->buffer);
598
599        while (RT_INFO->buf_write - RT_INFO->buf_read <
600                                (uint32_t)sizeof(rt_header_t)) {
601                if (rt_read(libtrace, block) == -1)
602                        return -1;
603        }
604
605        rthdr = (rt_header_t *)RT_INFO->buf_read;
606
607        /* Check if we have enough payload */
608        while (RT_INFO->buf_write - (RT_INFO->buf_read + sizeof(rt_header_t))
609                        < ntohs(rthdr->length)) {
610                if (rt_read(libtrace, block) == -1)
611                        return -1;
612                rthdr = (rt_header_t *)RT_INFO->buf_read;
613        }
614
615
616        packet->buffer = RT_INFO->buf_read;
617        packet->header = RT_INFO->buf_read;
618        packet->type = ntohl(((rt_header_t *)packet->header)->type);
619        packet->payload = RT_INFO->buf_read + sizeof(rt_header_t);
620        packet->internalid = libtrace_push_into_bucket(RT_INFO->bucket);
621        assert(packet->internalid != 0);
622        packet->srcbucket = RT_INFO->bucket;
623        packet->buf_control = TRACE_CTRL_EXTERNAL;
624
625        RT_INFO->buf_read += ntohs(rthdr->length) + sizeof(rt_header_t);
626
627        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
628                rt_process_data_packet(libtrace, packet);
629        } else {
630                switch(packet->type) {
631                        case TRACE_RT_DUCK_2_4:
632                        case TRACE_RT_DUCK_2_5:
633                        case TRACE_RT_STATUS:
634                        case TRACE_RT_METADATA:
635                                if (rt_process_data_packet(libtrace, packet) < 0)
636                                        return -1;
637                                break;
638                        case TRACE_RT_END_DATA:
639                        case TRACE_RT_KEYCHANGE:
640                        case TRACE_RT_LOSTCONN:
641                        case TRACE_RT_CLIENTDROP:
642                        case TRACE_RT_SERVERSTART:
643                                break;
644                        case TRACE_RT_PAUSE_ACK:
645                        case TRACE_RT_OPTION:
646                                break;
647                        default:
648                                fprintf(stderr, "Bad RT type for client: %d\n",
649                                                packet->type);
650                                return -1;
651                }
652        }
653
654        return ntohs(rthdr->length);
655
656}
657
658/* Shouldn't need to call this too often */
659static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
660                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
661
662        if (packet->buffer != buffer &&
663                        packet->buf_control == TRACE_CTRL_PACKET) {
664                free(packet->buffer);
665        }
666
667        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
668                packet->buf_control = TRACE_CTRL_PACKET;
669        } else
670                packet->buf_control = TRACE_CTRL_EXTERNAL;
671
672
673        packet->buffer = buffer;
674        packet->header = NULL;
675        packet->type = rt_type;
676        packet->payload = buffer;
677
678        if (libtrace->format_data == NULL) {
679                rt_init_format_data(libtrace);
680        }
681
682        return 0;
683}       
684
685/* Reads the next available packet in a blocking fashion */
686static int rt_read_packet(libtrace_t *libtrace,
687                libtrace_packet_t *packet) {
688        return rt_get_next_packet(libtrace,packet,1);
689}
690
691
692/* This should only get called for RT messages - RT-encapsulated data records
693 * should be converted to the appropriate capture format */
694static int rt_get_capture_length(const libtrace_packet_t *packet) {
695        rt_metadata_t *rt_md_hdr;
696        switch (packet->type) {
697                case TRACE_RT_STATUS:
698                        return sizeof(rt_status_t);
699                case TRACE_RT_HELLO:
700                        return sizeof(rt_hello_t);
701                case TRACE_RT_START:
702                        return 0;
703                case TRACE_RT_ACK:
704                        return sizeof(rt_ack_t);
705                case TRACE_RT_END_DATA:
706                        return 0;
707                case TRACE_RT_CLOSE:
708                        return 0;
709                case TRACE_RT_DENY_CONN:
710                        return sizeof(rt_deny_conn_t);
711                case TRACE_RT_PAUSE:
712                        return 0; 
713                case TRACE_RT_PAUSE_ACK:
714                        return 0;
715                case TRACE_RT_OPTION:
716                        return 0; /* FIXME */
717                case TRACE_RT_KEYCHANGE:
718                        return 0;
719                case TRACE_RT_LOSTCONN:
720                        return 0;
721                case TRACE_RT_SERVERSTART:
722                        return 0;
723                case TRACE_RT_CLIENTDROP:
724                        return 0;
725                case TRACE_RT_METADATA:
726                        /* This is a little trickier to work out */
727                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
728                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
729                                sizeof(rt_metadata_t);
730                default:
731                        printf("Unknown type: %d\n", packet->type);
732                       
733        }
734        return 0;
735}
736
737/* RT messages do not have a wire length because they were not captured from
738 * the wire - they were generated by the capture process */
739static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
740        return 0;
741}
742
743/* Although RT messages do contain "framing", this framing is considered to be
744 * stripped as soon as the packet is read by the RT client */                   
745static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
746        return 0;
747}
748
749
750static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
751{
752        /* RT messages don't have a link type */
753        return TRACE_TYPE_NONDATA;
754}
755
756static int rt_get_fd(const libtrace_t *trace) {
757        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
758}
759
760static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
761                                        libtrace_packet_t *packet) 
762{
763        libtrace_eventobj_t event = {0,0,0.0,0};
764        libtrace_err_t read_err;
765
766        assert(trace);
767        assert(packet);
768       
769        if (trace->format->get_fd) {
770                event.fd = trace->format->get_fd(trace);
771        } else {
772                event.fd = 0;
773        }
774
775        do {
776
777                event.size = rt_get_next_packet(trace, packet, 0);
778                if (event.size == -1) {
779                        read_err = trace_get_err(trace);
780                        if (read_err.err_num == EAGAIN) {
781                                /* No data available - do an IOWAIT */
782                                event.type = TRACE_EVENT_IOWAIT;
783                        }
784                        else {
785                                trace_perror(trace, "Error doing a non-blocking read from rt");
786                                event.type = TRACE_EVENT_PACKET;
787                                break;
788                        }
789                } else if (event.size == 0) {
790                        /* RT gives us a specific indicator that there will be
791                         * no more packets. */
792                        if (packet->type == TRACE_RT_END_DATA)
793                                event.type = TRACE_EVENT_TERMINATE;
794                        else {
795                                /* Since several RT messages can have zero-byte
796                                 * length (once the framing is removed), an
797                                 * event size of zero can still indicate a
798                                 * PACKET event */
799                                event.type = TRACE_EVENT_PACKET;
800                                trace->accepted_packets ++;
801                        }
802
803                }       
804                else {
805                        event.type = TRACE_EVENT_PACKET;
806                        trace->accepted_packets ++;
807                }
808
809                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
810                        if (!trace_apply_filter(trace->filter, packet)) {
811                                trace_clear_cache(packet);
812                                trace->filtered_packets ++;
813                                continue;
814                        }
815                }
816
817                break; 
818        } while (1);
819
820        return event;
821}
822
823static void rt_help(void) {
824        printf("rt format module\n");
825        printf("Supported input URIs:\n");
826        printf("\trt:hostname:port\n");
827        printf("\trt:hostname (connects on default port)\n");
828        printf("\n");
829        printf("\te.g.: rt:localhost\n");
830        printf("\te.g.: rt:localhost:32500\n");
831        printf("\n");
832
833}
834
835
836static struct libtrace_format_t rt = {
837        "rt",
838        "$Id$",
839        TRACE_FORMAT_RT,
840        NULL,                           /* probe filename */
841        NULL,                           /* probe magic */
842        rt_init_input,                  /* init_input */
843        NULL,                           /* config_input */
844        rt_start_input,                 /* start_input */
845        rt_pause_input,                 /* pause */
846        NULL,                           /* init_output */
847        NULL,                           /* config_output */
848        NULL,                           /* start_output */
849        rt_fin_input,                   /* fin_input */
850        NULL,                           /* fin_output */
851        rt_read_packet,                 /* read_packet */
852        rt_prepare_packet,              /* prepare_packet */
853        NULL,                           /* fin_packet */
854        NULL,                           /* write_packet */
855        rt_get_link_type,               /* get_link_type */
856        NULL,                           /* get_direction */
857        NULL,                           /* set_direction */
858        NULL,                           /* get_erf_timestamp */
859        NULL,                           /* get_timeval */
860        NULL,                           /* get_timespec */
861        NULL,                           /* get_seconds */
862        NULL,                           /* seek_erf */
863        NULL,                           /* seek_timeval */
864        NULL,                           /* seek_seconds */
865        rt_get_capture_length,          /* get_capture_length */
866        rt_get_wire_length,                     /* get_wire_length */
867        rt_get_framing_length,          /* get_framing_length */
868        NULL,                           /* set_capture_length */
869        NULL,                           /* get_received_packets */
870        NULL,                           /* get_filtered_packets */
871        NULL,                           /* get_dropped_packets */
872        NULL,                           /* get_statistics */
873        rt_get_fd,                      /* get_fd */
874        trace_event_rt,             /* trace_event */
875        rt_help,                        /* help */
876        NULL,                   /* next pointer */
877        NON_PARALLEL(true) /* This is normally live */
878};
879
880void rt_constructor(void) {
881        register_format(&rt);
882}
Note: See TracBrowser for help on using the repository browser.