source: lib/format_rt.c @ 10553bf

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 10553bf was 5478d3d, checked in by Shane Alcock <salcock@…>, 6 years ago

Fix all outstanding warnings

Implemented trace_get_statistics for formats that were missing it, so
we no longer need to use the deprecated trace_get_dropped_packets anywhere
within libtrace.

  • Property mode set to 100644
File size: 26.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#define _GNU_SOURCE
36
37#include "config.h"
38#include "common.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "rt_protocol.h"
43
44#include <sys/stat.h>
45#include <assert.h>
46#include <errno.h>
47#include <fcntl.h>
48#include <stdio.h>
49#include <string.h>
50#include <stdlib.h>
51#include <unistd.h>
52
53#ifndef WIN32
54# include <netdb.h>
55#endif
56
57#define RT_INFO ((struct rt_format_data_t*)libtrace->format_data)
58
59/* Convert the RT denial code into a nice printable and coherent string */
60static const char *rt_deny_reason(enum rt_conn_denied_t reason) 
61{
62        const char *string = 0;
63
64        switch(reason) {
65                case RT_DENY_WRAPPER:
66                        string = "Rejected by TCP Wrappers";
67                        break;
68                case RT_DENY_FULL:
69                        string = "Max connections reached on server";
70                        break;
71                case RT_DENY_AUTH:
72                        string = "Authentication failed";
73                        break;
74                default:
75                        string = "Unknown reason";
76        }
77
78        return string;
79}
80
81
82struct rt_format_data_t {
83        /* Name of the host to connect to */
84        char *hostname;
85        /* Buffer to store received packets into */
86        char *pkt_buffer;
87        /* Pointer to the next packet to be read from the buffer */
88        char *buf_current;
89        /* Amount of buffer space used */
90        size_t buf_filled;
91        /* The port to connect to */
92        int port;
93        /* The file descriptor for the RT connection */
94        int input_fd;
95        /* Flag indicating whether the server is doing reliable RT */
96        int reliable;
97
98        /* Header for the packet currently being received */
99        rt_header_t rt_hdr;
100
101        int unacked;
102       
103        /* Dummy traces that can be assigned to the received packets to ensure
104         * that the appropriate functions can be used to process them */
105        libtrace_t *dummy_duck;
106        libtrace_t *dummy_erf;
107        libtrace_t *dummy_pcap;
108        libtrace_t *dummy_linux;
109        libtrace_t *dummy_ring;
110        libtrace_t *dummy_bpf;
111};
112
113/* Connects to an RT server
114 *
115 * Returns -1 if an error occurs
116 */
117static int rt_connect(libtrace_t *libtrace) {
118        struct hostent *he;
119        struct sockaddr_in remote;
120        rt_header_t connect_msg;
121        rt_deny_conn_t deny_hdr;       
122        rt_hello_t hello_opts;
123        uint8_t reason;
124       
125        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
126                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
127                                "Failed to convert hostname %s to address",
128                                RT_INFO->hostname);
129                return -1;
130        }
131        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
132                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
133                                "Could not create socket");
134                return -1;
135        }
136
137        memset(&remote,0, sizeof(remote));
138        remote.sin_family = AF_INET;
139        remote.sin_port = htons(RT_INFO->port);
140        remote.sin_addr = *((struct in_addr *)he->h_addr);
141
142        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
143                                (socklen_t)sizeof(struct sockaddr)) == -1) {
144                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
145                                "Could not connect to host %s on port %d",
146                                RT_INFO->hostname, RT_INFO->port);
147                return -1;
148        }
149
150        /* We are connected, now receive message from server */
151       
152        if (recv(RT_INFO->input_fd, (void*)&connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
153                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
154                                "Could not receive connection message from %s",
155                                RT_INFO->hostname);
156                return -1;
157        }
158       
159        if (connect_msg.magic != LIBTRACE_RT_MAGIC) {
160                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
161                        "RT version mismatch: magic byte is incorrect");
162                return -1;
163        }
164
165        if (connect_msg.version != LIBTRACE_RT_VERSION) {
166                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
167                        "RT version mismatch: version is incorrect (expected %d, got %d",
168                        LIBTRACE_RT_VERSION, connect_msg.version);
169                return -1;
170        }
171
172               
173       
174        switch (ntohl(connect_msg.type)) {
175                case TRACE_RT_DENY_CONN:
176                        /* Connection was denied */
177                       
178                        if (recv(RT_INFO->input_fd, (void*)&deny_hdr, 
179                                                sizeof(rt_deny_conn_t),
180                                                0) != sizeof(rt_deny_conn_t)) {
181                                reason = 0;
182                        }       
183                        reason = ntohl(deny_hdr.reason);
184                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
185                                "Connection attempt is denied: %s",
186                                rt_deny_reason(reason));       
187                        return -1;
188                case TRACE_RT_HELLO:
189                        /* Hello message - read the options sent to us by the
190                         * server */
191                        if (recv(RT_INFO->input_fd, (void*)&hello_opts, 
192                                                sizeof(rt_hello_t), 0)
193                                        != sizeof(rt_hello_t)) {
194                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
195                                        "Failed to receive TRACE_RT_HELLO options");
196                                return -1;
197                        }
198                       
199                       
200                        RT_INFO->reliable = hello_opts.reliable;
201                        RT_INFO->unacked = 0;
202                        return 0;
203                default:
204                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
205                                        "Unknown message type received: %d",
206                                        connect_msg.type);
207                        return -1;
208        }
209        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
210                        "Somehow you managed to reach this unreachable code");
211        return -1;
212}
213
214static void rt_init_format_data(libtrace_t *libtrace) {
215        libtrace->format_data = malloc(sizeof(struct rt_format_data_t));
216
217        RT_INFO->dummy_duck = NULL;
218        RT_INFO->dummy_erf = NULL;
219        RT_INFO->dummy_pcap = NULL;
220        RT_INFO->dummy_linux = NULL;
221        RT_INFO->dummy_ring = NULL;
222        RT_INFO->dummy_bpf = NULL;
223        RT_INFO->pkt_buffer = NULL;
224        RT_INFO->buf_current = NULL;
225        RT_INFO->buf_filled = 0;
226        RT_INFO->hostname = NULL;
227        RT_INFO->port = 0;
228        RT_INFO->unacked = 0;
229}
230
231static int rt_init_input(libtrace_t *libtrace) {
232        char *scan;
233        char *uridata = libtrace->uridata;
234
235        rt_init_format_data(libtrace);
236
237        /* If the user specifies "rt:" then assume localhost and the default
238         * port */     
239        if (strlen(uridata) == 0) {
240                RT_INFO->hostname =
241                        strdup("localhost");
242                RT_INFO->port =
243                        COLLECTOR_PORT;
244        } else {
245                /* If the user does not specify a port, assume the default
246                 * port */
247                if ((scan = strchr(uridata,':')) == NULL) {
248                        RT_INFO->hostname =
249                                strdup(uridata);
250                        RT_INFO->port =
251                                COLLECTOR_PORT;
252                } else {
253                        RT_INFO->hostname =
254                                (char *)strndup(uridata,
255                                                (size_t)(scan - uridata));
256                        RT_INFO->port =
257                                atoi(++scan);
258                }
259        }
260
261        return 0;
262}
263       
264static int rt_start_input(libtrace_t *libtrace) {
265        rt_header_t start_msg;
266
267        start_msg.type = htonl(TRACE_RT_START);
268        start_msg.length = 0;
269        start_msg.sequence = 0;
270        start_msg.version = LIBTRACE_RT_VERSION;
271        start_msg.magic = LIBTRACE_RT_MAGIC; 
272
273        if (rt_connect(libtrace) == -1)
274                return -1;
275       
276        /* Need to send start message to server */
277        if (send(RT_INFO->input_fd, (void*)&start_msg, sizeof(rt_header_t) +
278                                start_msg.length, 0) != sizeof(rt_header_t)) {
279                printf("Failed to send start message to server\n");
280                return -1;
281        }
282        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
283
284        return 0;
285}
286
287static int rt_pause_input(libtrace_t *libtrace) {
288        rt_header_t close_msg;
289
290        close_msg.type = htonl(TRACE_RT_CLOSE);
291        close_msg.length = 0; 
292       
293        /* Send a close message to the server */
294        if (send(RT_INFO->input_fd, (void*)&close_msg, sizeof(rt_header_t) + 
295                                close_msg.length, 0) != (int)sizeof(rt_header_t)
296                                + close_msg.length) {
297                printf("Failed to send close message to server\n");
298       
299        }
300
301        close(RT_INFO->input_fd);
302        return 0;
303}
304
305static int rt_fin_input(libtrace_t *libtrace) {
306        /* Make sure we clean up any dummy traces that we have been using */
307       
308        if (RT_INFO->dummy_duck)
309                trace_destroy_dead(RT_INFO->dummy_duck);
310
311        if (RT_INFO->dummy_erf) 
312                trace_destroy_dead(RT_INFO->dummy_erf);
313               
314        if (RT_INFO->dummy_pcap)
315                trace_destroy_dead(RT_INFO->dummy_pcap);
316
317        if (RT_INFO->dummy_linux)
318                trace_destroy_dead(RT_INFO->dummy_linux);
319       
320        if (RT_INFO->dummy_ring)
321                trace_destroy_dead(RT_INFO->dummy_ring);
322
323        if (RT_INFO->dummy_bpf)
324                trace_destroy_dead(RT_INFO->dummy_bpf);
325        free(libtrace->format_data);
326        return 0;
327}
328
329
330/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
331 * in any way. This means we have a much larger memory overhead per packet
332 * (which won't be used in the vast majority of cases), so we may want to think
333 * about doing something smarter, e.g. allocate a smaller block of memory and
334 * only increase it as required.
335 *
336 * XXX Capturing off int: can still lead to packets that are larger than 10K,
337 * in instances where the fragmentation is done magically by the NIC. This
338 * is pretty nasty, but also very rare.
339 */
340#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
341
342/* Receives data from an RT server */
343static int rt_read(libtrace_t *libtrace, void *buffer, size_t len, int block) 
344{
345        int numbytes;
346       
347        assert(len <= RT_BUF_SIZE);
348       
349        if (!RT_INFO->pkt_buffer) {
350                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
351                RT_INFO->buf_current = RT_INFO->pkt_buffer;
352                RT_INFO->buf_filled = 0;
353        }
354
355#ifndef MSG_DONTWAIT
356#define MSG_DONTWAIT 0
357#endif
358
359        if (block)
360                block=0;
361        else
362                block=MSG_DONTWAIT;
363
364        /* If we don't have enough buffer space for the amount we want to
365         * read, move the current buffer contents to the front of the buffer
366         * to make room */
367        if (len > RT_INFO->buf_filled) {
368                memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current, 
369                                RT_INFO->buf_filled);
370                RT_INFO->buf_current = RT_INFO->pkt_buffer;
371#ifndef MSG_NOSIGNAL
372#  define MSG_NOSIGNAL 0
373#endif
374                /* Loop as long as we don't have all the data that we were
375                 * asked for */
376                while (len > RT_INFO->buf_filled) {
377                        if ((numbytes = recv(RT_INFO->input_fd,
378                                                RT_INFO->buf_current + 
379                                                RT_INFO->buf_filled,
380                                                RT_BUF_SIZE-RT_INFO->buf_filled,
381                                                MSG_NOSIGNAL|block)) <= 0) {
382                                if (numbytes == 0) {
383                                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
384                                                        "No data received");
385                                        return -1;
386                                }
387                               
388                                if (errno == EINTR) {
389                                        /* ignore EINTR in case
390                                         * a caller is using signals
391                                         */
392                                        continue;
393                                }
394                                if (errno == EAGAIN) {
395                                        /* We asked for non-blocking mode, so
396                                         * we need to return now */
397                                        trace_set_err(libtrace,
398                                                        EAGAIN,
399                                                        "EAGAIN");
400                                        return -1;
401                                }
402                               
403                                perror("recv");
404                                trace_set_err(libtrace, errno,
405                                                "Failed to read data into rt recv buffer");
406                                return -1;
407                        }
408                        RT_INFO->buf_filled+=numbytes;
409                }
410
411        }
412        memcpy(buffer, RT_INFO->buf_current, len);
413        RT_INFO->buf_current += len;
414        RT_INFO->buf_filled -= len;
415        return len;
416}
417
418
419/* Sets the trace format for the packet to match the format it was originally
420 * captured in, rather than the RT format */
421static int rt_set_format(libtrace_t *libtrace, libtrace_packet_t *packet) 
422{
423
424        /* We need to assign the packet to a "dead" trace */
425
426        /* Try to minimize the number of corrupt packets that slip through
427         * while making it easy to identify new pcap DLTs */
428        if (packet->type > TRACE_RT_DATA_DLT && 
429                        packet->type < TRACE_RT_DATA_DLT_END) {
430                if (!RT_INFO->dummy_pcap) {
431                        RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
432                }
433                packet->trace = RT_INFO->dummy_pcap;
434                return 0;       
435        }
436
437        if (packet->type > TRACE_RT_DATA_BPF &&
438                        packet->type < TRACE_RT_DATA_BPF_END) {
439
440                if (!RT_INFO->dummy_bpf) {
441                        RT_INFO->dummy_bpf = trace_create_dead("bpf:-");
442                        /* This may fail on a non-BSD machine */
443                        if (trace_is_err(RT_INFO->dummy_bpf)) {
444                                trace_perror(RT_INFO->dummy_bpf, "Creating dead bpf trace");
445                                return -1;
446                        }
447                }
448                packet->trace = RT_INFO->dummy_bpf;
449                return 0;
450        }
451
452        switch (packet->type) {
453                case TRACE_RT_DUCK_2_4:
454                case TRACE_RT_DUCK_2_5:
455                case TRACE_RT_DUCK_5_0:
456                        if (!RT_INFO->dummy_duck) {
457                                RT_INFO->dummy_duck = trace_create_dead("duck:dummy");
458                        }
459                        packet->trace = RT_INFO->dummy_duck;
460                        break;
461                case TRACE_RT_DATA_ERF:
462                        if (!RT_INFO->dummy_erf) {
463                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
464                        }
465                        packet->trace = RT_INFO->dummy_erf;
466                        break;
467                case TRACE_RT_DATA_LINUX_NATIVE:
468                        if (!RT_INFO->dummy_linux) {
469                                RT_INFO->dummy_linux = trace_create_dead("int:");
470                                /* This may fail on a non-Linux machine */
471                                if (trace_is_err(RT_INFO->dummy_linux)) {
472                                        trace_perror(RT_INFO->dummy_linux, "Creating dead int trace");
473                                        return -1;
474                                }
475                        }
476                        packet->trace = RT_INFO->dummy_linux;
477                        break;
478                case TRACE_RT_DATA_LINUX_RING:
479                        if (!RT_INFO->dummy_ring) {
480                                RT_INFO->dummy_ring = trace_create_dead("ring:");
481                                /* This may fail on a non-Linux machine */
482                                if (trace_is_err(RT_INFO->dummy_ring)) {
483                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
484                                        return -1;
485                                }
486                        }
487                        packet->trace = RT_INFO->dummy_ring;
488                        break;
489                case TRACE_RT_STATUS:
490                case TRACE_RT_METADATA:
491                        /* Just use the RT trace! */
492                        packet->trace = libtrace;
493                        break;
494                case TRACE_RT_DATA_LEGACY_ETH:
495                case TRACE_RT_DATA_LEGACY_ATM:
496                case TRACE_RT_DATA_LEGACY_POS:
497                        printf("Sending legacy over RT is currently not supported\n");
498                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Legacy packet cannot be sent over rt");
499                        return -1;
500                default:
501                        printf("Unrecognised format: %u\n", packet->type);
502                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unrecognised packet format");
503                        return -1;
504        }
505        return 0; /* success */
506}               
507
508/* Sends an RT ACK to the server to acknowledge receipt of packets */
509static int rt_send_ack(libtrace_t *libtrace, 
510                uint32_t seqno)  {
511       
512        static char *ack_buffer = 0;
513        char *buf_ptr;
514        int numbytes = 0;
515        size_t to_write = 0;
516        rt_header_t *hdr;
517        rt_ack_t *ack_hdr;
518       
519        if (!ack_buffer) {
520                ack_buffer = (char*)malloc(sizeof(rt_header_t) 
521                                                        + sizeof(rt_ack_t));
522        }
523       
524        hdr = (rt_header_t *) ack_buffer;
525        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
526       
527        hdr->type = htonl(TRACE_RT_ACK);
528        hdr->length = htons(sizeof(rt_ack_t));
529
530        ack_hdr->sequence = htonl(seqno);
531       
532        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
533        buf_ptr = ack_buffer;
534
535        /* Keep trying until we write the entire ACK */
536        while (to_write > 0) {
537                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
538                if (numbytes == -1) {
539                        if (errno == EINTR || errno == EAGAIN) {
540                                continue;
541                        }
542                        else {
543                                printf("Error sending ack\n");
544                                perror("send");
545                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE, 
546                                                "Error sending ack");
547                                return -1;
548                        }
549                }
550                to_write = to_write - numbytes;
551                buf_ptr = buf_ptr + to_write;
552               
553        }
554
555        return 1;
556}
557
558/* Shouldn't need to call this too often */
559static int rt_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
560                void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
561
562        if (packet->buffer != buffer &&
563                        packet->buf_control == TRACE_CTRL_PACKET) {
564                free(packet->buffer);
565        }
566
567        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
568                packet->buf_control = TRACE_CTRL_PACKET;
569        } else
570                packet->buf_control = TRACE_CTRL_EXTERNAL;
571
572
573        packet->buffer = buffer;
574        packet->header = NULL;
575        packet->type = rt_type;
576        packet->payload = buffer;
577
578        if (libtrace->format_data == NULL) {
579                rt_init_format_data(libtrace);
580        }
581
582        return 0;
583}       
584
585/* Reads the body of an RT packet from the network */
586static int rt_read_data_packet(libtrace_t *libtrace,
587                libtrace_packet_t *packet, int blocking) {
588        uint32_t prep_flags = 0;
589
590        prep_flags |= TRACE_PREP_OWN_BUFFER;
591
592        /* The stored RT header will tell us how much data we need to read */
593        if (rt_read(libtrace, packet->buffer, (size_t)RT_INFO->rt_hdr.length, 
594                                blocking) != RT_INFO->rt_hdr.length) {
595                return -1;
596        }
597
598        /* Send an ACK if required */
599        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
600                RT_INFO->unacked ++;
601                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
602                        if (rt_send_ack(libtrace, RT_INFO->rt_hdr.sequence) 
603                                        == -1)
604                                return -1;
605                        RT_INFO->unacked = 0;
606                }
607        }
608       
609        /* Convert to the original capture format */
610        if (rt_set_format(libtrace, packet) < 0) {
611                return -1;
612        }
613               
614        /* Update payload pointers and packet type to match the original
615         * format */
616        if (trace_prepare_packet(packet->trace, packet, packet->buffer,
617                                packet->type, prep_flags)) {
618                return -1;
619        }
620
621        return 0;
622}
623
624/* Reads an RT packet from the network. Will block if the "blocking" flag is
625 * set to 1, otherwise will return if insufficient data is available */
626static int rt_read_packet_versatile(libtrace_t *libtrace,
627                libtrace_packet_t *packet,int blocking) {
628        rt_header_t hdr;
629        rt_header_t *pkt_hdr = NULL;
630        libtrace_rt_types_t switch_type;
631       
632        /* RT_LAST indicates that we need to read the RT header for the next
633         * packet. This is a touch hax, I admit */
634        if (RT_INFO->rt_hdr.type == TRACE_RT_LAST) {
635                /* FIXME: Better error handling required */
636                if (rt_read(libtrace, (void *)&hdr, 
637                                sizeof(rt_header_t),blocking) !=
638                                sizeof(rt_header_t)) {
639                        return -1;
640                }
641                /* Need to store these in case the next rt_read overwrites
642                 * the buffer they came from! */
643                RT_INFO->rt_hdr.type = ntohl(hdr.type);
644                RT_INFO->rt_hdr.length = ntohs(hdr.length);
645                RT_INFO->rt_hdr.sequence = ntohl(hdr.sequence);
646        }
647
648        if (packet->buf_control == TRACE_CTRL_PACKET) {
649                if (packet->buffer == NULL) {
650                        packet->buffer = malloc(RT_INFO->rt_hdr.length + sizeof(rt_header_t));
651
652                } else if (RT_INFO->rt_hdr.length > sizeof(packet->buffer)) {
653                        packet->buffer = realloc(packet->buffer, RT_INFO->rt_hdr.length + sizeof(rt_header_t));
654                }
655        }
656       
657        packet->type = RT_INFO->rt_hdr.type;
658        packet->payload = packet->buffer;
659       
660       
661        /* All data-bearing packets (as opposed to RT internal messages)
662         * should be treated the same way when it comes to reading the rest
663         * of the packet */
664        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
665                switch_type = TRACE_RT_DATA_SIMPLE;
666        } else {
667                switch_type = packet->type;
668        }
669
670        switch(switch_type) {
671                case TRACE_RT_DATA_SIMPLE:
672                case TRACE_RT_DUCK_2_4:
673                case TRACE_RT_DUCK_2_5:
674                case TRACE_RT_STATUS:
675                case TRACE_RT_METADATA:
676                        if (rt_read_data_packet(libtrace, packet, blocking))
677                                return -1;
678                        break;
679                case TRACE_RT_END_DATA:
680                case TRACE_RT_KEYCHANGE:
681                case TRACE_RT_LOSTCONN:
682                case TRACE_RT_CLIENTDROP:
683                case TRACE_RT_SERVERSTART:
684                        /* All these have no payload */
685                        packet->header = packet->buffer;
686                        packet->payload = ((char *)packet->buffer + sizeof(rt_header_t));
687                        pkt_hdr = (rt_header_t *)packet->header;
688                        pkt_hdr->type = ntohl(RT_INFO->rt_hdr.type);
689                        pkt_hdr->length = ntohs(RT_INFO->rt_hdr.length);
690                        pkt_hdr->sequence = ntohl(RT_INFO->rt_hdr.sequence);
691
692                        /* XXX Do we need to save the other crap? */
693                        break;
694                case TRACE_RT_PAUSE_ACK:
695                        /* XXX: Add support for this */
696                        break;
697                case TRACE_RT_OPTION:
698                        /* XXX: Add support for this */
699                        break;
700                default:
701                        printf("Bad rt type for client receipt: %d\n",
702                                        switch_type);
703                        return -1;
704        }
705       
706        /* Return the number of bytes read from the stream */
707        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
708        return RT_INFO->rt_hdr.length + sizeof(rt_header_t);
709}
710
711/* Reads the next available packet in a blocking fashion */
712static int rt_read_packet(libtrace_t *libtrace,
713                libtrace_packet_t *packet) {
714        return rt_read_packet_versatile(libtrace,packet,1);
715}
716
717
718/* This should only get called for RT messages - RT-encapsulated data records
719 * should be converted to the appropriate capture format */
720static int rt_get_capture_length(const libtrace_packet_t *packet) {
721        rt_metadata_t *rt_md_hdr;
722        switch (packet->type) {
723                case TRACE_RT_STATUS:
724                        return sizeof(rt_status_t);
725                case TRACE_RT_HELLO:
726                        return sizeof(rt_hello_t);
727                case TRACE_RT_START:
728                        return 0;
729                case TRACE_RT_ACK:
730                        return sizeof(rt_ack_t);
731                case TRACE_RT_END_DATA:
732                        return 0;
733                case TRACE_RT_CLOSE:
734                        return 0;
735                case TRACE_RT_DENY_CONN:
736                        return sizeof(rt_deny_conn_t);
737                case TRACE_RT_PAUSE:
738                        return 0; 
739                case TRACE_RT_PAUSE_ACK:
740                        return 0;
741                case TRACE_RT_OPTION:
742                        return 0; /* FIXME */
743                case TRACE_RT_KEYCHANGE:
744                        return 0;
745                case TRACE_RT_LOSTCONN:
746                        return 0;
747                case TRACE_RT_SERVERSTART:
748                        return 0;
749                case TRACE_RT_CLIENTDROP:
750                        return 0;
751                case TRACE_RT_METADATA:
752                        /* This is a little trickier to work out */
753                        rt_md_hdr = (rt_metadata_t *)packet->buffer;
754                        return rt_md_hdr->label_len + rt_md_hdr->value_len + 
755                                sizeof(rt_metadata_t);
756                default:
757                        printf("Unknown type: %d\n", packet->type);
758                       
759        }
760        return 0;
761}
762
763/* RT messages do not have a wire length because they were not captured from
764 * the wire - they were generated by the capture process */
765static int rt_get_wire_length(UNUSED const libtrace_packet_t *packet) {
766        return 0;
767}
768
769/* Although RT messages do contain "framing", this framing is considered to be
770 * stripped as soon as the packet is read by the RT client */                   
771static int rt_get_framing_length(UNUSED const libtrace_packet_t *packet) {
772        return 0;
773}
774
775
776static libtrace_linktype_t rt_get_link_type(UNUSED const libtrace_packet_t *packet)
777{
778        /* RT messages don't have a link type */
779        return TRACE_TYPE_NONDATA;
780}
781
782static int rt_get_fd(const libtrace_t *trace) {
783        return ((struct rt_format_data_t *)trace->format_data)->input_fd;
784}
785
786static libtrace_eventobj_t trace_event_rt(libtrace_t *trace,
787                                        libtrace_packet_t *packet) 
788{
789        libtrace_eventobj_t event = {0,0,0.0,0};
790        libtrace_err_t read_err;
791
792        assert(trace);
793        assert(packet);
794       
795        if (trace->format->get_fd) {
796                event.fd = trace->format->get_fd(trace);
797        } else {
798                event.fd = 0;
799        }
800
801        do {
802
803                event.size = rt_read_packet_versatile(trace, packet, 0);
804                if (event.size == -1) {
805                        read_err = trace_get_err(trace);
806                        if (read_err.err_num == EAGAIN) {
807                                /* No data available - do an IOWAIT */
808                                event.type = TRACE_EVENT_IOWAIT;
809                        }
810                        else {
811                                trace_perror(trace, "Error doing a non-blocking read from rt");
812                                event.type = TRACE_EVENT_PACKET;
813                                break;
814                        }
815                } else if (event.size == 0) {
816                        /* RT gives us a specific indicator that there will be
817                         * no more packets. */
818                        if (packet->type == TRACE_RT_END_DATA)
819                                event.type = TRACE_EVENT_TERMINATE;
820                        else {
821                                /* Since several RT messages can have zero-byte
822                                 * length (once the framing is removed), an
823                                 * event size of zero can still indicate a
824                                 * PACKET event */
825                                event.type = TRACE_EVENT_PACKET;
826                                trace->accepted_packets ++;
827                        }
828
829                }       
830                else {
831                        event.type = TRACE_EVENT_PACKET;
832                        trace->accepted_packets ++;
833                }
834
835                if (trace->filter && event.type == TRACE_EVENT_PACKET) {
836                        if (!trace_apply_filter(trace->filter, packet)) {
837                                trace_clear_cache(packet);
838                                trace->filtered_packets ++;
839                                continue;
840                        }
841                }
842
843                break; 
844        } while (1);
845
846        return event;
847}
848
849static void rt_help(void) {
850        printf("rt format module\n");
851        printf("Supported input URIs:\n");
852        printf("\trt:hostname:port\n");
853        printf("\trt:hostname (connects on default port)\n");
854        printf("\n");
855        printf("\te.g.: rt:localhost\n");
856        printf("\te.g.: rt:localhost:32500\n");
857        printf("\n");
858
859}
860
861
862static struct libtrace_format_t rt = {
863        "rt",
864        "$Id$",
865        TRACE_FORMAT_RT,
866        NULL,                           /* probe filename */
867        NULL,                           /* probe magic */
868        rt_init_input,                  /* init_input */
869        NULL,                           /* config_input */
870        rt_start_input,                 /* start_input */
871        rt_pause_input,                 /* pause */
872        NULL,                           /* init_output */
873        NULL,                           /* config_output */
874        NULL,                           /* start_output */
875        rt_fin_input,                   /* fin_input */
876        NULL,                           /* fin_output */
877        rt_read_packet,                 /* read_packet */
878        rt_prepare_packet,              /* prepare_packet */
879        NULL,                           /* fin_packet */
880        NULL,                           /* write_packet */
881        rt_get_link_type,               /* get_link_type */
882        NULL,                           /* get_direction */
883        NULL,                           /* set_direction */
884        NULL,                           /* get_erf_timestamp */
885        NULL,                           /* get_timeval */
886        NULL,                           /* get_timespec */
887        NULL,                           /* get_seconds */
888        NULL,                           /* seek_erf */
889        NULL,                           /* seek_timeval */
890        NULL,                           /* seek_seconds */
891        rt_get_capture_length,          /* get_capture_length */
892        rt_get_wire_length,                     /* get_wire_length */
893        rt_get_framing_length,          /* get_framing_length */
894        NULL,                           /* set_capture_length */
895        NULL,                           /* get_received_packets */
896        NULL,                           /* get_filtered_packets */
897        NULL,                           /* get_dropped_packets */
898        NULL,                           /* get_statistics */
899        rt_get_fd,                      /* get_fd */
900        trace_event_rt,             /* trace_event */
901        rt_help,                        /* help */
902        NULL,                   /* next pointer */
903        NON_PARALLEL(true) /* This is normally live */
904};
905
906void rt_constructor(void) {
907        register_format(&rt);
908}
Note: See TracBrowser for help on using the repository browser.