source: lib/format_erf.c @ 5945854

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5945854 was 5945854, checked in by Shane Alcock <salcock@…>, 15 years ago

Fixed packet->size calculation for dag packets - dag seems to like throwing in extra padding at the end of packet too

  • Property mode set to 100644
File size: 31.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30#define _GNU_SOURCE
31
32#include "config.h"
33#include "common.h"
34#include "libtrace.h"
35#include "libtrace_int.h"
36#include "format_helper.h"
37#include "parse_cmd.h"
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45
46#ifdef HAVE_DAG
47#include <sys/mman.h>
48#endif
49
50#ifdef WIN32
51#  include <io.h>
52#  include <share.h>
53#  define PATH_MAX _MAX_PATH
54#  define snprintf sprintf_s
55#else
56#  include <netdb.h>
57#  ifndef PATH_MAX
58#       define PATH_MAX 4096
59#  endif
60#  include <sys/ioctl.h>
61#endif
62
63
64#define COLLECTOR_PORT 3435
65
66static struct libtrace_format_t erf;
67static struct libtrace_format_t rtclient;
68#ifdef HAVE_DAG
69static struct libtrace_format_t dag;
70#endif
71
72#define DATA(x) ((struct erf_format_data_t *)x->format_data)
73#define DATAOUT(x) ((struct erf_format_data_out_t *)x->format_data)
74
75#define CONNINFO DATA(libtrace)->conn_info
76#define INPUT DATA(libtrace)->input
77#define OUTPUT DATAOUT(libtrace)->output
78#ifdef HAVE_DAG
79#define DAG DATA(libtrace)->dag
80#define DUCK DATA(libtrace)->duck
81#endif
82#define OPTIONS DATAOUT(libtrace)->options
83struct erf_format_data_t {
84        union {
85                struct {
86                        char *hostname;
87                        short port;
88                } rt;
89        } conn_info;
90       
91        union {
92                int fd;
93                libtrace_io_t *file;
94        } input;
95
96        struct {
97                enum { INDEX_UNKNOWN=0, INDEX_NONE, INDEX_EXISTS } exists;
98                libtrace_io_t *index;
99                off_t index_len;
100        } seek;
101
102#ifdef HAVE_DAG
103        struct {
104                uint32_t last_duck;     
105                uint32_t duck_freq;
106                uint32_t last_pkt;
107                libtrace_t *dummy_duck;
108        } duck;
109       
110        struct {
111                void *buf; 
112                unsigned bottom;
113                unsigned top;
114                unsigned diff;
115                unsigned curr;
116                unsigned offset;
117                unsigned int dagstream;
118        } dag;
119#endif
120};
121
122struct erf_format_data_out_t {
123        union {
124                struct {
125                        char *hostname;
126                        short port;
127                } rt;
128                char *path;
129        } conn_info;
130
131        union {
132                struct {
133                        int level;
134                        int fileflag;
135                } erf;
136               
137        } options;
138       
139        union {
140                int fd;
141                struct rtserver_t * rtserver;
142                libtrace_io_t *file;
143        } output;
144};
145
146/** Structure holding status information for a packet */
147typedef struct libtrace_packet_status {
148        uint8_t type;
149        uint8_t reserved;
150        uint16_t message;
151} libtrace_packet_status_t;
152
153typedef struct erf_index_t {
154        uint64_t timestamp;
155        uint64_t offset; 
156} erf_index_t;
157
158#ifdef HAVE_DAG
159static int dag_init_input(libtrace_t *libtrace) {
160        struct stat buf;
161
162        libtrace->format_data = (struct erf_format_data_t *)
163                malloc(sizeof(struct erf_format_data_t));
164        if (stat(libtrace->uridata, &buf) == -1) {
165                trace_set_err(libtrace,errno,"stat(%s)",libtrace->uridata);
166                return -1;
167        } 
168
169        DAG.dagstream = 0;
170       
171        if (S_ISCHR(buf.st_mode)) {
172                /* DEVICE */
173                if((INPUT.fd = dag_open(libtrace->uridata)) < 0) {
174                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
175                                        libtrace->uridata);
176                        return -1;
177                }
178                if((DAG.buf = (void *)dag_mmap(INPUT.fd)) == MAP_FAILED) {
179                        trace_set_err(libtrace,errno,"Cannot mmap DAG %s",
180                                        libtrace->uridata);
181                        return -1;
182                }
183        } else {
184                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
185                                libtrace->uridata);
186                return -1;
187        }
188
189        DUCK.last_duck = 0;
190        DUCK.duck_freq = 0; 
191        DUCK.last_pkt = 0;
192        DUCK.dummy_duck = NULL;
193       
194        return 0;
195}
196
197static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
198                                void *data) {
199        switch(option) {
200                case TRACE_META_FREQ:
201                        DUCK.duck_freq = *(int *)data;
202                        return 0;
203                case TRACE_OPTION_SNAPLEN:
204                        /* Surely we can set this?? Fall through for now*/
205               
206                case TRACE_OPTION_PROMISC:
207                        /* DAG already operates in a promisc fashion */
208
209                case TRACE_OPTION_FILTER:
210
211                default:
212                        trace_set_err(libtrace, TRACE_ERR_UNKNOWN_OPTION,
213                                        "Unknown or unsupported option: %i",
214                                        option);
215                        return -1;
216        }
217        assert (0);
218}
219#endif
220
221/* Dag erf ether packets have a 2 byte padding before the packet
222 * so that the ip header is aligned on a 32 bit boundary.
223 */
224static int erf_get_padding(const libtrace_packet_t *packet)
225{
226        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
227                dag_record_t *erfptr = (dag_record_t *)packet->header;
228                switch(erfptr->type) {
229                        case TYPE_ETH:          return 2;
230                        default:                return 0;
231                }
232        }
233        else {
234                switch(trace_get_link_type(packet)) {
235                        case TYPE_ETH:          return 2;
236                        default:                return 0;
237                }
238        }
239}
240
241static int erf_get_framing_length(const libtrace_packet_t *packet)
242{
243        return dag_record_size + erf_get_padding(packet);
244}
245
246
247static int erf_init_input(libtrace_t *libtrace) 
248{
249        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
250       
251        INPUT.file = 0;
252
253        return 0; /* success */
254}
255
256static int erf_start_input(libtrace_t *libtrace)
257{
258        if (INPUT.file)
259                return 0; /* success */
260
261        INPUT.file = trace_open_file(libtrace);
262
263        if (!INPUT.file)
264                return -1;
265
266        return 0; /* success */
267}
268
269/* Binary search through the index to find the closest point before
270 * the packet.  Consider in future having a btree index perhaps?
271 */
272static int erf_fast_seek_start(libtrace_t *libtrace,uint64_t erfts)
273{
274        size_t max_off = DATA(libtrace)->seek.index_len/sizeof(erf_index_t);
275        size_t min_off = 0;
276        off_t current;
277        erf_index_t record;
278        do {
279                current=(max_off+min_off)>>2;
280
281                libtrace_io_seek(DATA(libtrace)->seek.index,
282                                current*sizeof(record),
283                                SEEK_SET);
284                libtrace_io_read(DATA(libtrace)->seek.index,
285                                &record,sizeof(record));
286                if (record.timestamp < erfts) {
287                        min_off=current;
288                }
289                if (record.timestamp > erfts) {
290                        max_off=current;
291                }
292                if (record.timestamp == erfts)
293                        break;
294        } while(min_off<max_off);
295
296        /* If we've passed it, seek backwards.  This loop shouldn't
297         * execute more than twice.
298         */
299        do {
300                libtrace_io_seek(DATA(libtrace)->seek.index,
301                                current*sizeof(record),SEEK_SET);
302                libtrace_io_read(DATA(libtrace)->seek.index,
303                                &record,sizeof(record));
304                current--;
305        } while(record.timestamp>erfts);
306
307        /* We've found our location in the trace, now use it. */
308        libtrace_io_seek(INPUT.file,record.offset,SEEK_SET);
309
310        return 0; /* success */
311}
312
313/* There is no index.  Seek through the entire trace from the start, nice
314 * and slowly.
315 */
316static int erf_slow_seek_start(libtrace_t *libtrace,uint64_t erfts)
317{
318        if (INPUT.file) {
319                libtrace_io_close(INPUT.file);
320        }
321        INPUT.file = trace_open_file(libtrace);
322        if (!INPUT.file)
323                return -1;
324        return 0;
325}
326
327static int erf_seek_erf(libtrace_t *libtrace,uint64_t erfts)
328{
329        libtrace_packet_t *packet;
330        off_t off = 0;
331
332        if (DATA(libtrace)->seek.exists==INDEX_UNKNOWN) {
333                char buffer[PATH_MAX];
334                snprintf(buffer,sizeof(buffer),"%s.idx",libtrace->uridata);
335                DATA(libtrace)->seek.index=libtrace_io_open(buffer,"rb");
336                if (DATA(libtrace)->seek.index) {
337                        DATA(libtrace)->seek.exists=INDEX_EXISTS;
338                }
339                else {
340                        DATA(libtrace)->seek.exists=INDEX_NONE;
341                }
342        }
343
344        /* If theres an index, use it to find the nearest packet that isn't
345         * after the time we're looking for.  If there is no index we need
346         * to seek slowly through the trace from the beginning.  Sigh.
347         */
348        switch(DATA(libtrace)->seek.exists) {
349                case INDEX_EXISTS:
350                        erf_fast_seek_start(libtrace,erfts);
351                        break;
352                case INDEX_NONE:
353                        erf_slow_seek_start(libtrace,erfts);
354                        break;
355                case INDEX_UNKNOWN:
356                        assert(0);
357                        break;
358        }
359
360        /* Now seek forward looking for the correct timestamp */
361        packet=trace_create_packet();
362        do {
363                trace_read_packet(libtrace,packet);
364                if (trace_get_erf_timestamp(packet)==erfts)
365                        break;
366                off=libtrace_io_tell(INPUT.file);
367        } while(trace_get_erf_timestamp(packet)<erfts);
368
369        libtrace_io_seek(INPUT.file,off,SEEK_SET);
370
371        return 0;
372}
373
374static int rtclient_init_input(libtrace_t *libtrace) {
375        char *scan;
376        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
377
378        if (strlen(libtrace->uridata) == 0) {
379                CONNINFO.rt.hostname = 
380                        strdup("localhost");
381                CONNINFO.rt.port = 
382                        COLLECTOR_PORT;
383        } else {
384                if ((scan = strchr(libtrace->uridata,':')) == NULL) {
385                        CONNINFO.rt.hostname = 
386                                strdup(libtrace->uridata);
387                        CONNINFO.rt.port =
388                                COLLECTOR_PORT;
389                } else {
390                        CONNINFO.rt.hostname = 
391                                (char *)strndup(libtrace->uridata,
392                                                (scan - libtrace->uridata));
393                        CONNINFO.rt.port = 
394                                atoi(++scan);
395                }
396        }
397
398        return 0; /* success */
399}
400
401static int rtclient_start_input(libtrace_t *libtrace)
402{
403        struct hostent *he;
404        struct sockaddr_in remote;
405        if ((he=gethostbyname(CONNINFO.rt.hostname)) == NULL) { 
406                trace_set_err(libtrace,TRACE_ERR_INIT_FAILED,"failed to resolve %s",
407                                CONNINFO.rt.hostname);
408                return -1;
409        } 
410        if ((INPUT.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
411                trace_set_err(libtrace,errno,"socket(AF_INET,SOCK_STREAM)");
412                return -1;
413        }
414
415        remote.sin_family = AF_INET;   
416        remote.sin_port = htons(CONNINFO.rt.port);
417        remote.sin_addr = *((struct in_addr *)he->h_addr);
418        memset(&(remote.sin_zero), 0, 8);
419
420        if (connect(INPUT.fd, (struct sockaddr *)&remote,
421                                sizeof(struct sockaddr)) == -1) {
422                trace_set_err(libtrace,errno,"connect(%s)",
423                                CONNINFO.rt.hostname);
424                return -1;
425        }
426        return 0; /* success */
427}
428
429static int rtclient_pause_input(libtrace_t *libtrace)
430{
431        close(INPUT.fd);
432        return 0; /* success */
433}
434
435static int erf_init_output(libtrace_out_t *libtrace) {
436        libtrace->format_data = malloc(sizeof(struct erf_format_data_out_t));
437
438        OPTIONS.erf.level = 0;
439        OPTIONS.erf.fileflag = O_CREAT | O_WRONLY;
440        OUTPUT.file = 0;
441
442        return 0;
443}
444
445static int erf_config_output(libtrace_out_t *libtrace, trace_option_output_t option,
446                void *value) {
447
448        switch (option) {
449                case TRACE_OPTION_OUTPUT_COMPRESS:
450                        OPTIONS.erf.level = *(int*)value;
451                        return 0;
452                case TRACE_OPTION_OUTPUT_FILEFLAGS:
453                        OPTIONS.erf.fileflag = *(int*)value;
454                        return 0;
455                default:
456                        /* Unknown option */
457                        trace_set_err_out(libtrace,TRACE_ERR_UNKNOWN_OPTION,
458                                        "Unknown option");
459                        return -1;
460        }
461}
462
463
464#ifdef HAVE_DAG
465static int dag_pause_input(libtrace_t *libtrace) {
466#ifdef DAG_VERSION_2_4
467        dag_stop(INPUT.fd);
468#else
469        if (dag_stop_stream(INPUT.fd, DAG.dagstream) < 0) {
470                trace_set_err(libtrace, errno, "Could not stop DAG stream");
471                return -1;
472        }
473        if (dag_detach_stream(INPUT.fd, DAG.dagstream) < 0) {
474                trace_set_err(libtrace, errno, "Could not detach DAG stream");
475                return -1;
476        }
477#endif
478        return 0; /* success */
479}
480
481static int dag_fin_input(libtrace_t *libtrace) {
482        /* dag pause input implicitly called to cleanup before this */
483       
484        dag_close(INPUT.fd);
485        if (DUCK.dummy_duck)
486                trace_destroy_dead(DUCK.dummy_duck);
487        free(libtrace->format_data);
488        return 0; /* success */
489}
490#endif
491
492static int rtclient_fin_input(libtrace_t *libtrace) {
493        free(CONNINFO.rt.hostname);
494        close(INPUT.fd);
495        free(libtrace->format_data);
496        return 0;
497}
498
499static int erf_fin_input(libtrace_t *libtrace) {
500        if (INPUT.file)
501                libtrace_io_close(INPUT.file);
502        free(libtrace->format_data);
503        return 0;
504}
505
506static int erf_fin_output(libtrace_out_t *libtrace) {
507        libtrace_io_close(OUTPUT.file);
508        free(libtrace->format_data);
509        return 0;
510}
511 
512#ifdef HAVE_DAG
513#ifdef DAG_VERSION_2_4
514static int dag_get_duckinfo(libtrace_t *libtrace, 
515                                libtrace_packet_t *packet) {
516        dag_inf lt_dag_inf;
517       
518        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
519                        !packet->buffer) {
520                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
521                packet->buf_control = TRACE_CTRL_PACKET;
522                if (!packet->buffer) {
523                        trace_set_err(libtrace, errno,
524                                        "Cannot allocate packet buffer");
525                        return -1;
526                }
527        }
528       
529        packet->header = 0;
530        packet->payload = packet->buffer;
531       
532        if ((ioctl(INPUT.fd, DAG_IOINF, &lt_dag_inf) < 0)) {
533                trace_set_err(libtrace, errno,
534                                "Error using DAG_IOINF");
535                return -1;
536        }
537        if (!IsDUCK(&lt_dag_inf)) {
538                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
539                return 0;
540        }
541
542        if ((ioctl(INPUT.fd, DAG_IOGETDUCK, (duck_inf *)packet->payload) 
543                                < 0)) {
544                trace_set_err(libtrace, errno, "Error using DAG_IOGETDUCK");
545                return -1;
546        }
547
548        packet->type = RT_DUCK_2_4;
549        packet->size = sizeof(duck_inf);
550        if (!DUCK.dummy_duck) 
551                DUCK.dummy_duck = trace_create_dead("duck:dummy");
552        packet->trace = DUCK.dummy_duck;
553        return packet->size;
554}       
555#else
556static int dag_get_duckinfo(libtrace_t *libtrace, 
557                                libtrace_packet_t *packet) {
558        daginf_t lt_dag_inf;
559       
560        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
561                        !packet->buffer) {
562                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
563                packet->buf_control = TRACE_CTRL_PACKET;
564                if (!packet->buffer) {
565                        trace_set_err(libtrace, errno,
566                                        "Cannot allocate packet buffer");
567                        return -1;
568                }
569        }
570       
571        packet->header = 0;
572        packet->payload = packet->buffer;
573       
574        /* No need to check if we can get DUCK or not - we're modern
575         * enough */
576        if ((ioctl(INPUT.fd, DAGIOCDUCK, (duckinf_t *)packet->payload) 
577                                < 0)) {
578                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
579                return -1;
580        }
581
582        packet->type = RT_DUCK_2_5;
583        packet->size = sizeof(duckinf_t);
584        if (!DUCK.dummy_duck) 
585                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
586        packet->trace = DUCK.dummy_duck;       
587        return packet->size;
588}       
589#endif
590
591static int dag_read(libtrace_t *libtrace, int block_flag) {
592
593        if (DAG.diff != 0) 
594                return DAG.diff;
595
596        DAG.bottom = DAG.top;
597
598        DAG.top = dag_offset(
599                        INPUT.fd,
600                        &(DAG.bottom),
601                        block_flag);
602
603        DAG.diff = DAG.top - DAG.bottom;
604
605        DAG.offset = 0;
606        return DAG.diff;
607}
608
609/* FIXME: dag_read_packet shouldn't update the pointers, dag_fin_packet
610 * should do that.
611 */
612static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
613        int numbytes;
614        int size;
615        struct timeval tv;
616        dag_record_t *erfptr;
617
618        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 
619                        DUCK.duck_freq != 0) {
620                size = dag_get_duckinfo(libtrace, packet);
621                DUCK.last_duck = DUCK.last_pkt;
622                if (size != 0) {
623                        return size;
624                }
625                /* No DUCK support, so don't waste our time anymore */
626                DUCK.duck_freq = 0;
627        }
628       
629        if (packet->buf_control == TRACE_CTRL_PACKET) {
630                packet->buf_control = TRACE_CTRL_EXTERNAL;
631                free(packet->buffer);
632                packet->buffer = 0;
633        }
634       
635        packet->type = RT_DATA_ERF;
636       
637        if ((numbytes = dag_read(libtrace,0)) < 0) 
638                return numbytes;
639        assert(numbytes>0);
640
641        /*DAG always gives us whole packets */
642        erfptr = (dag_record_t *) ((char *)DAG.buf + 
643                        (DAG.bottom + DAG.offset));
644        size = ntohs(erfptr->rlen);
645
646        assert( size >= dag_record_size );
647        assert( size < LIBTRACE_PACKET_BUFSIZE);
648       
649        packet->buffer = erfptr;
650        packet->header = erfptr;
651        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
652                /* rxerror means the payload is corrupt - drop it
653                 * by tweaking rlen */
654                packet->payload = NULL;
655                erfptr->rlen = htons(erf_get_framing_length(packet));
656        } else {
657                packet->payload = (char*)packet->buffer
658                        + erf_get_framing_length(packet);
659        }
660
661        DAG.offset += size;
662        DAG.diff -= size;
663
664        if (packet->payload != NULL)
665                packet->size = trace_get_capture_length(packet) + 
666                        erf_get_framing_length(packet);
667        else 
668                packet->size = erf_get_framing_length(packet);
669       
670        tv = trace_get_timeval(packet);
671        DUCK.last_pkt = tv.tv_sec;
672       
673        return (packet->size);
674}
675
676static int dag_start_input(libtrace_t *libtrace) {
677#ifdef DAG_VERSION_2_4
678        if(dag_start(INPUT.fd) < 0) {
679                trace_set_err(libtrace,errno,"Cannot start DAG %s",
680                                libtrace->uridata);
681                return -1;
682        }
683#else
684        if (dag_attach_stream(INPUT.fd, DAG.dagstream, 0, 0) < 0) {
685                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
686                return -1;
687        }
688        if (dag_start_stream(INPUT.fd, DAG.dagstream) < 0) {
689                trace_set_err(libtrace, errno, "Cannot start DAG stream");
690                return -1;
691        }
692#endif
693        /* dags appear to have a bug where if you call dag_start after
694         * calling dag_stop, and at least one packet has arrived, bad things
695         * happen.  flush the memory hole
696         */
697        while(dag_read(libtrace,1)!=0)
698                DAG.diff=0;
699        return 0;
700}
701#endif
702
703static int erf_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
704        int numbytes;
705        int size;
706        void *buffer2 = packet->buffer;
707        int rlen;
708
709        if (!packet->buffer || packet->buf_control == TRACE_CTRL_EXTERNAL) {
710                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
711                packet->buf_control = TRACE_CTRL_PACKET;
712                if (!packet->buffer) {
713                        trace_set_err(libtrace, errno, 
714                                        "Cannot allocate memory");
715                        return -1;
716                }
717        }
718
719       
720       
721        packet->header = packet->buffer;
722        packet->type = RT_DATA_ERF;
723
724        if ((numbytes=libtrace_io_read(INPUT.file,
725                                        packet->buffer,
726                                        dag_record_size)) == -1) {
727                trace_set_err(libtrace,errno,"read(%s)",
728                                libtrace->uridata);
729                return -1;
730        }
731        /* EOF */
732        if (numbytes == 0) {
733                return 0;
734        }
735
736        rlen = ntohs(((dag_record_t *)packet->buffer)->rlen);
737        buffer2 = (char*)packet->buffer + dag_record_size;
738        size = rlen - dag_record_size;
739
740        assert(size < LIBTRACE_PACKET_BUFSIZE && size >= dag_record_size);
741
742        /* Unknown/corrupt */
743        assert(((dag_record_t *)packet->buffer)->type < 10);
744       
745        /* read in the rest of the packet */
746        if ((numbytes=libtrace_io_read(INPUT.file,
747                                        buffer2,
748                                        size)) != size) {
749                if (numbytes==-1) {
750                        trace_set_err(libtrace,errno, "read(%s)", libtrace->uridata);
751                        return -1;
752                }
753                trace_set_err(libtrace,EIO,"Truncated packet (wanted %d, got %d)", size, numbytes);
754                /* Failed to read the full packet?  must be EOF */
755                return -1;
756        }
757        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
758                packet->payload = NULL;
759        } else {
760                packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
761        }
762        return rlen;
763}
764
765static int rtclient_read(libtrace_t *libtrace, void *buffer, size_t len) {
766        int numbytes;
767
768        while(1) {
769#ifndef MSG_NOSIGNAL
770#  define MSG_NOSIGNAL 0
771#endif
772                if ((numbytes = recv(INPUT.fd,
773                                                buffer,
774                                                len,
775                                                MSG_NOSIGNAL)) == -1) {
776                        if (errno == EINTR) {
777                                /*ignore EINTR in case
778                                 *a caller is using signals
779                                 */
780                                continue;
781                        }
782                        trace_set_err(libtrace,errno,"recv(%s)",
783                                        libtrace->uridata);
784                        return -1;
785                }
786                break;
787
788        }
789        return numbytes;
790}
791
792static int rtclient_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
793        int numbytes = 0;
794        char buf[RP_BUFSIZE];
795        int read_required = 0;
796       
797        void *buffer = 0;
798
799        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
800                packet->buf_control = TRACE_CTRL_PACKET;
801                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
802        }
803
804        buffer = packet->buffer;
805        packet->header = packet->buffer;
806       
807        packet->type = RT_DATA_ERF;
808
809       
810        do {
811                libtrace_packet_status_t status;
812                int size;
813                if (tracefifo_out_available(libtrace->fifo) == 0 
814                                || read_required) {
815                        if ((numbytes = rtclient_read(
816                                        libtrace,buf,RP_BUFSIZE))<=0) {
817                                return numbytes;
818                        }
819                        tracefifo_write(libtrace->fifo,buf,numbytes);
820                        read_required = 0;
821                }
822                /* Read status byte */
823                if (tracefifo_out_read(libtrace->fifo,
824                                &status, sizeof(uint32_t)) != sizeof(uint32_t)){
825                        read_required = 1;
826                        continue;
827                }
828                tracefifo_out_update(libtrace->fifo,sizeof(uint32_t));
829                /* Read in packet size */
830                if (tracefifo_out_read(libtrace->fifo,
831                                &size, sizeof(uint32_t)) != sizeof(uint32_t)) {
832                        tracefifo_out_reset(libtrace->fifo);
833                        read_required = 1;
834                        continue;
835                }
836                tracefifo_out_update(libtrace->fifo, sizeof(uint32_t));
837               
838                if (status.type == 2 /* RT_MSG */) {
839                        /* Need to skip this packet as it is a message packet */
840                        tracefifo_out_update(libtrace->fifo, size);
841                        tracefifo_ack_update(libtrace->fifo, size + 
842                                        sizeof(uint32_t) + 
843                                        sizeof(libtrace_packet_status_t));
844                        continue;
845                }
846               
847                /* read in the full packet */
848                if ((numbytes = tracefifo_out_read(libtrace->fifo, 
849                                                buffer, size)) != size) {
850                        tracefifo_out_reset(libtrace->fifo);
851                        read_required = 1;
852                        continue;
853                }
854
855                /* got in our whole packet, so... */
856                tracefifo_out_update(libtrace->fifo,size);
857
858                tracefifo_ack_update(libtrace->fifo,size + 
859                                sizeof(uint32_t) + 
860                                sizeof(libtrace_packet_status_t));
861
862                if (((dag_record_t *)buffer)->flags.rxerror == 1) {
863                        packet->payload = NULL;
864                } else {
865                        packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
866                }
867                return numbytes;
868        } while(1);
869}
870
871static int erf_dump_packet(libtrace_out_t *libtrace,
872                dag_record_t *erfptr, int pad, void *buffer) {
873        int numbytes = 0;
874        int size;
875
876        if ((numbytes = libtrace_io_write(OUTPUT.file, erfptr, dag_record_size + pad)) != dag_record_size+pad) {
877                trace_set_err_out(libtrace,errno,
878                                "write(%s)",libtrace->uridata);
879                return -1;
880        }
881
882        size=ntohs(erfptr->rlen)-(dag_record_size+pad);
883
884        numbytes=libtrace_io_write(OUTPUT.file, buffer, size);
885        if (numbytes != size) {
886                trace_set_err_out(libtrace,errno,
887                                "write(%s)",libtrace->uridata);
888                return -1;
889        }
890
891        return numbytes + pad + dag_record_size;
892}
893
894static int erf_start_output(libtrace_out_t *libtrace)
895{
896        OUTPUT.file = trace_open_file_out(libtrace,
897                        OPTIONS.erf.level,
898                        OPTIONS.erf.fileflag);
899        if (!OUTPUT.file) {
900                return -1;
901        }
902        return 0;
903}
904
905static bool find_compatible_linktype(libtrace_out_t *libtrace,
906                                libtrace_packet_t *packet)
907{
908        /* Keep trying to simplify the packet until we can find
909         * something we can do with it */
910        do {
911                char type=libtrace_to_erf_type(trace_get_link_type(packet));
912
913                /* Success */
914                if (type != (char)-1)
915                        return true;
916
917                if (!demote_packet(packet)) {
918                        trace_set_err_out(libtrace,
919                                        TRACE_ERR_NO_CONVERSION,
920                                        "No erf type for packet (%i)",
921                                        trace_get_link_type(packet));
922                        return false;
923                }
924
925        } while(1);
926
927        return true;
928}
929               
930static int erf_write_packet(libtrace_out_t *libtrace, 
931                libtrace_packet_t *packet) 
932{
933        int numbytes = 0;
934        int pad = 0;
935        dag_record_t *dag_hdr = (dag_record_t *)packet->header;
936        void *payload = packet->payload;
937
938        assert(OUTPUT.file);
939
940        if (!packet->header) {
941                /*trace_set_err_output(libtrace, TRACE_ERR_BAD_PACKET,
942                                "Packet has no header - probably an RT packet");
943                */
944                return -1;
945        }
946       
947        pad = erf_get_padding(packet);
948
949        /* If we've had an rxerror, we have no payload to write - fix
950         * rlen to be the correct length
951         */
952        /* I Think this is bogus, we should somehow figure out
953         * a way to write out the payload even if it is gibberish -- Perry */
954        if (payload == NULL) {
955                dag_hdr->rlen = htons(dag_record_size + pad);
956        } 
957       
958        if (packet->trace->format == &erf 
959#ifdef HAVE_DAG
960                        || packet->trace->format == &dag
961#endif
962                        ) {
963                numbytes = erf_dump_packet(libtrace,
964                                (dag_record_t *)packet->header,
965                                pad,
966                                payload
967                                );
968        } else {
969                dag_record_t erfhdr;
970                /* convert format - build up a new erf header */
971                /* Timestamp */
972                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
973
974                /* Flags. Can't do this */
975                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
976                if (trace_get_direction(packet)!=-1)
977                        erfhdr.flags.iface = trace_get_direction(packet);
978
979                if (!find_compatible_linktype(libtrace,packet))
980                        return -1;
981
982                payload=packet->payload;
983                pad = erf_get_padding(packet);
984
985                erfhdr.type = libtrace_to_erf_type(trace_get_link_type(packet));
986
987                /* Packet length (rlen includes format overhead) */
988                assert(trace_get_capture_length(packet)>0 
989                                && trace_get_capture_length(packet)<=65536);
990                assert(erf_get_framing_length(packet)>0 
991                                && trace_get_framing_length(packet)<=65536);
992                assert(
993                        trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
994                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
995                erfhdr.rlen = htons(trace_get_capture_length(packet) 
996                        + erf_get_framing_length(packet));
997                /* loss counter. Can't do this */
998                erfhdr.lctr = 0;
999                /* Wire length, does not include padding! */
1000                erfhdr.wlen = htons(trace_get_wire_length(packet));
1001
1002                /* Write it out */
1003                numbytes = erf_dump_packet(libtrace,
1004                                &erfhdr,
1005                                pad,
1006                                payload);
1007        }
1008        return numbytes;
1009}
1010
1011static libtrace_linktype_t erf_get_link_type(const libtrace_packet_t *packet) {
1012        dag_record_t *erfptr = 0;
1013        erfptr = (dag_record_t *)packet->header;
1014        return erf_type_to_libtrace(erfptr->type);
1015}
1016
1017static libtrace_direction_t erf_get_direction(const libtrace_packet_t *packet) {
1018        dag_record_t *erfptr = 0;
1019        erfptr = (dag_record_t *)packet->header;
1020        return erfptr->flags.iface;
1021}
1022
1023static libtrace_direction_t erf_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1024        dag_record_t *erfptr = 0;
1025        erfptr = (dag_record_t *)packet->header;
1026        erfptr->flags.iface = direction;
1027        return erfptr->flags.iface;
1028}
1029
1030static uint64_t erf_get_erf_timestamp(const libtrace_packet_t *packet) {
1031        dag_record_t *erfptr = 0;
1032        erfptr = (dag_record_t *)packet->header;
1033        return bswap_le_to_host64(erfptr->ts);
1034}
1035
1036static int erf_get_capture_length(const libtrace_packet_t *packet) {
1037        dag_record_t *erfptr = 0;
1038        int caplen;
1039        if (packet->payload == NULL)
1040                return 0; 
1041       
1042        erfptr = (dag_record_t *)packet->header;
1043        caplen = ntohs(erfptr->rlen) - erf_get_framing_length(packet);
1044        if (ntohs(erfptr->wlen) < caplen)
1045                return ntohs(erfptr->wlen);
1046
1047        return (ntohs(erfptr->rlen) - erf_get_framing_length(packet));
1048}
1049
1050static int erf_get_wire_length(const libtrace_packet_t *packet) {
1051        dag_record_t *erfptr = 0;
1052        erfptr = (dag_record_t *)packet->header;
1053        return ntohs(erfptr->wlen);
1054}
1055
1056static size_t erf_set_capture_length(libtrace_packet_t *packet, size_t size) {
1057        dag_record_t *erfptr = 0;
1058        assert(packet);
1059        if(size  > trace_get_capture_length(packet)) {
1060                /* can't make a packet larger */
1061                return trace_get_capture_length(packet);
1062        }
1063        erfptr = (dag_record_t *)packet->header;
1064        erfptr->rlen = htons(size + erf_get_framing_length(packet));
1065        return trace_get_capture_length(packet);
1066}
1067
1068static int rtclient_get_fd(const libtrace_t *libtrace) {
1069        return INPUT.fd;
1070}
1071
1072#ifdef HAVE_DAG
1073libtrace_eventobj_t trace_event_dag(libtrace_t *trace, libtrace_packet_t *packet) {
1074        libtrace_eventobj_t event = {0,0,0.0,0};
1075        int dag_fd;
1076        int data;
1077
1078        if (trace->format->get_fd) {
1079                dag_fd = trace->format->get_fd(trace);
1080        } else {
1081                dag_fd = 0;
1082        }
1083       
1084        data = dag_read(trace, DAGF_NONBLOCK);
1085
1086        if (data > 0) {
1087                event.size = trace_read_packet(trace,packet);
1088                event.type = TRACE_EVENT_PACKET;
1089                return event;
1090        }
1091        event.type = TRACE_EVENT_SLEEP;
1092        event.seconds = 0.0001;
1093        return event;
1094}
1095#endif
1096
1097#ifdef HAVE_DAG
1098static void dag_help() {
1099        printf("dag format module: $Revision$\n");
1100        printf("Supported input URIs:\n");
1101        printf("\tdag:/dev/dagn\n");
1102        printf("\n");
1103        printf("\te.g.: dag:/dev/dag0\n");
1104        printf("\n");
1105        printf("Supported output URIs:\n");
1106        printf("\tnone\n");
1107        printf("\n");
1108}
1109#endif
1110
1111static void erf_help() {
1112        printf("erf format module: $Revision$\n");
1113        printf("Supported input URIs:\n");
1114        printf("\terf:/path/to/file\t(uncompressed)\n");
1115        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1116        printf("\terf:-\t(stdin, either compressed or not)\n");
1117        printf("\terf:/path/to/socket\n");
1118        printf("\n");
1119        printf("\te.g.: erf:/tmp/trace\n");
1120        printf("\n");
1121        printf("Supported output URIs:\n");
1122        printf("\terf:path/to/file\t(uncompressed)\n");
1123        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1124        printf("\terf:-\t(stdout, either compressed or not)\n");
1125        printf("\n");
1126        printf("\te.g.: erf:/tmp/trace\n");
1127        printf("\n");
1128        printf("Supported output options:\n");
1129        printf("\t-z\tSpecify the gzip compression, ranging from 0 (uncompressed) to 9 - defaults to 1\n");
1130        printf("\n");
1131
1132       
1133}
1134
1135static void rtclient_help() {
1136        printf("rtclient format module: $Revision$\n");
1137        printf("DEPRECATED - use rt module instead\n");
1138        printf("Supported input URIs:\n");
1139        printf("\trtclient:host:port\n");
1140        printf("\n");
1141        printf("\te.g.:rtclient:localhost:3435\n");
1142        printf("\n");
1143        printf("Supported output URIs:\n");
1144        printf("\tnone\n");
1145        printf("\n");
1146}       
1147
1148static struct libtrace_format_t erf = {
1149        "erf",
1150        "$Id$",
1151        TRACE_FORMAT_ERF,
1152        erf_init_input,                 /* init_input */       
1153        NULL,                           /* config_input */
1154        erf_start_input,                /* start_input */
1155        NULL,                           /* pause_input */
1156        erf_init_output,                /* init_output */
1157        erf_config_output,              /* config_output */
1158        erf_start_output,               /* start_output */
1159        erf_fin_input,                  /* fin_input */
1160        erf_fin_output,                 /* fin_output */
1161        erf_read_packet,                /* read_packet */
1162        NULL,                           /* fin_packet */
1163        erf_write_packet,               /* write_packet */
1164        erf_get_link_type,              /* get_link_type */
1165        erf_get_direction,              /* get_direction */
1166        erf_set_direction,              /* set_direction */
1167        erf_get_erf_timestamp,          /* get_erf_timestamp */
1168        NULL,                           /* get_timeval */
1169        NULL,                           /* get_seconds */
1170        erf_seek_erf,                   /* seek_erf */
1171        NULL,                           /* seek_timeval */
1172        NULL,                           /* seek_seconds */
1173        erf_get_capture_length,         /* get_capture_length */
1174        erf_get_wire_length,            /* get_wire_length */
1175        erf_get_framing_length,         /* get_framing_length */
1176        erf_set_capture_length,         /* set_capture_length */
1177        NULL,                           /* get_fd */
1178        trace_event_trace,              /* trace_event */
1179        erf_help,                       /* help */
1180        NULL                            /* next pointer */
1181};
1182
1183#ifdef HAVE_DAG
1184static struct libtrace_format_t dag = {
1185        "dag",
1186        "$Id$",
1187        TRACE_FORMAT_ERF,
1188        dag_init_input,                 /* init_input */       
1189        dag_config_input,               /* config_input */
1190        dag_start_input,                /* start_input */
1191        dag_pause_input,                /* pause_input */
1192        NULL,                           /* init_output */
1193        NULL,                           /* config_output */
1194        NULL,                           /* start_output */
1195        dag_fin_input,                  /* fin_input */
1196        NULL,                           /* fin_output */
1197        dag_read_packet,                /* read_packet */
1198        NULL,                           /* fin_packet */
1199        NULL,                           /* write_packet */
1200        erf_get_link_type,              /* get_link_type */
1201        erf_get_direction,              /* get_direction */
1202        erf_set_direction,              /* set_direction */
1203        erf_get_erf_timestamp,          /* get_erf_timestamp */
1204        NULL,                           /* get_timeval */
1205        NULL,                           /* get_seconds */
1206        NULL,                           /* seek_erf */
1207        NULL,                           /* seek_timeval */
1208        NULL,                           /* seek_seconds */
1209        erf_get_capture_length,         /* get_capture_length */
1210        erf_get_wire_length,            /* get_wire_length */
1211        erf_get_framing_length,         /* get_framing_length */
1212        erf_set_capture_length,         /* set_capture_length */
1213        NULL,                           /* get_fd */
1214        trace_event_dag,                /* trace_event */
1215        dag_help,                       /* help */
1216        NULL                            /* next pointer */
1217};
1218#endif
1219
1220static struct libtrace_format_t rtclient = {
1221        "rtclient",
1222        "$Id$",
1223        TRACE_FORMAT_ERF,
1224        rtclient_init_input,            /* init_input */       
1225        NULL,                           /* config_input */
1226        rtclient_start_input,           /* start_input */
1227        rtclient_pause_input,           /* pause_input */
1228        NULL,                           /* init_output */
1229        NULL,                           /* config_output */
1230        NULL,                           /* start_output */
1231        rtclient_fin_input,             /* fin_input */
1232        NULL,                           /* fin_output */
1233        rtclient_read_packet,           /* read_packet */
1234        NULL,                           /* fin_packet */
1235        NULL,                           /* write_packet */
1236        erf_get_link_type,              /* get_link_type */
1237        erf_get_direction,              /* get_direction */
1238        erf_set_direction,              /* set_direction */
1239        erf_get_erf_timestamp,          /* get_erf_timestamp */
1240        NULL,                           /* get_timeval */
1241        NULL,                           /* get_seconds */
1242        NULL,                           /* seek_erf */
1243        NULL,                           /* seek_timeval */
1244        NULL,                           /* seek_seconds */
1245        erf_get_capture_length,         /* get_capture_length */
1246        erf_get_wire_length,            /* get_wire_length */
1247        erf_get_framing_length,         /* get_framing_length */
1248        erf_set_capture_length,         /* set_capture_length */
1249        rtclient_get_fd,                /* get_fd */
1250        trace_event_device,             /* trace_event */
1251        rtclient_help,                  /* help */
1252        NULL                            /* next pointer */
1253};
1254
1255void erf_constructor() {
1256        register_format(&rtclient);
1257        register_format(&erf);
1258#ifdef HAVE_DAG
1259        register_format(&dag);
1260#endif
1261}
Note: See TracBrowser for help on using the repository browser.