source: lib/format_erf.c @ 5bb5dd2

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5bb5dd2 was 5bb5dd2, checked in by Shane Alcock <salcock@…>, 15 years ago

Fixed wag_get_capture_length to use the payload length field of the wag header, rather than using the total frame size and subtracting what we assumed to be the header size.
Fixed bug in dag_read_packet where rxerrors would cause the payload to be null but the packet's size would still include the (theoretical) payload size

  • Property mode set to 100644
File size: 30.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30#define _GNU_SOURCE
31
32#include "config.h"
33#include "common.h"
34#include "libtrace.h"
35#include "libtrace_int.h"
36#include "format_helper.h"
37#include "parse_cmd.h"
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45
46#ifdef HAVE_DAG
47#include <sys/mman.h>
48#endif
49
50#ifdef WIN32
51#  include <io.h>
52#  include <share.h>
53#  define PATH_MAX _MAX_PATH
54#  define snprintf sprintf_s
55#else
56#  include <netdb.h>
57#  ifndef PATH_MAX
58#       define PATH_MAX 4096
59#  endif
60#  include <sys/ioctl.h>
61#endif
62
63
64#define COLLECTOR_PORT 3435
65
66static struct libtrace_format_t erf;
67static struct libtrace_format_t rtclient;
68#ifdef HAVE_DAG
69static struct libtrace_format_t dag;
70#endif
71
72#define DATA(x) ((struct erf_format_data_t *)x->format_data)
73#define DATAOUT(x) ((struct erf_format_data_out_t *)x->format_data)
74
75#define CONNINFO DATA(libtrace)->conn_info
76#define INPUT DATA(libtrace)->input
77#define OUTPUT DATAOUT(libtrace)->output
78#ifdef HAVE_DAG
79#define DAG DATA(libtrace)->dag
80#define DUCK DATA(libtrace)->duck
81#endif
82#define OPTIONS DATAOUT(libtrace)->options
83struct erf_format_data_t {
84        union {
85                struct {
86                        char *hostname;
87                        short port;
88                } rt;
89        } conn_info;
90       
91        union {
92                int fd;
93                libtrace_io_t *file;
94        } input;
95
96        struct {
97                enum { INDEX_UNKNOWN=0, INDEX_NONE, INDEX_EXISTS } exists;
98                libtrace_io_t *index;
99                off_t index_len;
100        } seek;
101
102#ifdef HAVE_DAG
103        struct {
104                uint32_t last_duck;     
105                uint32_t duck_freq;
106                uint32_t last_pkt;
107                libtrace_t *dummy_duck;
108        } duck;
109       
110        struct {
111                void *buf; 
112                unsigned bottom;
113                unsigned top;
114                unsigned diff;
115                unsigned curr;
116                unsigned offset;
117                unsigned int dagstream;
118        } dag;
119#endif
120};
121
122struct erf_format_data_out_t {
123        union {
124                struct {
125                        char *hostname;
126                        short port;
127                } rt;
128                char *path;
129        } conn_info;
130
131        union {
132                struct {
133                        int level;
134                        int fileflag;
135                } erf;
136               
137        } options;
138       
139        union {
140                int fd;
141                struct rtserver_t * rtserver;
142                libtrace_io_t *file;
143        } output;
144};
145
146/** Structure holding status information for a packet */
147typedef struct libtrace_packet_status {
148        uint8_t type;
149        uint8_t reserved;
150        uint16_t message;
151} libtrace_packet_status_t;
152
153typedef struct erf_index_t {
154        uint64_t timestamp;
155        uint64_t offset; 
156} erf_index_t;
157
158#ifdef HAVE_DAG
159static int dag_init_input(libtrace_t *libtrace) {
160        struct stat buf;
161
162        libtrace->format_data = (struct erf_format_data_t *)
163                malloc(sizeof(struct erf_format_data_t));
164        if (stat(libtrace->uridata, &buf) == -1) {
165                trace_set_err(libtrace,errno,"stat(%s)",libtrace->uridata);
166                return -1;
167        } 
168
169        DAG.dagstream = 0;
170       
171        if (S_ISCHR(buf.st_mode)) {
172                /* DEVICE */
173                if((INPUT.fd = dag_open(libtrace->uridata)) < 0) {
174                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
175                                        libtrace->uridata);
176                        return -1;
177                }
178                if((DAG.buf = (void *)dag_mmap(INPUT.fd)) == MAP_FAILED) {
179                        trace_set_err(libtrace,errno,"Cannot mmap DAG %s",
180                                        libtrace->uridata);
181                        return -1;
182                }
183        } else {
184                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
185                                libtrace->uridata);
186                return -1;
187        }
188
189        DUCK.last_duck = 0;
190        DUCK.duck_freq = 0; 
191        DUCK.last_pkt = 0;
192        DUCK.dummy_duck = NULL;
193       
194        return 0;
195}
196
197static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
198                                void *data) {
199        switch(option) {
200                case TRACE_META_FREQ:
201                        DUCK.duck_freq = *(int *)data;
202                        return 0;
203                case TRACE_OPTION_SNAPLEN:
204                        /* Surely we can set this?? Fall through for now*/
205               
206                case TRACE_OPTION_PROMISC:
207                        /* DAG already operates in a promisc fashion */
208
209                case TRACE_OPTION_FILTER:
210
211                default:
212                        trace_set_err(libtrace, TRACE_ERR_UNKNOWN_OPTION,
213                                        "Unknown or unsupported option: %i",
214                                        option);
215                        return -1;
216        }
217        assert (0);
218}
219#endif
220
221/* Dag erf ether packets have a 2 byte padding before the packet
222 * so that the ip header is aligned on a 32 bit boundary.
223 */
224static int erf_get_padding(const libtrace_packet_t *packet)
225{
226        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
227                dag_record_t *erfptr = (dag_record_t *)packet->header;
228                switch(erfptr->type) {
229                        case TYPE_ETH:          return 2;
230                        default:                return 0;
231                }
232        }
233        else {
234                switch(trace_get_link_type(packet)) {
235                        case TYPE_ETH:          return 2;
236                        default:                return 0;
237                }
238        }
239}
240
241static int erf_get_framing_length(const libtrace_packet_t *packet)
242{
243        return dag_record_size + erf_get_padding(packet);
244}
245
246
247static int erf_init_input(libtrace_t *libtrace) 
248{
249        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
250       
251        INPUT.file = 0;
252
253        return 0; /* success */
254}
255
256static int erf_start_input(libtrace_t *libtrace)
257{
258        if (INPUT.file)
259                return 0; /* success */
260
261        INPUT.file = trace_open_file(libtrace);
262
263        if (!INPUT.file)
264                return -1;
265
266        return 0; /* success */
267}
268
269/* Binary search through the index to find the closest point before
270 * the packet.  Consider in future having a btree index perhaps?
271 */
272static int erf_fast_seek_start(libtrace_t *libtrace,uint64_t erfts)
273{
274        size_t max_off = DATA(libtrace)->seek.index_len/sizeof(erf_index_t);
275        size_t min_off = 0;
276        off_t current;
277        erf_index_t record;
278        do {
279                current=(max_off+min_off)>>2;
280
281                libtrace_io_seek(DATA(libtrace)->seek.index,
282                                current*sizeof(record),
283                                SEEK_SET);
284                libtrace_io_read(DATA(libtrace)->seek.index,
285                                &record,sizeof(record));
286                if (record.timestamp < erfts) {
287                        min_off=current;
288                }
289                if (record.timestamp > erfts) {
290                        max_off=current;
291                }
292                if (record.timestamp == erfts)
293                        break;
294        } while(min_off<max_off);
295
296        /* If we've passed it, seek backwards.  This loop shouldn't
297         * execute more than twice.
298         */
299        do {
300                libtrace_io_seek(DATA(libtrace)->seek.index,
301                                current*sizeof(record),SEEK_SET);
302                libtrace_io_read(DATA(libtrace)->seek.index,
303                                &record,sizeof(record));
304                current--;
305        } while(record.timestamp>erfts);
306
307        /* We've found our location in the trace, now use it. */
308        libtrace_io_seek(INPUT.file,record.offset,SEEK_SET);
309
310        return 0; /* success */
311}
312
313/* There is no index.  Seek through the entire trace from the start, nice
314 * and slowly.
315 */
316static int erf_slow_seek_start(libtrace_t *libtrace,uint64_t erfts)
317{
318        if (INPUT.file) {
319                libtrace_io_close(INPUT.file);
320        }
321        INPUT.file = trace_open_file(libtrace);
322        if (!INPUT.file)
323                return -1;
324        return 0;
325}
326
327static int erf_seek_erf(libtrace_t *libtrace,uint64_t erfts)
328{
329        libtrace_packet_t *packet;
330        off_t off = 0;
331
332        if (DATA(libtrace)->seek.exists==INDEX_UNKNOWN) {
333                char buffer[PATH_MAX];
334                snprintf(buffer,sizeof(buffer),"%s.idx",libtrace->uridata);
335                DATA(libtrace)->seek.index=libtrace_io_open(buffer,"rb");
336                if (DATA(libtrace)->seek.index) {
337                        DATA(libtrace)->seek.exists=INDEX_EXISTS;
338                }
339                else {
340                        DATA(libtrace)->seek.exists=INDEX_NONE;
341                }
342        }
343
344        /* If theres an index, use it to find the nearest packet that isn't
345         * after the time we're looking for.  If there is no index we need
346         * to seek slowly through the trace from the beginning.  Sigh.
347         */
348        switch(DATA(libtrace)->seek.exists) {
349                case INDEX_EXISTS:
350                        erf_fast_seek_start(libtrace,erfts);
351                        break;
352                case INDEX_NONE:
353                        erf_slow_seek_start(libtrace,erfts);
354                        break;
355                case INDEX_UNKNOWN:
356                        assert(0);
357                        break;
358        }
359
360        /* Now seek forward looking for the correct timestamp */
361        packet=trace_create_packet();
362        do {
363                trace_read_packet(libtrace,packet);
364                if (trace_get_erf_timestamp(packet)==erfts)
365                        break;
366                off=libtrace_io_tell(INPUT.file);
367        } while(trace_get_erf_timestamp(packet)<erfts);
368
369        libtrace_io_seek(INPUT.file,off,SEEK_SET);
370
371        return 0;
372}
373
374static int rtclient_init_input(libtrace_t *libtrace) {
375        char *scan;
376        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
377
378        if (strlen(libtrace->uridata) == 0) {
379                CONNINFO.rt.hostname = 
380                        strdup("localhost");
381                CONNINFO.rt.port = 
382                        COLLECTOR_PORT;
383        } else {
384                if ((scan = strchr(libtrace->uridata,':')) == NULL) {
385                        CONNINFO.rt.hostname = 
386                                strdup(libtrace->uridata);
387                        CONNINFO.rt.port =
388                                COLLECTOR_PORT;
389                } else {
390                        CONNINFO.rt.hostname = 
391                                (char *)strndup(libtrace->uridata,
392                                                (scan - libtrace->uridata));
393                        CONNINFO.rt.port = 
394                                atoi(++scan);
395                }
396        }
397
398        return 0; /* success */
399}
400
401static int rtclient_start_input(libtrace_t *libtrace)
402{
403        struct hostent *he;
404        struct sockaddr_in remote;
405        if ((he=gethostbyname(CONNINFO.rt.hostname)) == NULL) { 
406                trace_set_err(libtrace,TRACE_ERR_INIT_FAILED,"failed to resolve %s",
407                                CONNINFO.rt.hostname);
408                return -1;
409        } 
410        if ((INPUT.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
411                trace_set_err(libtrace,errno,"socket(AF_INET,SOCK_STREAM)");
412                return -1;
413        }
414
415        remote.sin_family = AF_INET;   
416        remote.sin_port = htons(CONNINFO.rt.port);
417        remote.sin_addr = *((struct in_addr *)he->h_addr);
418        memset(&(remote.sin_zero), 0, 8);
419
420        if (connect(INPUT.fd, (struct sockaddr *)&remote,
421                                sizeof(struct sockaddr)) == -1) {
422                trace_set_err(libtrace,errno,"connect(%s)",
423                                CONNINFO.rt.hostname);
424                return -1;
425        }
426        return 0; /* success */
427}
428
429static int rtclient_pause_input(libtrace_t *libtrace)
430{
431        close(INPUT.fd);
432        return 0; /* success */
433}
434
435static int erf_init_output(libtrace_out_t *libtrace) {
436        libtrace->format_data = malloc(sizeof(struct erf_format_data_out_t));
437
438        OPTIONS.erf.level = 0;
439        OPTIONS.erf.fileflag = O_CREAT | O_WRONLY;
440        OUTPUT.file = 0;
441
442        return 0;
443}
444
445static int erf_config_output(libtrace_out_t *libtrace, trace_option_output_t option,
446                void *value) {
447
448        switch (option) {
449                case TRACE_OPTION_OUTPUT_COMPRESS:
450                        OPTIONS.erf.level = *(int*)value;
451                        return 0;
452                case TRACE_OPTION_OUTPUT_FILEFLAGS:
453                        OPTIONS.erf.fileflag = *(int*)value;
454                        return 0;
455                default:
456                        /* Unknown option */
457                        trace_set_err_out(libtrace,TRACE_ERR_UNKNOWN_OPTION,
458                                        "Unknown option");
459                        return -1;
460        }
461}
462
463
464#ifdef HAVE_DAG
465static int dag_pause_input(libtrace_t *libtrace) {
466#ifdef DAG_VERSION_2_4
467        dag_stop(INPUT.fd);
468#else
469        if (dag_stop_stream(INPUT.fd, DAG.dagstream) < 0) {
470                trace_set_err(libtrace, errno, "Could not stop DAG stream");
471                return -1;
472        }
473        if (dag_detach_stream(INPUT.fd, DAG.dagstream) < 0) {
474                trace_set_err(libtrace, errno, "Could not detach DAG stream");
475                return -1;
476        }
477#endif
478        return 0; /* success */
479}
480
481static int dag_fin_input(libtrace_t *libtrace) {
482        /* dag pause input implicitly called to cleanup before this */
483       
484        dag_close(INPUT.fd);
485        if (DUCK.dummy_duck)
486                trace_destroy_dead(DUCK.dummy_duck);
487        free(libtrace->format_data);
488        return 0; /* success */
489}
490#endif
491
492static int rtclient_fin_input(libtrace_t *libtrace) {
493        free(CONNINFO.rt.hostname);
494        close(INPUT.fd);
495        free(libtrace->format_data);
496        return 0;
497}
498
499static int erf_fin_input(libtrace_t *libtrace) {
500        if (INPUT.file)
501                libtrace_io_close(INPUT.file);
502        free(libtrace->format_data);
503        return 0;
504}
505
506static int erf_fin_output(libtrace_out_t *libtrace) {
507        libtrace_io_close(OUTPUT.file);
508        free(libtrace->format_data);
509        return 0;
510}
511 
512#ifdef HAVE_DAG
513#ifdef DAG_VERSION_2_4
514static int dag_get_duckinfo(libtrace_t *libtrace, 
515                                libtrace_packet_t *packet) {
516        dag_inf lt_dag_inf;
517       
518        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
519                        !packet->buffer) {
520                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
521                packet->buf_control = TRACE_CTRL_PACKET;
522                if (!packet->buffer) {
523                        trace_set_err(libtrace, errno,
524                                        "Cannot allocate packet buffer");
525                        return -1;
526                }
527        }
528       
529        packet->header = 0;
530        packet->payload = packet->buffer;
531       
532        if ((ioctl(INPUT.fd, DAG_IOINF, &lt_dag_inf) < 0)) {
533                trace_set_err(libtrace, errno,
534                                "Error using DAG_IOINF");
535                return -1;
536        }
537        if (!IsDUCK(&lt_dag_inf)) {
538                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
539                return 0;
540        }
541
542        if ((ioctl(INPUT.fd, DAG_IOGETDUCK, (duck_inf *)packet->payload) 
543                                < 0)) {
544                trace_set_err(libtrace, errno, "Error using DAG_IOGETDUCK");
545                return -1;
546        }
547
548        packet->type = RT_DUCK_2_4;
549        packet->size = sizeof(duck_inf);
550        if (!DUCK.dummy_duck) 
551                DUCK.dummy_duck = trace_create_dead("duck:dummy");
552        packet->trace = DUCK.dummy_duck;
553        return packet->size;
554}       
555#else
556static int dag_get_duckinfo(libtrace_t *libtrace, 
557                                libtrace_packet_t *packet) {
558        daginf_t lt_dag_inf;
559       
560        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
561                        !packet->buffer) {
562                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
563                packet->buf_control = TRACE_CTRL_PACKET;
564                if (!packet->buffer) {
565                        trace_set_err(libtrace, errno,
566                                        "Cannot allocate packet buffer");
567                        return -1;
568                }
569        }
570       
571        packet->header = 0;
572        packet->payload = packet->buffer;
573       
574        /* No need to check if we can get DUCK or not - we're modern
575         * enough */
576        if ((ioctl(INPUT.fd, DAGIOCDUCK, (duckinf_t *)packet->payload) 
577                                < 0)) {
578                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
579                return -1;
580        }
581
582        packet->type = RT_DUCK_2_5;
583        packet->size = sizeof(duckinf_t);
584        if (!DUCK.dummy_duck) 
585                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
586        packet->trace = DUCK.dummy_duck;       
587        return packet->size;
588}       
589#endif
590
591static int dag_read(libtrace_t *libtrace, int block_flag) {
592
593        if (DAG.diff != 0) 
594                return DAG.diff;
595
596        DAG.bottom = DAG.top;
597
598        DAG.top = dag_offset(
599                        INPUT.fd,
600                        &(DAG.bottom),
601                        block_flag);
602
603        DAG.diff = DAG.top - DAG.bottom;
604
605        DAG.offset = 0;
606        return DAG.diff;
607}
608
609/* FIXME: dag_read_packet shouldn't update the pointers, dag_fin_packet
610 * should do that.
611 */
612static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
613        int numbytes;
614        int size;
615        struct timeval tv;
616        dag_record_t *erfptr;
617
618        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 
619                        DUCK.duck_freq != 0) {
620                size = dag_get_duckinfo(libtrace, packet);
621                DUCK.last_duck = DUCK.last_pkt;
622                if (size != 0) {
623                        return size;
624                }
625                /* No DUCK support, so don't waste our time anymore */
626                DUCK.duck_freq = 0;
627        }
628       
629        if (packet->buf_control == TRACE_CTRL_PACKET) {
630                packet->buf_control = TRACE_CTRL_EXTERNAL;
631                free(packet->buffer);
632                packet->buffer = 0;
633        }
634       
635        packet->type = RT_DATA_ERF;
636       
637        if ((numbytes = dag_read(libtrace,0)) < 0) 
638                return numbytes;
639        assert(numbytes>0);
640
641        /*DAG always gives us whole packets */
642        erfptr = (dag_record_t *) ((char *)DAG.buf + 
643                        (DAG.bottom + DAG.offset));
644        size = ntohs(erfptr->rlen);
645
646        assert( size >= dag_record_size );
647        assert( size < LIBTRACE_PACKET_BUFSIZE);
648       
649        packet->buffer = erfptr;
650        packet->header = erfptr;
651        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
652                packet->payload = NULL;
653        } else {
654                packet->payload = (char*)packet->buffer
655                        + erf_get_framing_length(packet);
656        }
657
658        DAG.offset += size;
659        DAG.diff -= size;
660
661        if (packet->payload != NULL)
662                packet->size = size;
663        else 
664                packet->size = erf_get_framing_length(packet);
665       
666        tv = trace_get_timeval(packet);
667        DUCK.last_pkt = tv.tv_sec;
668       
669        return (packet->size);
670}
671
672static int dag_start_input(libtrace_t *libtrace) {
673#ifdef DAG_VERSION_2_4
674        if(dag_start(INPUT.fd) < 0) {
675                trace_set_err(libtrace,errno,"Cannot start DAG %s",
676                                libtrace->uridata);
677                return -1;
678        }
679#else
680        if (dag_attach_stream(INPUT.fd, DAG.dagstream, 0, 0) < 0) {
681                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
682                return -1;
683        }
684        if (dag_start_stream(INPUT.fd, DAG.dagstream) < 0) {
685                trace_set_err(libtrace, errno, "Cannot start DAG stream");
686                return -1;
687        }
688#endif
689        /* dags appear to have a bug where if you call dag_start after
690         * calling dag_stop, and at least one packet has arrived, bad things
691         * happen.  flush the memory hole
692         */
693        while(dag_read(libtrace,1)!=0)
694                DAG.diff=0;
695        return 0;
696}
697#endif
698
699static int erf_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
700        int numbytes;
701        int size;
702        void *buffer2 = packet->buffer;
703        int rlen;
704
705        if (!packet->buffer || packet->buf_control == TRACE_CTRL_EXTERNAL) {
706                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
707                packet->buf_control = TRACE_CTRL_PACKET;
708                if (!packet->buffer) {
709                        trace_set_err(libtrace, errno, 
710                                        "Cannot allocate memory");
711                        return -1;
712                }
713        }
714
715       
716       
717        packet->header = packet->buffer;
718        packet->type = RT_DATA_ERF;
719
720        if ((numbytes=libtrace_io_read(INPUT.file,
721                                        packet->buffer,
722                                        dag_record_size)) == -1) {
723                trace_set_err(libtrace,errno,"read(%s)",
724                                libtrace->uridata);
725                return -1;
726        }
727        if (numbytes == 0) {
728                return 0;
729        }
730
731        rlen = ntohs(((dag_record_t *)packet->buffer)->rlen);
732        buffer2 = (char*)packet->buffer + dag_record_size;
733        size = rlen - dag_record_size;
734
735        assert(size < LIBTRACE_PACKET_BUFSIZE && size >= dag_record_size);
736
737        /* Unknown/corrupt */
738        assert(((dag_record_t *)packet->buffer)->type < 10);
739       
740        /* read in the rest of the packet */
741        if ((numbytes=libtrace_io_read(INPUT.file,
742                                        buffer2,
743                                        size)) != size) {
744                if (numbytes==-1) {
745                        trace_set_err(libtrace,errno, "read(%s)", libtrace->uridata);
746                        return -1;
747                }
748                trace_set_err(libtrace,EIO,"Truncated packet (wanted %d, got %d)", size, numbytes);
749                /* Failed to read the full packet?  must be EOF */
750                return -1;
751        }
752        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
753                packet->payload = NULL;
754        } else {
755                packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
756        }
757        return rlen;
758}
759
760static int rtclient_read(libtrace_t *libtrace, void *buffer, size_t len) {
761        int numbytes;
762
763        while(1) {
764#ifndef MSG_NOSIGNAL
765#  define MSG_NOSIGNAL 0
766#endif
767                if ((numbytes = recv(INPUT.fd,
768                                                buffer,
769                                                len,
770                                                MSG_NOSIGNAL)) == -1) {
771                        if (errno == EINTR) {
772                                /*ignore EINTR in case
773                                 *a caller is using signals
774                                 */
775                                continue;
776                        }
777                        trace_set_err(libtrace,errno,"recv(%s)",
778                                        libtrace->uridata);
779                        return -1;
780                }
781                break;
782
783        }
784        return numbytes;
785}
786
787static int rtclient_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
788        int numbytes = 0;
789        char buf[RP_BUFSIZE];
790        int read_required = 0;
791       
792        void *buffer = 0;
793
794        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
795                packet->buf_control = TRACE_CTRL_PACKET;
796                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
797        }
798
799        buffer = packet->buffer;
800        packet->header = packet->buffer;
801       
802        packet->type = RT_DATA_ERF;
803
804       
805        do {
806                libtrace_packet_status_t status;
807                int size;
808                if (tracefifo_out_available(libtrace->fifo) == 0 
809                                || read_required) {
810                        if ((numbytes = rtclient_read(
811                                        libtrace,buf,RP_BUFSIZE))<=0) {
812                                return numbytes;
813                        }
814                        tracefifo_write(libtrace->fifo,buf,numbytes);
815                        read_required = 0;
816                }
817                /* Read status byte */
818                if (tracefifo_out_read(libtrace->fifo,
819                                &status, sizeof(uint32_t)) == 0) {
820                        read_required = 1;
821                        continue;
822                }
823                tracefifo_out_update(libtrace->fifo,sizeof(uint32_t));
824                /* Read in packet size */
825                if (tracefifo_out_read(libtrace->fifo,
826                                &size, sizeof(uint32_t)) == 0) {
827                        tracefifo_out_reset(libtrace->fifo);
828                        read_required = 1;
829                        continue;
830                }
831                tracefifo_out_update(libtrace->fifo, sizeof(uint32_t));
832               
833                if (status.type == 2 /* RT_MSG */) {
834                        /* Need to skip this packet as it is a message packet */
835                        tracefifo_out_update(libtrace->fifo, size);
836                        tracefifo_ack_update(libtrace->fifo, size + 
837                                        sizeof(uint32_t) + 
838                                        sizeof(libtrace_packet_status_t));
839                        continue;
840                }
841               
842                /* read in the full packet */
843                if ((numbytes = tracefifo_out_read(libtrace->fifo, 
844                                                buffer, size)) == 0) {
845                        tracefifo_out_reset(libtrace->fifo);
846                        read_required = 1;
847                        continue;
848                }
849
850                /* got in our whole packet, so... */
851                tracefifo_out_update(libtrace->fifo,size);
852
853                tracefifo_ack_update(libtrace->fifo,size + 
854                                sizeof(uint32_t) + 
855                                sizeof(libtrace_packet_status_t));
856
857                if (((dag_record_t *)buffer)->flags.rxerror == 1) {
858                        packet->payload = NULL;
859                } else {
860                        packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
861                }
862                return numbytes;
863        } while(1);
864}
865
866static int erf_dump_packet(libtrace_out_t *libtrace,
867                dag_record_t *erfptr, int pad, void *buffer) {
868        int numbytes = 0;
869        int size;
870
871        if ((numbytes = libtrace_io_write(OUTPUT.file, erfptr, dag_record_size + pad)) != dag_record_size+pad) {
872                trace_set_err_out(libtrace,errno,
873                                "write(%s)",libtrace->uridata);
874                return -1;
875        }
876
877        size=ntohs(erfptr->rlen)-(dag_record_size+pad);
878
879        numbytes=libtrace_io_write(OUTPUT.file, buffer, size);
880        if (numbytes != size) {
881                trace_set_err_out(libtrace,errno,
882                                "write(%s)",libtrace->uridata);
883                return -1;
884        }
885
886        return numbytes + pad + dag_record_size;
887}
888
889static int erf_start_output(libtrace_out_t *libtrace)
890{
891        OUTPUT.file = trace_open_file_out(libtrace,
892                        OPTIONS.erf.level,
893                        OPTIONS.erf.fileflag);
894        if (!OUTPUT.file) {
895                return -1;
896        }
897        return 0;
898}
899               
900static int erf_write_packet(libtrace_out_t *libtrace, 
901                const libtrace_packet_t *packet) 
902{
903        int numbytes = 0;
904        int pad = 0;
905        dag_record_t *dag_hdr = (dag_record_t *)packet->header;
906        void *payload = packet->payload;
907
908        assert(OUTPUT.file);
909
910        if (!packet->header) {
911                /*trace_set_err_output(libtrace, TRACE_ERR_BAD_PACKET,
912                                "Packet has no header - probably an RT packet");
913                */
914                return -1;
915        }
916       
917        pad = erf_get_padding(packet);
918
919        /* If we've had an rxerror, we have no payload to write - fix
920         * rlen to be the correct length
921         */
922        /* I Think this is bogus -- Perry */
923        if (payload == NULL) {
924                dag_hdr->rlen = htons(dag_record_size + pad);
925        } 
926       
927        if (packet->trace->format == &erf 
928#ifdef HAVE_DAG
929                        || packet->trace->format == &dag
930#endif
931                        ) {
932                numbytes = erf_dump_packet(libtrace,
933                                (dag_record_t *)packet->header,
934                                pad,
935                                payload
936                                );
937        } else {
938                dag_record_t erfhdr;
939                int type;
940                /* convert format - build up a new erf header */
941                /* Timestamp */
942                erfhdr.ts = trace_get_erf_timestamp(packet);
943                /* Keep trying to simplify the packet until we can find
944                 * something we can do with it */
945                do {
946                        type=libtrace_to_erf_type(trace_get_link_type(packet));
947                } while(type==(char)-1 && demote_packet((libtrace_packet_t *)packet));
948                /* We just don't support this link type sorry */
949                if (type==(char)-1) {
950                        trace_set_err_out(libtrace,
951                                        TRACE_ERR_NO_CONVERSION,
952                                        "No erf type for packet");
953                        return -1;
954                }
955                erfhdr.type = type;
956                /* Flags. Can't do this */
957                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
958                if (trace_get_direction(packet)!=-1)
959                        erfhdr.flags.iface = trace_get_direction(packet);
960
961                /* Packet length (rlen includes format overhead) */
962                assert(trace_get_capture_length(packet)>0 
963                                && trace_get_capture_length(packet)<=65536);
964                assert(erf_get_framing_length(packet)>0 
965                                && trace_get_framing_length(packet)<=65536);
966                assert(
967                        trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
968                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
969                erfhdr.rlen = htons(trace_get_capture_length(packet) 
970                        + erf_get_framing_length(packet));
971                /* loss counter. Can't do this */
972                erfhdr.lctr = 0;
973                /* Wire length, may contain the padding */
974                erfhdr.wlen = htons(trace_get_wire_length(packet)+pad);
975
976                /* Write it out */
977                numbytes = erf_dump_packet(libtrace,
978                                &erfhdr,
979                                pad,
980                                payload);
981        }
982        return numbytes;
983}
984
985static libtrace_linktype_t erf_get_link_type(const libtrace_packet_t *packet) {
986        dag_record_t *erfptr = 0;
987        erfptr = (dag_record_t *)packet->header;
988        return erf_type_to_libtrace(erfptr->type);
989}
990
991static libtrace_direction_t erf_get_direction(const libtrace_packet_t *packet) {
992        dag_record_t *erfptr = 0;
993        erfptr = (dag_record_t *)packet->header;
994        return erfptr->flags.iface;
995}
996
997static libtrace_direction_t erf_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
998        dag_record_t *erfptr = 0;
999        erfptr = (dag_record_t *)packet->header;
1000        erfptr->flags.iface = direction;
1001        return erfptr->flags.iface;
1002}
1003
1004static uint64_t erf_get_erf_timestamp(const libtrace_packet_t *packet) {
1005        dag_record_t *erfptr = 0;
1006        erfptr = (dag_record_t *)packet->header;
1007        return erfptr->ts;
1008}
1009
1010static int erf_get_capture_length(const libtrace_packet_t *packet) {
1011        dag_record_t *erfptr = 0;
1012        erfptr = (dag_record_t *)packet->header;
1013        return (ntohs(erfptr->rlen) - erf_get_framing_length(packet));
1014}
1015
1016static int erf_get_wire_length(const libtrace_packet_t *packet) {
1017        dag_record_t *erfptr = 0;
1018        erfptr = (dag_record_t *)packet->header;
1019        return ntohs(erfptr->wlen) - erf_get_padding(packet);
1020}
1021
1022static size_t erf_set_capture_length(libtrace_packet_t *packet, size_t size) {
1023        dag_record_t *erfptr = 0;
1024        assert(packet);
1025        if(size  > trace_get_capture_length(packet)) {
1026                /* can't make a packet larger */
1027                return trace_get_capture_length(packet);
1028        }
1029        erfptr = (dag_record_t *)packet->header;
1030        erfptr->rlen = htons(size + erf_get_framing_length(packet));
1031        return trace_get_capture_length(packet);
1032}
1033
1034static int rtclient_get_fd(const libtrace_t *libtrace) {
1035        return INPUT.fd;
1036}
1037
1038#ifdef HAVE_DAG
1039libtrace_eventobj_t trace_event_dag(libtrace_t *trace, libtrace_packet_t *packet) {
1040        libtrace_eventobj_t event = {0,0,0.0,0};
1041        int dag_fd;
1042        int data;
1043
1044        if (trace->format->get_fd) {
1045                dag_fd = trace->format->get_fd(trace);
1046        } else {
1047                dag_fd = 0;
1048        }
1049       
1050        data = dag_read(trace, DAGF_NONBLOCK);
1051
1052        if (data > 0) {
1053                event.size = trace_read_packet(trace,packet);
1054                event.type = TRACE_EVENT_PACKET;
1055                return event;
1056        }
1057        event.type = TRACE_EVENT_SLEEP;
1058        event.seconds = 0.0001;
1059        return event;
1060}
1061#endif
1062
1063#ifdef HAVE_DAG
1064static void dag_help() {
1065        printf("dag format module: $Revision$\n");
1066        printf("Supported input URIs:\n");
1067        printf("\tdag:/dev/dagn\n");
1068        printf("\n");
1069        printf("\te.g.: dag:/dev/dag0\n");
1070        printf("\n");
1071        printf("Supported output URIs:\n");
1072        printf("\tnone\n");
1073        printf("\n");
1074}
1075#endif
1076
1077static void erf_help() {
1078        printf("erf format module: $Revision$\n");
1079        printf("Supported input URIs:\n");
1080        printf("\terf:/path/to/file\t(uncompressed)\n");
1081        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1082        printf("\terf:-\t(stdin, either compressed or not)\n");
1083        printf("\terf:/path/to/socket\n");
1084        printf("\n");
1085        printf("\te.g.: erf:/tmp/trace\n");
1086        printf("\n");
1087        printf("Supported output URIs:\n");
1088        printf("\terf:path/to/file\t(uncompressed)\n");
1089        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1090        printf("\terf:-\t(stdout, either compressed or not)\n");
1091        printf("\n");
1092        printf("\te.g.: erf:/tmp/trace\n");
1093        printf("\n");
1094        printf("Supported output options:\n");
1095        printf("\t-z\tSpecify the gzip compression, ranging from 0 (uncompressed) to 9 - defaults to 1\n");
1096        printf("\n");
1097
1098       
1099}
1100
1101static void rtclient_help() {
1102        printf("rtclient format module: $Revision$\n");
1103        printf("DEPRECATED - use rt module instead\n");
1104        printf("Supported input URIs:\n");
1105        printf("\trtclient:host:port\n");
1106        printf("\n");
1107        printf("\te.g.:rtclient:localhost:3435\n");
1108        printf("\n");
1109        printf("Supported output URIs:\n");
1110        printf("\tnone\n");
1111        printf("\n");
1112}       
1113
1114static struct libtrace_format_t erf = {
1115        "erf",
1116        "$Id$",
1117        TRACE_FORMAT_ERF,
1118        erf_init_input,                 /* init_input */       
1119        NULL,                           /* config_input */
1120        erf_start_input,                /* start_input */
1121        NULL,                           /* pause_input */
1122        erf_init_output,                /* init_output */
1123        erf_config_output,              /* config_output */
1124        erf_start_output,               /* start_output */
1125        erf_fin_input,                  /* fin_input */
1126        erf_fin_output,                 /* fin_output */
1127        erf_read_packet,                /* read_packet */
1128        NULL,                           /* fin_packet */
1129        erf_write_packet,               /* write_packet */
1130        erf_get_link_type,              /* get_link_type */
1131        erf_get_direction,              /* get_direction */
1132        erf_set_direction,              /* set_direction */
1133        erf_get_erf_timestamp,          /* get_erf_timestamp */
1134        NULL,                           /* get_timeval */
1135        NULL,                           /* get_seconds */
1136        erf_seek_erf,                   /* seek_erf */
1137        NULL,                           /* seek_timeval */
1138        NULL,                           /* seek_seconds */
1139        erf_get_capture_length,         /* get_capture_length */
1140        erf_get_wire_length,            /* get_wire_length */
1141        erf_get_framing_length,         /* get_framing_length */
1142        erf_set_capture_length,         /* set_capture_length */
1143        NULL,                           /* get_fd */
1144        trace_event_trace,              /* trace_event */
1145        erf_help,                       /* help */
1146        NULL                            /* next pointer */
1147};
1148
1149#ifdef HAVE_DAG
1150static struct libtrace_format_t dag = {
1151        "dag",
1152        "$Id$",
1153        TRACE_FORMAT_ERF,
1154        dag_init_input,                 /* init_input */       
1155        dag_config_input,               /* config_input */
1156        dag_start_input,                /* start_input */
1157        dag_pause_input,                /* pause_input */
1158        NULL,                           /* init_output */
1159        NULL,                           /* config_output */
1160        NULL,                           /* start_output */
1161        dag_fin_input,                  /* fin_input */
1162        NULL,                           /* fin_output */
1163        dag_read_packet,                /* read_packet */
1164        NULL,                           /* fin_packet */
1165        NULL,                           /* write_packet */
1166        erf_get_link_type,              /* get_link_type */
1167        erf_get_direction,              /* get_direction */
1168        erf_set_direction,              /* set_direction */
1169        erf_get_erf_timestamp,          /* get_erf_timestamp */
1170        NULL,                           /* get_timeval */
1171        NULL,                           /* get_seconds */
1172        NULL,                           /* seek_erf */
1173        NULL,                           /* seek_timeval */
1174        NULL,                           /* seek_seconds */
1175        erf_get_capture_length,         /* get_capture_length */
1176        erf_get_wire_length,            /* get_wire_length */
1177        erf_get_framing_length,         /* get_framing_length */
1178        erf_set_capture_length,         /* set_capture_length */
1179        NULL,                           /* get_fd */
1180        trace_event_dag,                /* trace_event */
1181        dag_help,                       /* help */
1182        NULL                            /* next pointer */
1183};
1184#endif
1185
1186static struct libtrace_format_t rtclient = {
1187        "rtclient",
1188        "$Id$",
1189        TRACE_FORMAT_ERF,
1190        rtclient_init_input,            /* init_input */       
1191        NULL,                           /* config_input */
1192        rtclient_start_input,           /* start_input */
1193        rtclient_pause_input,           /* pause_input */
1194        NULL,                           /* init_output */
1195        NULL,                           /* config_output */
1196        NULL,                           /* start_output */
1197        rtclient_fin_input,             /* fin_input */
1198        NULL,                           /* fin_output */
1199        rtclient_read_packet,           /* read_packet */
1200        NULL,                           /* fin_packet */
1201        NULL,                           /* write_packet */
1202        erf_get_link_type,              /* get_link_type */
1203        erf_get_direction,              /* get_direction */
1204        erf_set_direction,              /* set_direction */
1205        erf_get_erf_timestamp,          /* get_erf_timestamp */
1206        NULL,                           /* get_timeval */
1207        NULL,                           /* get_seconds */
1208        NULL,                           /* seek_erf */
1209        NULL,                           /* seek_timeval */
1210        NULL,                           /* seek_seconds */
1211        erf_get_capture_length,         /* get_capture_length */
1212        erf_get_wire_length,            /* get_wire_length */
1213        erf_get_framing_length,         /* get_framing_length */
1214        erf_set_capture_length,         /* set_capture_length */
1215        rtclient_get_fd,                /* get_fd */
1216        trace_event_device,             /* trace_event */
1217        rtclient_help,                  /* help */
1218        NULL                            /* next pointer */
1219};
1220
1221void CONSTRUCTOR erf_constructor() {
1222        register_format(&rtclient);
1223        register_format(&erf);
1224#ifdef HAVE_DAG
1225        register_format(&dag);
1226#endif
1227}
Note: See TracBrowser for help on using the repository browser.