source: lib/format_rt.c @ 6561682

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 6561682 was 6561682, checked in by Shane Alcock <salcock@…>, 16 years ago

Added rudimentary acking of received data

  • Property mode set to 100644
File size: 13.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *          Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id$
29 *
30 */
31
32#define _GNU_SOURCE
33
34#include "config.h"
35#include "common.h"
36#include "libtrace.h"
37#include "libtrace_int.h"
38#include "format_helper.h"
39#include "parse_cmd.h"
40#include "rt_protocol.h"
41
42#ifdef HAVE_INTTYPES_H
43#  include <inttypes.h>
44#else
45#  error "Can't find inttypes.h - this needs to be fixed"
46#endif
47
48#ifdef HAVE_STDDEF_H
49#  include <stddef.h>
50#else
51# error "Can't find stddef.h - do you define ptrdiff_t elsewhere?"
52#endif
53#include <sys/types.h>
54#include <sys/socket.h>
55#include <sys/un.h>
56#include <sys/mman.h>
57#include <sys/stat.h>
58#include <unistd.h>
59#include <assert.h>
60#include <errno.h>
61#include <netdb.h>
62#include <fcntl.h>
63#include <getopt.h>
64#include <stdio.h>
65#include <string.h>
66#include <stdlib.h>
67
68#define RT_INFO libtrace->format_data
69
70struct libtrace_format_data_t {
71        char *hostname;
72        int port;
73        int input_fd;
74        int reliable;
75
76        struct libtrace_t *dummy_erf;
77        struct libtrace_t *dummy_pcap;
78        struct libtrace_t *dummy_wag;
79};
80
81static struct libtrace_format_t rt;
82
83static int rt_connect(struct libtrace_t *libtrace) {
84        struct hostent *he;
85        struct sockaddr_in remote;
86        rt_header_t connect_msg;
87        uint8_t reason; 
88       
89        if ((he=gethostbyname(RT_INFO->hostname)) == NULL) {
90                perror("gethostbyname");
91                return 0;
92        }
93        if ((RT_INFO->input_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
94                perror("socket");
95                return 0;
96        }
97
98        remote.sin_family = AF_INET;
99        remote.sin_port = htons(RT_INFO->port);
100        remote.sin_addr = *((struct in_addr *)he->h_addr);
101        bzero(&(remote.sin_zero), 8);
102
103        if (connect(RT_INFO->input_fd, (struct sockaddr *)&remote,
104                                sizeof(struct sockaddr)) == -1) {
105                perror("connect (inet)");
106                return 0;
107        }
108       
109        // We are connected, now receive message from server
110       
111        if (recv(RT_INFO->input_fd, &connect_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t) ) {
112                printf("An error occured while connecting to %s\n", RT_INFO->hostname);
113                return -1;
114        }
115
116        switch (connect_msg.type) {
117                case RT_DENY_CONN:
118                       
119                        if (recv(RT_INFO->input_fd, &reason, 1, 0) != 1) {
120                                reason = 0;
121                        }       
122                        printf("Connection attempt is denied by the server: %s\n",
123                                        rt_deny_reason(reason));
124                        return -1;
125                case RT_HELLO:
126                        // do something with options
127                       
128                       
129                        return 1;
130                case RT_DATA:
131                        printf("Server needs to send RT_HELLO before sending data to clients\n");
132                        return -1;
133                default:
134                        printf("Unexpected message type: %d\n", connect_msg.type);
135                        return -1;
136        }
137       
138        return -1;
139}
140
141
142static int rt_init_input(struct libtrace_t *libtrace) {
143        char *scan;
144        char *uridata = libtrace->uridata;
145        libtrace->format_data = (struct libtrace_format_data_t *)
146                malloc(sizeof(struct libtrace_format_data_t));
147
148        //libtrace->sourcetype = RT;
149
150        RT_INFO->dummy_erf = NULL;
151        RT_INFO->dummy_pcap = NULL;
152        RT_INFO->dummy_wag = NULL;
153       
154        if (strlen(uridata) == 0) {
155                RT_INFO->hostname =
156                        strdup("localhost");
157                RT_INFO->port =
158                        COLLECTOR_PORT;
159        } else {
160                if ((scan = strchr(uridata,':')) == NULL) {
161                        RT_INFO->hostname =
162                                strdup(uridata);
163                        RT_INFO->port =
164                                COLLECTOR_PORT;
165                } else {
166                        RT_INFO->hostname =
167                                (char *)strndup(uridata,
168                                                (scan - uridata));
169                        RT_INFO->port =
170                                atoi(++scan);
171                }
172        }
173
174        return rt_connect(libtrace);
175}
176       
177static int rt_start_input(struct libtrace_t *libtrace) {
178        rt_header_t start_msg;
179
180        start_msg.type = RT_START;
181        start_msg.length = sizeof(rt_header_t);
182       
183        // Need to send start message to server
184        if (send(RT_INFO->input_fd, &start_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t)) {
185                printf("Failed to send start message to server\n");
186                return -1;
187        }
188
189        return 1;
190}
191
192static int rt_fin_input(struct libtrace_t *libtrace) {
193        rt_header_t close_msg;
194
195        close_msg.type = RT_CLOSE;
196        close_msg.length = sizeof(rt_header_t);
197       
198        // Send a close message to the server
199        if (send(RT_INFO->input_fd, &close_msg, sizeof(rt_header_t), 0) != sizeof(rt_header_t)) {
200                printf("Failed to send close message to server\n");
201       
202        }
203        if (RT_INFO->dummy_erf) 
204                trace_destroy_dead(RT_INFO->dummy_erf);
205               
206        if (RT_INFO->dummy_pcap)
207                trace_destroy_dead(RT_INFO->dummy_pcap);
208
209        if (RT_INFO->dummy_wag)
210                trace_destroy_dead(RT_INFO->dummy_wag);
211        close(RT_INFO->input_fd);
212        return 0;
213}
214
215static int rt_read(struct libtrace_t *libtrace, void *buffer, size_t len) {
216        int numbytes;
217
218        if (buffer == 0)
219                buffer = malloc(len);
220        while(1) {
221#ifndef MSG_NOSIGNAL
222#  define MSG_NOSIGNAL 0
223#endif
224                if ((numbytes = recv(RT_INFO->input_fd,
225                                                buffer,
226                                                len,
227                                                MSG_NOSIGNAL)) == -1) {
228                        if (errno == EINTR) {
229                                //ignore EINTR in case
230                                // a caller is using signals
231                                continue;
232                        }
233                        perror("recv");
234                        return -1;
235                }
236                break;
237
238        }
239        return numbytes;
240}
241
242
243static int rt_set_format(struct libtrace_t *libtrace, 
244                struct libtrace_packet_t *packet, uint16_t format) {
245        switch (format) {
246                case RT_FORMAT_ERF:
247                        if (!RT_INFO->dummy_erf) {
248                                RT_INFO->dummy_erf = trace_create_dead("erf:-");
249                        }
250                        packet->trace = RT_INFO->dummy_erf;
251                        break;
252                case RT_FORMAT_PCAP:
253                        if (!RT_INFO->dummy_pcap) {
254                                RT_INFO->dummy_pcap = trace_create_dead("pcap:-");
255                        }
256                        packet->trace = RT_INFO->dummy_pcap;
257                        break;
258                case RT_FORMAT_WAG:
259                        if (!RT_INFO->dummy_wag) {
260                                RT_INFO->dummy_wag = trace_create_dead("wtf:-");
261                        }
262                        packet->trace = RT_INFO->dummy_wag;
263                        break;
264                default:
265                        printf("Unrecognised format: %d\n", format);
266                        return -1;
267        }
268        return 1;
269}               
270
271static void rt_set_payload(struct libtrace_packet_t *packet, uint16_t format) {
272        dag_record_t *erfptr;
273       
274        switch (format) {
275                case RT_FORMAT_ERF:
276                        erfptr = (dag_record_t *)packet->header;
277                       
278                        if (erfptr->flags.rxerror == 1) {
279                                packet->payload = NULL;
280                        } else {
281                                packet->payload = (char *)packet->buffer
282                                        + trace_get_framing_length(packet);
283                        }
284                        break;
285                default:
286                        packet->payload = (char *)packet->buffer +
287                                trace_get_framing_length(packet);
288                        break;
289        }
290}
291
292static int rt_send_ack(struct libtrace_t *libtrace, 
293                struct libtrace_packet_t *packet)  {
294       
295        static char *ack_buffer = 0;
296        char *buf_ptr;
297        int numbytes = 0;
298        int to_write = 0;
299        rt_header_t *hdr;
300        rt_ack_t *ack_hdr;
301       
302        if (!ack_buffer) {
303                ack_buffer = malloc(sizeof(rt_header_t) + sizeof(rt_ack_t));
304        }
305       
306        hdr = (rt_header_t) ack_buffer;
307        ack_hdr = (rt_ack_t) (ack_buffer + sizeof(rt_header_t));
308       
309        hdr->type = RT_ACK;
310        hdr->length = sizeof(rt_header_t) + sizeof(rt_ack_t);
311
312        ack_hdr->ts = trace_get_erf_timestamp(packet);
313       
314        to_write = hdr->length;
315        buf_ptr = ack_buffer;
316       
317        while (to_write > 0) {
318                numbytes = send(RT_INFO.input_fd, buf_ptr, to_write); 
319                if (numbytes == -1) {
320                        if (errno == EINTR || errno == EAGAIN) {
321                                continue;
322                        }
323                        else {
324                                printf("Error sending ack\n");
325                                return -1;
326                        }
327                }
328                to_write = to_write - numbytes;
329                buf_ptr = buf_ptr + to_write;
330               
331        }
332
333        return 1;
334}
335       
336static int rt_read_packet(struct libtrace_t *libtrace, 
337                struct libtrace_packet_t *packet) {
338       
339        int numbytes = 0;
340        char buf[RP_BUFSIZE];
341        int read_required = 0;
342        rt_header_t pkt_hdr;
343        uint16_t format;
344        char msg_buf[RP_BUFSIZE];
345
346       
347        void *buffer = 0;
348
349        packet->trace = libtrace;
350
351        if (packet->buf_control == EXTERNAL) {
352                packet->buf_control = PACKET;
353                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
354        }
355
356        buffer = packet->buffer;
357        packet->header = packet->buffer;
358
359
360        do {
361                if (tracefifo_out_available(libtrace->fifo) == 0 || read_required) {
362                        if ((numbytes = rt_read(
363                                        libtrace,buf,RP_BUFSIZE))<=0) {
364                                return numbytes;
365                        }
366                        tracefifo_write(libtrace->fifo,buf,numbytes);
367                        read_required = 0;
368                }
369                // Read rt header
370                if (tracefifo_out_read(libtrace->fifo,
371                                &pkt_hdr, sizeof(rt_header_t)) == 0) {
372                        read_required = 1;
373                        continue;
374                }
375                tracefifo_out_update(libtrace->fifo, sizeof(rt_header_t));
376               
377                packet->size = pkt_hdr.length;
378                packet->type = pkt_hdr.type;
379
380                switch (packet->type) {
381                        case RT_DATA:
382                                if (tracefifo_out_read(libtrace->fifo, 
383                                                        &format, 
384                                                        sizeof(uint16_t)) == 0) 
385                                {
386                                        tracefifo_out_reset(libtrace->fifo);
387                                        read_required = 1;
388                                        break;
389                                }
390                                if (tracefifo_out_read(libtrace->fifo, buffer, 
391                                                        packet->size - sizeof(uint16_t))== 0)
392                                {
393                                        tracefifo_out_reset(libtrace->fifo);
394                                        read_required = 1;
395                                        break;
396                                }
397                                // set packet->trace
398                                if (rt_set_format(libtrace, packet, format) < 0) {
399                                        return -1;
400                                }
401                                // set packet->payload
402                                rt_set_payload(packet, format);
403                               
404                                // send ack
405                                if (rt_send_ack(libtrace, packet) == -1) {
406                                        return -1;
407                                }
408                               
409                                break;
410                        case RT_STATUS:
411                                if (tracefifo_out_read(libtrace->fifo, buffer,
412                                                sizeof(rt_status_t)) == 0)
413                                {
414                                        tracefifo_out_reset(libtrace->fifo);
415                                        read_required = 1;
416                                        break;
417                                }
418                                break;
419                        case RT_DUCK:
420                                if (tracefifo_out_read(libtrace->fifo, buffer,
421                                                sizeof(rt_duck_t)) == 0)
422                                {
423                                        tracefifo_out_reset(libtrace->fifo);
424                                        read_required = 1;
425                                        break;
426                                }
427                                break;
428
429                        case RT_END_DATA:
430                                // need to do something sensible here
431                               
432                                break;
433
434                        case RT_PAUSE_ACK:
435                                // Check if we asked for a pause
436                               
437                               
438                                break;
439
440                        case RT_OPTION:
441                                // Server is requesting some option?
442
443                                break;
444
445                        default:
446                                printf("Bad rt client type: %d\n", packet->type);
447                                return -1;
448                               
449                }
450                if (read_required)
451                        continue;
452                               
453               
454                // got in our whole packet, so...
455                tracefifo_out_update(libtrace->fifo,packet->size);
456
457                tracefifo_ack_update(libtrace->fifo,packet->size +
458                                sizeof(rt_header_t));
459                return 1;
460        } while(1);
461       
462}
463
464static int rt_get_fd(const struct libtrace_packet_t *packet) {
465        return packet->trace->format_data->input_fd;
466}
467
468
469
470static void rt_help() {
471        printf("rt format module\n");
472        printf("Supported input URIs:\n");
473        printf("\trt:hostname:port\n");
474        printf("\trt:hostname (connects on default port)\n");
475        printf("\n");
476        printf("\te.g.: rt:localhost\n");
477        printf("\te.g.: rt:localhost:32500\n");
478        printf("\n");
479
480}
481
482
483static struct libtrace_format_t rt = {
484        "rt",
485        "$Id$",
486        "rt",
487        rt_init_input,            /* init_input */
488        NULL,                           /* config_input */
489        rt_start_input,           /* start_input */
490        NULL,                           /* init_output */
491        NULL,                           /* config_output */
492        NULL,                           /* start_output */
493        rt_fin_input,             /* fin_input */
494        NULL,                           /* fin_output */
495        rt_read_packet,           /* read_packet */
496        NULL,                           /* write_packet */
497        NULL,                           /* get_link_type */
498        NULL,                           /* get_direction */
499        NULL,                           /* set_direction */
500        NULL,                           /* get_erf_timestamp */
501        NULL,                           /* get_timeval */
502        NULL,                           /* get_seconds */
503        NULL,                           /* get_capture_length */
504        NULL,                           /* get_wire_length */
505        NULL,                           /* get_framing_length */
506        NULL,                           /* set_capture_length */
507        rt_get_fd,                /* get_fd */
508        trace_event_device,             /* trace_event */
509        rt_help                   /* help */
510};
511
512void __attribute__((constructor)) rt_constructor() {
513        register_format(&rt);
514}
Note: See TracBrowser for help on using the repository browser.