source: lib/format_rt.c @ 1fbd938

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 1fbd938 was 1fbd938, checked in by Perry Lorier <perry@…>, 15 years ago

More -pedantic cleanups
Changed the format structure (again) for pause and seek support

  • Property mode set to 100644
File size: 13.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#ifdef HAVE_INTTYPES_H
43#  include <inttypes.h>
44#else
45#  error "Can't find inttypes.h - this needs to be fixed"
46#endif
47
48#ifdef HAVE_STDDEF_H
49#  include <stddef.h>
50#else
51# error "Can't find stddef.h - do you define ptrdiff_t elsewhere?"
52#endif
53#include <sys/types.h>
54#include <sys/socket.h>
55#include <sys/un.h>
56#include <sys/mman.h>
57#include <sys/stat.h>
58#include <unistd.h>
59#include <assert.h>
60#include <errno.h>
61#include <netdb.h>
62#include <fcntl.h>
63#include <getopt.h>
64#include <stdio.h>
65#include <string.h>
66#include <stdlib.h>
67
68#define RT_INFO libtrace->format_data
69
70char *rt_deny_reason(uint8_t reason) {
71        char *string = 0;
72
73        switch(reason) {
74                case RT_DENY_WRAPPER:
75                        string = "Rejected by TCP Wrappers";
76                        break;
77                case RT_DENY_FULL:
78                        string = "Max connections reached on server";
79                        break;
80                case RT_DENY_AUTH:
81                        string = "Authentication failed";
82                        break;
83                default:
84                        string = "Unknown reason";
85        }
86
87        return string;
88}
89
90
91struct libtrace_format_data_t {
92        char *hostname;
93        int port;
94        int input_fd;
95        int reliable;
96
97        struct libtrace_t *dummy_erf;
98        struct libtrace_t *dummy_pcap;
99        struct libtrace_t *dummy_wag;
100};
101
102static struct libtrace_format_t rt;
103
104static int rt_connect(struct libtrace_t *libtrace) {
105        struct hostent *he;
106        struct sockaddr_in remote;
107        rt_header_t connect_msg;
108        uint8_t reason; 
109       
110        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
111                perror("gethostbyname");
112                return 0;
113        }
114        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
115                perror("socket");
116                return 0;
117        }
118
119        remote.sin_family = AF_INET;
120        remote.sin_port = htons(RT_INFO->port);
121        remote.sin_addr = *((struct in_addr *)he->h_addr);
122        bzero(&(remote.sin_zero), 8);
123
124        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
125                                sizeof(struct sockaddr)) == -1) {
126                perror("connect (inet)");
127                return 0;
128        }
129       
130        /* We are connected, now receive message from server */
131       
132        if (recv(RT_INFO->input_fd, &connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
133                printf("An error occured while connecting to %s\n", RT_INFO->hostname);
134                return -1;
135        }
136
137        switch (connect_msg.type) {
138                case RT_DENY_CONN:
139                       
140                        if (recv(RT_INFO->input_fd, &reason, 1, 0) != 1) {
141                                reason = 0;
142                        }       
143                        printf("Connection attempt is denied by the server: %s\n",
144                                        rt_deny_reason(reason));
145                        return -1;
146                case RT_HELLO:
147                        // do something with options
148                       
149                       
150                        return 1;
151                case RT_DATA:
152                        printf("Server needs to send RT_HELLO before sending data to clients\n");
153                        return -1;
154                default:
155                        printf("Unexpected message type: %d\n", connect_msg.type);
156                        return -1;
157        }
158       
159        return -1;
160}
161
162
163static int rt_init_input(struct libtrace_t *libtrace) {
164        char *scan;
165        char *uridata = libtrace->uridata;
166        libtrace->format_data = (struct libtrace_format_data_t *)
167                malloc(sizeof(struct libtrace_format_data_t));
168
169        //libtrace->sourcetype = RT;
170
171        RT_INFO->dummy_erf = NULL;
172        RT_INFO->dummy_pcap = NULL;
173        RT_INFO->dummy_wag = NULL;
174       
175        if (strlen(uridata) == 0) {
176                RT_INFO->hostname =
177                        strdup("localhost");
178                RT_INFO->port =
179                        COLLECTOR_PORT;
180        } else {
181                if ((scan = strchr(uridata,':')) == NULL) {
182                        RT_INFO->hostname =
183                                strdup(uridata);
184                        RT_INFO->port =
185                                COLLECTOR_PORT;
186                } else {
187                        RT_INFO->hostname =
188                                (char *)strndup(uridata,
189                                                (scan - uridata));
190                        RT_INFO->port =
191                                atoi(++scan);
192                }
193        }
194
195        return rt_connect(libtrace);
196}
197       
198static int rt_start_input(struct libtrace_t *libtrace) {
199        rt_header_t start_msg;
200
201        start_msg.type = RT_START;
202        start_msg.length = sizeof(rt_header_t);
203       
204        // Need to send start message to server
205        if (send(RT_INFO->input_fd, &start_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t)) {
206                printf("Failed to send start message to server\n");
207                return -1;
208        }
209
210        return 1;
211}
212
213static int rt_fin_input(struct libtrace_t *libtrace) {
214        rt_header_t close_msg;
215
216        close_msg.type = RT_CLOSE;
217        close_msg.length = sizeof(rt_header_t);
218       
219        // Send a close message to the server
220        if (send(RT_INFO->input_fd, &close_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t)) {
221                printf("Failed to send close message to server\n");
222       
223        }
224        if (RT_INFO->dummy_erf) 
225                trace_destroy_dead(RT_INFO->dummy_erf);
226               
227        if (RT_INFO->dummy_pcap)
228                trace_destroy_dead(RT_INFO->dummy_pcap);
229
230        if (RT_INFO->dummy_wag)
231                trace_destroy_dead(RT_INFO->dummy_wag);
232        close(RT_INFO->input_fd);
233        return 0;
234}
235
236static int rt_read(struct libtrace_t *libtrace, void *buffer, size_t len) {
237        int numbytes;
238
239        if (buffer == 0)
240                buffer = malloc(len);
241        while(1) {
242#ifndef MSG_NOSIGNAL
243#  define MSG_NOSIGNAL 0
244#endif
245                if ((numbytes = recv(RT_INFO->input_fd,
246                                                buffer,
247                                                len,
248                                                MSG_NOSIGNAL)) == -1) {
249                        if (errno == EINTR) {
250                                //ignore EINTR in case
251                                // a caller is using signals
252                                continue;
253                        }
254                        perror("recv");
255                        return -1;
256                }
257                break;
258
259        }
260        return numbytes;
261}
262
263
264static int rt_set_format(struct libtrace_t *libtrace, 
265                struct libtrace_packet_t *packet, uint16_t format) {
266        switch (format) {
267                case RT_FORMAT_ERF:
268                        if (!RT_INFO->dummy_erf) {
269                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
270                        }
271                        packet->trace = RT_INFO->dummy_erf;
272                        break;
273                case RT_FORMAT_PCAP:
274                        if (!RT_INFO->dummy_pcap) {
275                                RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
276                        }
277                        packet->trace = RT_INFO->dummy_pcap;
278                        break;
279                case RT_FORMAT_WAG:
280                        if (!RT_INFO->dummy_wag) {
281                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
282                        }
283                        packet->trace = RT_INFO->dummy_wag;
284                        break;
285                default:
286                        printf("Unrecognised format: %d\n", format);
287                        return -1;
288        }
289        return 1;
290}               
291
292static void rt_set_payload(struct libtrace_packet_t *packet, uint16_t format) {
293        dag_record_t *erfptr;
294       
295        switch (format) {
296                case RT_FORMAT_ERF:
297                        erfptr = (dag_record_t *)packet->header;
298                       
299                        if (erfptr->flags.rxerror == 1) {
300                                packet->payload = NULL;
301                        } else {
302                                packet->payload = (char *)packet->buffer
303                                        + trace_get_framing_length(packet);
304                        }
305                        break;
306                default:
307                        packet->payload = (char *)packet->buffer +
308                                trace_get_framing_length(packet);
309                        break;
310        }
311}
312
313static int rt_send_ack(struct libtrace_t *libtrace, 
314                struct libtrace_packet_t *packet)  {
315       
316        static char *ack_buffer = 0;
317        char *buf_ptr;
318        int numbytes = 0;
319        int to_write = 0;
320        rt_header_t *hdr;
321        rt_ack_t *ack_hdr;
322       
323        if (!ack_buffer) {
324                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
325        }
326       
327        hdr = (rt_header_t *) ack_buffer;
328        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
329       
330        hdr->type = RT_ACK;
331        hdr->length = sizeof(rt_header_t) + sizeof(rt_ack_t);
332
333        ack_hdr->timestamp = trace_get_erf_timestamp(packet);
334       
335        to_write = hdr->length;
336        buf_ptr = ack_buffer;
337       
338        while (to_write > 0) {
339                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0); 
340                if (numbytes == -1) {
341                        if (errno == EINTR || errno == EAGAIN) {
342                                continue;
343                        }
344                        else {
345                                printf("Error sending ack\n");
346                                return -1;
347                        }
348                }
349                to_write = to_write - numbytes;
350                buf_ptr = buf_ptr + to_write;
351               
352        }
353
354        return 1;
355}
356       
357static int rt_read_packet(struct libtrace_t *libtrace, 
358                struct libtrace_packet_t *packet) {
359       
360        int numbytes = 0;
361        char buf[RP_BUFSIZE];
362        int read_required = 0;
363        rt_header_t pkt_hdr;
364        uint16_t format;
365        char msg_buf[RP_BUFSIZE];
366
367       
368        void *buffer = 0;
369
370        packet->trace = libtrace;
371
372        if (packet->buf_control == EXTERNAL) {
373                packet->buf_control = PACKET;
374                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
375        }
376
377        buffer = packet->buffer;
378        packet->header = packet->buffer;
379
380
381        do {
382                if (tracefifo_out_available(libtrace->fifo) == 0 || read_required) {
383                        if ((numbytes = rt_read(
384                                        libtrace,buf,RP_BUFSIZE))<=0) {
385                                return numbytes;
386                        }
387                        tracefifo_write(libtrace->fifo,buf,numbytes);
388                        read_required = 0;
389                }
390                // Read rt header
391                if (tracefifo_out_read(libtrace->fifo,
392                                &pkt_hdr, sizeof(rt_header_t)) == 0) {
393                        read_required = 1;
394                        continue;
395                }
396                tracefifo_out_update(libtrace->fifo, sizeof(rt_header_t));
397               
398                packet->size = pkt_hdr.length;
399                packet->type = pkt_hdr.type;
400
401                switch (packet->type) {
402                        case RT_DATA:
403                                if (tracefifo_out_read(libtrace->fifo, 
404                                                        &format, 
405                                                        sizeof(uint16_t)) == 0) 
406                                {
407                                        tracefifo_out_reset(libtrace->fifo);
408                                        read_required = 1;
409                                        break;
410                                }
411                                if (tracefifo_out_read(libtrace->fifo, buffer, 
412                                                        packet->size - sizeof(uint16_t))== 0)
413                                {
414                                        tracefifo_out_reset(libtrace->fifo);
415                                        read_required = 1;
416                                        break;
417                                }
418                                // set packet->trace
419                                if (rt_set_format(libtrace, packet, format) < 0) {
420                                        return -1;
421                                }
422                                // set packet->payload
423                                rt_set_payload(packet, format);
424                               
425                                // send ack
426                                if (rt_send_ack(libtrace, packet) == -1) {
427                                        return -1;
428                                }
429                               
430                                break;
431                        case RT_STATUS:
432                                if (tracefifo_out_read(libtrace->fifo, buffer,
433                                                sizeof(rt_status_t)) == 0)
434                                {
435                                        tracefifo_out_reset(libtrace->fifo);
436                                        read_required = 1;
437                                        break;
438                                }
439                                break;
440                        case RT_DUCK:
441                                if (tracefifo_out_read(libtrace->fifo, buffer,
442                                                sizeof(rt_duck_t)) == 0)
443                                {
444                                        tracefifo_out_reset(libtrace->fifo);
445                                        read_required = 1;
446                                        break;
447                                }
448                                break;
449
450                        case RT_END_DATA:
451                                // need to do something sensible here
452                                return 0;       
453
454                        case RT_PAUSE_ACK:
455                                // Check if we asked for a pause
456                               
457                               
458                                break;
459
460                        case RT_OPTION:
461                                // Server is requesting some option?
462
463                                break;
464
465                        default:
466                                printf("Bad rt client type: %d\n", packet->type);
467                                return -1;
468                               
469                }
470                if (read_required)
471                        continue;
472                               
473               
474                // got in our whole packet, so...
475                tracefifo_out_update(libtrace->fifo,packet->size);
476
477                tracefifo_ack_update(libtrace->fifo,packet->size +
478                                sizeof(rt_header_t));
479                return 1;
480        } while(1);
481       
482}
483
484static int rt_get_fd(const struct libtrace_packet_t *packet) {
485        return packet->trace->format_data->input_fd;
486}
487
488
489
490static void rt_help() {
491        printf("rt format module\n");
492        printf("Supported input URIs:\n");
493        printf("\trt:hostname:port\n");
494        printf("\trt:hostname (connects on default port)\n");
495        printf("\n");
496        printf("\te.g.: rt:localhost\n");
497        printf("\te.g.: rt:localhost:32500\n");
498        printf("\n");
499
500}
501
502
503static struct libtrace_format_t rt = {
504        "rt",
505        "$Id$",
506        "rt",
507        rt_init_input,                  /* init_input */
508        NULL,                           /* config_input */
509        rt_start_input,                 /* start_input */
510        NULL,                           /* init_output */
511        NULL,                           /* config_output */
512        NULL,                           /* start_output */
513        NULL,                           /* pause_output */
514        rt_fin_input,                   /* fin_input */
515        NULL,                           /* fin_output */
516        rt_read_packet,                 /* read_packet */
517        NULL,                           /* write_packet */
518        NULL,                           /* get_link_type */
519        NULL,                           /* get_direction */
520        NULL,                           /* set_direction */
521        NULL,                           /* get_erf_timestamp */
522        NULL,                           /* get_timeval */
523        NULL,                           /* get_seconds */
524        NULL,                           /* seek_erf */
525        NULL,                           /* seek_timeval */
526        NULL,                           /* seek_seconds */
527        NULL,                           /* get_capture_length */
528        NULL,                           /* get_wire_length */
529        NULL,                           /* get_framing_length */
530        NULL,                           /* set_capture_length */
531        rt_get_fd,                /* get_fd */
532        trace_event_device,             /* trace_event */
533        rt_help                   /* help */
534};
535
536void __attribute__((constructor)) rt_constructor() {
537        register_format(&rt);
538}
Note: See TracBrowser for help on using the repository browser.