source: lib/format_erf.c @ 5b4404a

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5b4404a was 5b4404a, checked in by Perry Lorier <perry@…>, 15 years ago

Use binary mode for windows
At EOF return EOF not a bogus error.

  • Property mode set to 100644
File size: 28.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30#define _GNU_SOURCE
31
32#include "config.h"
33#include "common.h"
34#include "libtrace.h"
35#include "libtrace_int.h"
36#include "format_helper.h"
37#include "parse_cmd.h"
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45
46#if HAVE_DAG
47#include <sys/mman.h>
48#endif
49
50#ifdef WIN32
51#  include <io.h>
52#  include <share.h>
53#  define PATH_MAX _MAX_PATH
54#  define snprintf sprintf_s
55#else
56#  include <netdb.h>
57#  ifndef PATH_MAX
58#       define PATH_MAX 4096
59#  endif
60#  include <sys/ioctl.h>
61#endif
62
63
64#define COLLECTOR_PORT 3435
65
66static struct libtrace_format_t erf;
67static struct libtrace_format_t rtclient;
68#if HAVE_DAG
69static struct libtrace_format_t dag;
70#endif
71
72#define DATA(x) ((struct erf_format_data_t *)x->format_data)
73#define DATAOUT(x) ((struct erf_format_data_out_t *)x->format_data)
74
75#define CONNINFO DATA(libtrace)->conn_info
76#define INPUT DATA(libtrace)->input
77#define OUTPUT DATAOUT(libtrace)->output
78#if HAVE_DAG
79#define DAG DATA(libtrace)->dag
80#define DUCK DATA(libtrace)->duck
81#endif
82#define OPTIONS DATAOUT(libtrace)->options
83struct erf_format_data_t {
84        union {
85                struct {
86                        char *hostname;
87                        short port;
88                } rt;
89        } conn_info;
90       
91        union {
92                int fd;
93                libtrace_io_t *file;
94        } input;
95
96        struct {
97                enum { INDEX_UNKNOWN=0, INDEX_NONE, INDEX_EXISTS } exists;
98                libtrace_io_t *index;
99                off_t index_len;
100        } seek;
101
102#if HAVE_DAG
103        struct {
104                uint32_t last_duck;     
105                uint32_t duck_freq;
106                uint32_t last_pkt;
107                libtrace_t *dummy_duck;
108        } duck;
109       
110        struct {
111                void *buf; 
112                unsigned bottom;
113                unsigned top;
114                unsigned diff;
115                unsigned curr;
116                unsigned offset;
117        } dag;
118#endif
119};
120
121struct erf_format_data_out_t {
122        union {
123                struct {
124                        char *hostname;
125                        short port;
126                } rt;
127                char *path;
128        } conn_info;
129
130        union {
131                struct {
132                        int level;
133                        int fileflag;
134                } erf;
135               
136        } options;
137       
138        union {
139                int fd;
140                struct rtserver_t * rtserver;
141                libtrace_io_t *file;
142        } output;
143};
144
145/** Structure holding status information for a packet */
146typedef struct libtrace_packet_status {
147        uint8_t type;
148        uint8_t reserved;
149        uint16_t message;
150} libtrace_packet_status_t;
151
152typedef struct erf_index_t {
153        uint64_t timestamp;
154        uint64_t offset; 
155} erf_index_t;
156
157#ifdef HAVE_DAG
158static int dag_init_input(libtrace_t *libtrace) {
159        struct stat buf;
160
161        libtrace->format_data = (struct erf_format_data_t *)
162                malloc(sizeof(struct erf_format_data_t));
163        if (stat(libtrace->uridata, &buf) == -1) {
164                trace_set_err(libtrace,errno,"stat(%s)",libtrace->uridata);
165                return -1;
166        } 
167        if (S_ISCHR(buf.st_mode)) {
168                /* DEVICE */
169                if((INPUT.fd = dag_open(libtrace->uridata)) < 0) {
170                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
171                                        libtrace->uridata);
172                        return -1;
173                }
174                if((DAG.buf = (void *)dag_mmap(INPUT.fd)) == MAP_FAILED) {
175                        trace_set_err(libtrace,errno,"Cannot mmap DAG %s",
176                                        libtrace->uridata);
177                        return -1;
178                }
179        } else {
180                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
181                                libtrace->uridata);
182                return -1;
183        }
184
185        DUCK.last_duck = 0;
186        DUCK.duck_freq = 0; 
187        DUCK.last_pkt = 0;
188        DUCK.dummy_duck = NULL;
189       
190        return 0;
191}
192
193static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
194                                void *data) {
195        switch(option) {
196                case TRACE_META_FREQ:
197                        DUCK.duck_freq = *(int *)data;
198                        return 0;
199                case TRACE_OPTION_SNAPLEN:
200                        /* Surely we can set this?? Fall through for now*/
201               
202                case TRACE_OPTION_PROMISC:
203                        /* DAG already operates in a promisc fashion */
204
205                case TRACE_OPTION_FILTER:
206
207                default:
208                        trace_set_err(libtrace, TRACE_ERR_UNKNOWN_OPTION,
209                                        "Unknown or unsupported option: %i",
210                                        option);
211                        return -1;
212        }
213        assert (0);
214}
215#endif
216
217/* Dag erf ether packets have a 2 byte padding before the packet
218 * so that the ip header is aligned on a 32 bit boundary.
219 */
220static int erf_get_padding(const libtrace_packet_t *packet)
221{
222        dag_record_t *erfptr = (dag_record_t *)packet->header;
223        switch(erfptr->type) {
224                case TYPE_ETH:          return 2;
225                default:                return 0;
226        }
227}
228
229static int erf_get_framing_length(const libtrace_packet_t *packet)
230{
231        return dag_record_size + erf_get_padding(packet);
232}
233
234
235static int erf_init_input(libtrace_t *libtrace) 
236{
237        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
238       
239        INPUT.file = 0;
240
241        return 0; /* success */
242}
243
244static int erf_start_input(libtrace_t *libtrace)
245{
246        if (INPUT.file)
247                return 0; /* success */
248
249        INPUT.file = trace_open_file(libtrace);
250
251        if (!INPUT.file)
252                return -1;
253
254        return 0; /* success */
255}
256
257/* Binary search through the index to find the closest point before
258 * the packet.  Consider in future having a btree index perhaps?
259 */
260static int erf_fast_seek_start(libtrace_t *libtrace,uint64_t erfts)
261{
262        size_t max_off = DATA(libtrace)->seek.index_len/sizeof(erf_index_t);
263        size_t min_off = 0;
264        off_t current;
265        erf_index_t record;
266        do {
267                current=(max_off+min_off)>>2;
268
269                libtrace_io_seek(DATA(libtrace)->seek.index,
270                                current*sizeof(record),
271                                SEEK_SET);
272                libtrace_io_read(DATA(libtrace)->seek.index,
273                                &record,sizeof(record));
274                if (record.timestamp < erfts) {
275                        min_off=current;
276                }
277                if (record.timestamp > erfts) {
278                        max_off=current;
279                }
280                if (record.timestamp == erfts)
281                        break;
282        } while(min_off<max_off);
283
284        /* If we've passed it, seek backwards.  This loop shouldn't
285         * execute more than twice.
286         */
287        do {
288                libtrace_io_seek(DATA(libtrace)->seek.index,
289                                current*sizeof(record),SEEK_SET);
290                libtrace_io_read(DATA(libtrace)->seek.index,
291                                &record,sizeof(record));
292                current--;
293        } while(record.timestamp>erfts);
294
295        /* We've found our location in the trace, now use it. */
296        libtrace_io_seek(INPUT.file,record.offset,SEEK_SET);
297
298        return 0; /* success */
299}
300
301/* There is no index.  Seek through the entire trace from the start, nice
302 * and slowly.
303 */
304static int erf_slow_seek_start(libtrace_t *libtrace,uint64_t erfts)
305{
306        if (INPUT.file) {
307                libtrace_io_close(INPUT.file);
308        }
309        INPUT.file = trace_open_file(libtrace);
310        if (!INPUT.file)
311                return -1;
312        return 0;
313}
314
315static int erf_seek_erf(libtrace_t *libtrace,uint64_t erfts)
316{
317        libtrace_packet_t *packet;
318        off_t off = 0;
319
320        if (DATA(libtrace)->seek.exists==INDEX_UNKNOWN) {
321                char buffer[PATH_MAX];
322                snprintf(buffer,sizeof(buffer),"%s.idx",libtrace->uridata);
323                DATA(libtrace)->seek.index=libtrace_io_open(buffer,"rb");
324                if (DATA(libtrace)->seek.index) {
325                        DATA(libtrace)->seek.exists=INDEX_EXISTS;
326                }
327                else {
328                        DATA(libtrace)->seek.exists=INDEX_NONE;
329                }
330        }
331
332        /* If theres an index, use it to find the nearest packet that isn't
333         * after the time we're looking for.  If there is no index we need
334         * to seek slowly through the trace from the beginning.  Sigh.
335         */
336        switch(DATA(libtrace)->seek.exists) {
337                case INDEX_EXISTS:
338                        erf_fast_seek_start(libtrace,erfts);
339                        break;
340                case INDEX_NONE:
341                        erf_slow_seek_start(libtrace,erfts);
342                        break;
343                case INDEX_UNKNOWN:
344                        assert(0);
345                        break;
346        }
347
348        /* Now seek forward looking for the correct timestamp */
349        packet=trace_create_packet();
350        do {
351                trace_read_packet(libtrace,packet);
352                if (trace_get_erf_timestamp(packet)==erfts)
353                        break;
354                off=libtrace_io_tell(INPUT.file);
355        } while(trace_get_erf_timestamp(packet)<erfts);
356
357        libtrace_io_seek(INPUT.file,off,SEEK_SET);
358
359        return 0;
360}
361
362static int rtclient_init_input(libtrace_t *libtrace) {
363        char *scan;
364        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
365
366        if (strlen(libtrace->uridata) == 0) {
367                CONNINFO.rt.hostname = 
368                        strdup("localhost");
369                CONNINFO.rt.port = 
370                        COLLECTOR_PORT;
371        } else {
372                if ((scan = strchr(libtrace->uridata,':')) == NULL) {
373                        CONNINFO.rt.hostname = 
374                                strdup(libtrace->uridata);
375                        CONNINFO.rt.port =
376                                COLLECTOR_PORT;
377                } else {
378                        CONNINFO.rt.hostname = 
379                                (char *)strndup(libtrace->uridata,
380                                                (scan - libtrace->uridata));
381                        CONNINFO.rt.port = 
382                                atoi(++scan);
383                }
384        }
385
386        return 0; /* success */
387}
388
389static int rtclient_start_input(libtrace_t *libtrace)
390{
391        struct hostent *he;
392        struct sockaddr_in remote;
393        if ((he=gethostbyname(CONNINFO.rt.hostname)) == NULL) { 
394                trace_set_err(libtrace,TRACE_ERR_INIT_FAILED,"failed to resolve %s",
395                                CONNINFO.rt.hostname);
396                return -1;
397        } 
398        if ((INPUT.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
399                trace_set_err(libtrace,errno,"socket(AF_INET,SOCK_STREAM)");
400                return -1;
401        }
402
403        remote.sin_family = AF_INET;   
404        remote.sin_port = htons(CONNINFO.rt.port);
405        remote.sin_addr = *((struct in_addr *)he->h_addr);
406        memset(&(remote.sin_zero), 0, 8);
407
408        if (connect(INPUT.fd, (struct sockaddr *)&remote,
409                                sizeof(struct sockaddr)) == -1) {
410                trace_set_err(libtrace,errno,"connect(%s)",
411                                CONNINFO.rt.hostname);
412                return -1;
413        }
414        return 0; /* success */
415}
416
417static int rtclient_pause_input(libtrace_t *libtrace)
418{
419        close(INPUT.fd);
420        return 0; /* success */
421}
422
423static int erf_init_output(libtrace_out_t *libtrace) {
424        libtrace->format_data = malloc(sizeof(struct erf_format_data_out_t));
425
426        OPTIONS.erf.level = 0;
427        OPTIONS.erf.fileflag = O_CREAT | O_WRONLY;
428        OUTPUT.file = 0;
429
430        return 0;
431}
432
433static int erf_config_output(libtrace_out_t *libtrace, trace_option_output_t option,
434                void *value) {
435
436        switch (option) {
437                case TRACE_OPTION_OUTPUT_COMPRESS:
438                        OPTIONS.erf.level = *(int*)value;
439                        return 0;
440                case TRACE_OPTION_OUTPUT_FILEFLAGS:
441                        OPTIONS.erf.fileflag = *(int*)value;
442                        return 0;
443                default:
444                        /* Unknown option */
445                        trace_set_err_out(libtrace,TRACE_ERR_UNKNOWN_OPTION,
446                                        "Unknown option");
447                        return -1;
448        }
449}
450
451
452#ifdef HAVE_DAG
453static int dag_pause_input(libtrace_t *libtrace) {
454        dag_stop(INPUT.fd);
455        return 0; /* success */
456}
457
458static int dag_fin_input(libtrace_t *libtrace) {
459        /* dag pause input implicitly called to cleanup before this */
460        dag_close(INPUT.fd);
461        if (DUCK.dummy_duck)
462                trace_destroy_dead(DUCK.dummy_duck);
463        free(libtrace->format_data);
464        return 0; /* success */
465}
466#endif
467
468static int rtclient_fin_input(libtrace_t *libtrace) {
469        free(CONNINFO.rt.hostname);
470        close(INPUT.fd);
471        free(libtrace->format_data);
472        return 0;
473}
474
475static int erf_fin_input(libtrace_t *libtrace) {
476        libtrace_io_close(INPUT.file);
477        free(libtrace->format_data);
478        return 0;
479}
480
481static int erf_fin_output(libtrace_out_t *libtrace) {
482        libtrace_io_close(OUTPUT.file);
483        free(libtrace->format_data);
484        return 0;
485}
486 
487#if HAVE_DAG
488#if DAG_VERSION_2_4
489static int dag_get_duckinfo(libtrace_t *libtrace, 
490                                libtrace_packet_t *packet) {
491        dag_inf lt_dag_inf;
492       
493        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
494                        !packet->buffer) {
495                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
496                packet->buf_control = TRACE_CTRL_PACKET;
497                if (!packet->buffer) {
498                        trace_set_err(libtrace, errno,
499                                        "Cannot allocate packet buffer");
500                        return -1;
501                }
502        }
503       
504        packet->header = 0;
505        packet->payload = packet->buffer;
506       
507        if ((ioctl(INPUT.fd, DAG_IOINF, &lt_dag_inf) < 0)) {
508                trace_set_err(libtrace, errno,
509                                "Error using DAG_IOINF");
510                return -1;
511        }
512        if (!IsDUCK(&lt_dag_inf)) {
513                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
514                return 0;
515        }
516
517        if ((ioctl(INPUT.fd, DAG_IOGETDUCK, (duck_inf *)packet->payload) 
518                                < 0)) {
519                trace_set_err(libtrace, errno, "Error using DAG_IOGETDUCK");
520                return -1;
521        }
522
523        packet->type = RT_DUCK_2_4;
524        packet->size = sizeof(duck_inf);
525        if (!DUCK.dummy_duck) 
526                DUCK.dummy_duck = trace_create_dead("duck:dummy");
527        packet->trace = DUCK.dummy_duck;
528        return packet->size;
529}       
530#else
531static int dag_get_duckinfo(libtrace_t *libtrace, 
532                                libtrace_packet_t *packet) {
533        daginf_t lt_dag_inf;
534       
535        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
536                        !packet->buffer) {
537                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
538                packet->buf_control = TRACE_CTRL_PACKET;
539                if (!packet->buffer) {
540                        trace_set_err(libtrace, errno,
541                                        "Cannot allocate packet buffer");
542                        return -1;
543                }
544        }
545       
546        packet->header = 0;
547        packet->payload = packet->buffer;
548       
549        /* No need to check if we can get DUCK or not - we're modern
550         * enough */
551        if ((ioctl(INPUT.fd, DAGIOCDUCK, (duckinf_t *)packet->payload) 
552                                < 0)) {
553                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
554                return -1;
555        }
556
557        packet->type = RT_DUCK_2_5;
558        packet->size = sizeof(duckinf_t);
559        if (!DUCK.dummy_duck) 
560                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
561        packet->trace = DUCK.dummy_duck;       
562        return packet->size;
563}       
564#endif
565
566static int dag_read(libtrace_t *libtrace, int block_flag) {
567
568        if (DAG.diff != 0) 
569                return DAG.diff;
570
571        DAG.bottom = DAG.top;
572
573        DAG.top = dag_offset(
574                        INPUT.fd,
575                        &(DAG.bottom),
576                        block_flag);
577
578        DAG.diff = DAG.top - DAG.bottom;
579
580        DAG.offset = 0;
581        return DAG.diff;
582}
583
584/* FIXME: dag_read_packet shouldn't update the pointers, dag_fin_packet
585 * should do that.
586 */
587static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
588        int numbytes;
589        int size;
590        struct timeval tv;
591        dag_record_t *erfptr;
592
593        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 
594                        DUCK.duck_freq != 0) {
595                size = dag_get_duckinfo(libtrace, packet);
596                DUCK.last_duck = DUCK.last_pkt;
597                if (size != 0) {
598                        return size;
599                }
600                /* No DUCK support, so don't waste our time anymore */
601                DUCK.duck_freq = 0;
602        }
603       
604        if (packet->buf_control == TRACE_CTRL_PACKET) {
605                packet->buf_control = TRACE_CTRL_EXTERNAL;
606                free(packet->buffer);
607                packet->buffer = 0;
608        }
609       
610        packet->type = RT_DATA_ERF;
611       
612        if ((numbytes = dag_read(libtrace,0)) < 0) 
613                return numbytes;
614        assert(numbytes>0);
615
616        /*DAG always gives us whole packets */
617        erfptr = (dag_record_t *) ((char *)DAG.buf + 
618                        (DAG.bottom + DAG.offset));
619        size = ntohs(erfptr->rlen);
620
621        assert( size >= dag_record_size );
622        assert( size < LIBTRACE_PACKET_BUFSIZE);
623       
624        packet->buffer = erfptr;
625        packet->header = erfptr;
626        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
627                packet->payload = NULL;
628        } else {
629                packet->payload = (char*)packet->buffer
630                        + erf_get_framing_length(packet);
631        }
632
633        DAG.offset += size;
634        DAG.diff -= size;
635
636        packet->size = size;
637        tv = trace_get_timeval(packet);
638        DUCK.last_pkt = tv.tv_sec;
639       
640        return (size);
641}
642
643static int dag_start_input(libtrace_t *libtrace) {
644        if(dag_start(INPUT.fd) < 0) {
645                trace_set_err(libtrace,errno,"Cannot start DAG %s",
646                                libtrace->uridata);
647                return -1;
648        }
649        /* dags appear to have a bug where if you call dag_start after
650         * calling dag_stop, and at least one packet has arrived, bad things
651         * happen.  flush the memory hole
652         */
653        while(dag_read(libtrace,1)!=0)
654                DAG.diff=0;
655        return 0;
656}
657#endif
658
659static int erf_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
660        int numbytes;
661        int size;
662        void *buffer2 = packet->buffer;
663        int rlen;
664
665        if (!packet->buffer || packet->buf_control == TRACE_CTRL_EXTERNAL) {
666                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
667                packet->buf_control = TRACE_CTRL_PACKET;
668                if (!packet->buffer) {
669                        trace_set_err(libtrace, errno, 
670                                        "Cannot allocate memory");
671                        return -1;
672                }
673        }
674
675       
676       
677        packet->header = packet->buffer;
678        packet->type = RT_DATA_ERF;
679
680        if ((numbytes=libtrace_io_read(INPUT.file,
681                                        packet->buffer,
682                                        dag_record_size)) == -1) {
683                trace_set_err(libtrace,errno,"read(%s)",
684                                libtrace->uridata);
685                return -1;
686        }
687        if (numbytes == 0) {
688                return 0;
689        }
690
691        rlen = ntohs(((dag_record_t *)packet->buffer)->rlen);
692        buffer2 = (char*)packet->buffer + dag_record_size;
693        size = rlen - dag_record_size;
694
695        assert(size < LIBTRACE_PACKET_BUFSIZE && size >= dag_record_size);
696
697        /* Unknown/corrupt */
698        assert(((dag_record_t *)packet->buffer)->type < 10);
699       
700        /* read in the rest of the packet */
701        if ((numbytes=libtrace_io_read(INPUT.file,
702                                        buffer2,
703                                        size)) != size) {
704                if (numbytes==-1) {
705                        trace_set_err(libtrace,errno, "read(%s)", libtrace->uridata);
706                        return -1;
707                }
708                /* Failed to read the full packet?  must be EOF */
709                return 0;
710        }
711        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
712                packet->payload = NULL;
713        } else {
714                packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
715        }
716        return rlen;
717}
718
719static int rtclient_read(libtrace_t *libtrace, void *buffer, size_t len) {
720        int numbytes;
721
722        while(1) {
723#ifndef MSG_NOSIGNAL
724#  define MSG_NOSIGNAL 0
725#endif
726                if ((numbytes = recv(INPUT.fd,
727                                                buffer,
728                                                len,
729                                                MSG_NOSIGNAL)) == -1) {
730                        if (errno == EINTR) {
731                                /*ignore EINTR in case
732                                 *a caller is using signals
733                                 */
734                                continue;
735                        }
736                        trace_set_err(libtrace,errno,"recv(%s)",
737                                        libtrace->uridata);
738                        return -1;
739                }
740                break;
741
742        }
743        return numbytes;
744}
745
746static int rtclient_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
747        int numbytes = 0;
748        char buf[RP_BUFSIZE];
749        int read_required = 0;
750       
751        void *buffer = 0;
752
753        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
754                packet->buf_control = TRACE_CTRL_PACKET;
755                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
756        }
757
758        buffer = packet->buffer;
759        packet->header = packet->buffer;
760       
761        packet->type = RT_DATA_ERF;
762
763       
764        do {
765                libtrace_packet_status_t status;
766                int size;
767                if (tracefifo_out_available(libtrace->fifo) == 0 
768                                || read_required) {
769                        if ((numbytes = rtclient_read(
770                                        libtrace,buf,RP_BUFSIZE))<=0) {
771                                return numbytes;
772                        }
773                        tracefifo_write(libtrace->fifo,buf,numbytes);
774                        read_required = 0;
775                }
776                /* Read status byte */
777                if (tracefifo_out_read(libtrace->fifo,
778                                &status, sizeof(uint32_t)) == 0) {
779                        read_required = 1;
780                        continue;
781                }
782                tracefifo_out_update(libtrace->fifo,sizeof(uint32_t));
783                /* Read in packet size */
784                if (tracefifo_out_read(libtrace->fifo,
785                                &size, sizeof(uint32_t)) == 0) {
786                        tracefifo_out_reset(libtrace->fifo);
787                        read_required = 1;
788                        continue;
789                }
790                tracefifo_out_update(libtrace->fifo, sizeof(uint32_t));
791               
792                if (status.type == 2 /* RT_MSG */) {
793                        /* Need to skip this packet as it is a message packet */
794                        tracefifo_out_update(libtrace->fifo, size);
795                        tracefifo_ack_update(libtrace->fifo, size + 
796                                        sizeof(uint32_t) + 
797                                        sizeof(libtrace_packet_status_t));
798                        continue;
799                }
800               
801                /* read in the full packet */
802                if ((numbytes = tracefifo_out_read(libtrace->fifo, 
803                                                buffer, size)) == 0) {
804                        tracefifo_out_reset(libtrace->fifo);
805                        read_required = 1;
806                        continue;
807                }
808
809                /* got in our whole packet, so... */
810                tracefifo_out_update(libtrace->fifo,size);
811
812                tracefifo_ack_update(libtrace->fifo,size + 
813                                sizeof(uint32_t) + 
814                                sizeof(libtrace_packet_status_t));
815
816                if (((dag_record_t *)buffer)->flags.rxerror == 1) {
817                        packet->payload = NULL;
818                } else {
819                        packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
820                }
821                return numbytes;
822        } while(1);
823}
824
825static int erf_dump_packet(libtrace_out_t *libtrace,
826                dag_record_t *erfptr, int pad, void *buffer, size_t size) {
827        int numbytes = 0;
828        assert(size<=65536);
829
830        if ((numbytes = libtrace_io_write(OUTPUT.file, erfptr, dag_record_size + pad)) != dag_record_size+pad) {
831                trace_set_err_out(libtrace,errno,
832                                "write(%s)",libtrace->uridata);
833                return -1;
834        }
835
836        if ((numbytes=libtrace_io_write(OUTPUT.file, buffer, size)) != (int)size) {
837                trace_set_err_out(libtrace,errno,
838                                "write(%s)",libtrace->uridata);
839                return -1;
840        }
841
842        return numbytes + pad + dag_record_size;
843}
844
845static int erf_start_output(libtrace_out_t *libtrace)
846{
847        OUTPUT.file = trace_open_file_out(libtrace,
848                        OPTIONS.erf.level,
849                        OPTIONS.erf.fileflag);
850        if (!OUTPUT.file) {
851                return -1;
852        }
853        return 0;
854}
855               
856static int erf_write_packet(libtrace_out_t *libtrace, 
857                const libtrace_packet_t *packet) 
858{
859        int numbytes = 0;
860        int pad = 0;
861        dag_record_t *dag_hdr = (dag_record_t *)packet->header;
862        void *payload = packet->payload;
863
864        assert(OUTPUT.file);
865
866        if (!packet->header) {
867                //trace_set_err_output(libtrace, TRACE_ERR_BAD_PACKET,
868                //              "Packet has no header - probably an RT packet");
869                return -1;
870        }
871       
872        pad = erf_get_padding(packet);
873
874        /* If we've had an rxerror, we have no payload to write - fix rlen to
875         * be the correct length */
876        if (payload == NULL) {
877                dag_hdr->rlen = htons(dag_record_size + pad);
878        } 
879       
880        if (packet->trace->format == &erf 
881#if HAVE_DAG
882                        || packet->trace->format == &dag
883#endif
884                        ) {
885                numbytes = erf_dump_packet(libtrace,
886                                (dag_record_t *)packet->header,
887                                pad,
888                                payload,
889                                trace_get_capture_length(packet)
890                                );
891        } else {
892                dag_record_t erfhdr;
893                int type;
894                /* convert format - build up a new erf header */
895                /* Timestamp */
896                erfhdr.ts = trace_get_erf_timestamp(packet);
897                type=libtrace_to_erf_type(trace_get_link_type(packet));
898                if (type==(char)-1) {
899                        trace_set_err_out(libtrace,TRACE_ERR_NO_CONVERSION,
900                                        "No erf type for packet");
901                        return -1;
902                }
903                erfhdr.type = type;
904                /* Flags. Can't do this */
905                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
906                /* Packet length (rlen includes format overhead) */
907                erfhdr.rlen = trace_get_capture_length(packet) 
908                        + erf_get_framing_length(packet);
909                /* loss counter. Can't do this */
910                erfhdr.lctr = 0;
911                /* Wire length */
912                erfhdr.wlen = trace_get_wire_length(packet);
913
914                /* Write it out */
915                numbytes = erf_dump_packet(libtrace,
916                                &erfhdr,
917                                pad,
918                                payload,
919                                trace_get_capture_length(packet));
920        }
921        return numbytes;
922}
923
924static libtrace_linktype_t erf_get_link_type(const libtrace_packet_t *packet) {
925        dag_record_t *erfptr = 0;
926        erfptr = (dag_record_t *)packet->header;
927        return erf_type_to_libtrace(erfptr->type);
928}
929
930static libtrace_direction_t erf_get_direction(const libtrace_packet_t *packet) {
931        dag_record_t *erfptr = 0;
932        erfptr = (dag_record_t *)packet->header;
933        return erfptr->flags.iface;
934}
935
936static libtrace_direction_t erf_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
937        dag_record_t *erfptr = 0;
938        erfptr = (dag_record_t *)packet->header;
939        erfptr->flags.iface = direction;
940        return erfptr->flags.iface;
941}
942
943static uint64_t erf_get_erf_timestamp(const libtrace_packet_t *packet) {
944        dag_record_t *erfptr = 0;
945        erfptr = (dag_record_t *)packet->header;
946        return erfptr->ts;
947}
948
949static int erf_get_capture_length(const libtrace_packet_t *packet) {
950        dag_record_t *erfptr = 0;
951        erfptr = (dag_record_t *)packet->header;
952        return (ntohs(erfptr->rlen) - erf_get_framing_length(packet));
953}
954
955static int erf_get_wire_length(const libtrace_packet_t *packet) {
956        dag_record_t *erfptr = 0;
957        erfptr = (dag_record_t *)packet->header;
958        return ntohs(erfptr->wlen);
959}
960
961static size_t erf_set_capture_length(libtrace_packet_t *packet, size_t size) {
962        dag_record_t *erfptr = 0;
963        assert(packet);
964        if(size  > trace_get_capture_length(packet)) {
965                /* can't make a packet larger */
966                return trace_get_capture_length(packet);
967        }
968        erfptr = (dag_record_t *)packet->header;
969        erfptr->rlen = htons(size + erf_get_framing_length(packet));
970        return trace_get_capture_length(packet);
971}
972
973static int rtclient_get_fd(const libtrace_t *libtrace) {
974        return INPUT.fd;
975}
976
977#ifdef HAVE_DAG
978libtrace_eventobj_t trace_event_dag(libtrace_t *trace, libtrace_packet_t *packet) {
979        libtrace_eventobj_t event = {0,0,0.0,0};
980        int dag_fd;
981        int data;
982
983        if (trace->format->get_fd) {
984                dag_fd = trace->format->get_fd(trace);
985        } else {
986                dag_fd = 0;
987        }
988       
989        data = dag_read(trace, DAGF_NONBLOCK);
990
991        if (data > 0) {
992                event.size = trace_read_packet(trace,packet);
993                event.type = TRACE_EVENT_PACKET;
994                return event;
995        }
996        event.type = TRACE_EVENT_SLEEP;
997        event.seconds = 0.0001;
998        return event;
999}
1000#endif
1001
1002#if HAVE_DAG
1003static void dag_help() {
1004        printf("dag format module: $Revision$\n");
1005        printf("Supported input URIs:\n");
1006        printf("\tdag:/dev/dagn\n");
1007        printf("\n");
1008        printf("\te.g.: dag:/dev/dag0\n");
1009        printf("\n");
1010        printf("Supported output URIs:\n");
1011        printf("\tnone\n");
1012        printf("\n");
1013}
1014#endif
1015
1016static void erf_help() {
1017        printf("erf format module: $Revision$\n");
1018        printf("Supported input URIs:\n");
1019        printf("\terf:/path/to/file\t(uncompressed)\n");
1020        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1021        printf("\terf:-\t(stdin, either compressed or not)\n");
1022        printf("\terf:/path/to/socket\n");
1023        printf("\n");
1024        printf("\te.g.: erf:/tmp/trace\n");
1025        printf("\n");
1026        printf("Supported output URIs:\n");
1027        printf("\terf:path/to/file\t(uncompressed)\n");
1028        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1029        printf("\terf:-\t(stdout, either compressed or not)\n");
1030        printf("\n");
1031        printf("\te.g.: erf:/tmp/trace\n");
1032        printf("\n");
1033        printf("Supported output options:\n");
1034        printf("\t-z\tSpecify the gzip compression, ranging from 0 (uncompressed) to 9 - defaults to 1\n");
1035        printf("\n");
1036
1037       
1038}
1039
1040static void rtclient_help() {
1041        printf("rtclient format module: $Revision$\n");
1042        printf("DEPRECATED - use rt module instead\n");
1043        printf("Supported input URIs:\n");
1044        printf("\trtclient:host:port\n");
1045        printf("\n");
1046        printf("\te.g.:rtclient:localhost:3435\n");
1047        printf("\n");
1048        printf("Supported output URIs:\n");
1049        printf("\tnone\n");
1050        printf("\n");
1051}       
1052
1053static struct libtrace_format_t erf = {
1054        "erf",
1055        "$Id$",
1056        TRACE_FORMAT_ERF,
1057        erf_init_input,                 /* init_input */       
1058        NULL,                           /* config_input */
1059        erf_start_input,                /* start_input */
1060        NULL,                           /* pause_input */
1061        erf_init_output,                /* init_output */
1062        erf_config_output,              /* config_output */
1063        erf_start_output,               /* start_output */
1064        erf_fin_input,                  /* fin_input */
1065        erf_fin_output,                 /* fin_output */
1066        erf_read_packet,                /* read_packet */
1067        NULL,                           /* fin_packet */
1068        erf_write_packet,               /* write_packet */
1069        erf_get_link_type,              /* get_link_type */
1070        erf_get_direction,              /* get_direction */
1071        erf_set_direction,              /* set_direction */
1072        erf_get_erf_timestamp,          /* get_erf_timestamp */
1073        NULL,                           /* get_timeval */
1074        NULL,                           /* get_seconds */
1075        erf_seek_erf,                   /* seek_erf */
1076        NULL,                           /* seek_timeval */
1077        NULL,                           /* seek_seconds */
1078        erf_get_capture_length,         /* get_capture_length */
1079        erf_get_wire_length,            /* get_wire_length */
1080        erf_get_framing_length,         /* get_framing_length */
1081        erf_set_capture_length,         /* set_capture_length */
1082        NULL,                           /* get_fd */
1083        trace_event_trace,              /* trace_event */
1084        erf_help,                       /* help */
1085        NULL                            /* next pointer */
1086};
1087
1088#ifdef HAVE_DAG
1089static struct libtrace_format_t dag = {
1090        "dag",
1091        "$Id$",
1092        TRACE_FORMAT_ERF,
1093        dag_init_input,                 /* init_input */       
1094        dag_config_input,               /* config_input */
1095        dag_start_input,                /* start_input */
1096        dag_pause_input,                /* pause_input */
1097        NULL,                           /* init_output */
1098        NULL,                           /* config_output */
1099        NULL,                           /* start_output */
1100        dag_fin_input,                  /* fin_input */
1101        NULL,                           /* fin_output */
1102        dag_read_packet,                /* read_packet */
1103        NULL,                           /* fin_packet */
1104        NULL,                           /* write_packet */
1105        erf_get_link_type,              /* get_link_type */
1106        erf_get_direction,              /* get_direction */
1107        erf_set_direction,              /* set_direction */
1108        erf_get_erf_timestamp,          /* get_erf_timestamp */
1109        NULL,                           /* get_timeval */
1110        NULL,                           /* get_seconds */
1111        NULL,                           /* seek_erf */
1112        NULL,                           /* seek_timeval */
1113        NULL,                           /* seek_seconds */
1114        erf_get_capture_length,         /* get_capture_length */
1115        erf_get_wire_length,            /* get_wire_length */
1116        erf_get_framing_length,         /* get_framing_length */
1117        erf_set_capture_length,         /* set_capture_length */
1118        NULL,                           /* get_fd */
1119        trace_event_dag,                /* trace_event */
1120        dag_help,                       /* help */
1121        NULL                            /* next pointer */
1122};
1123#endif
1124
1125static struct libtrace_format_t rtclient = {
1126        "rtclient",
1127        "$Id$",
1128        TRACE_FORMAT_ERF,
1129        rtclient_init_input,            /* init_input */       
1130        NULL,                           /* config_input */
1131        rtclient_start_input,           /* start_input */
1132        rtclient_pause_input,           /* pause_input */
1133        NULL,                           /* init_output */
1134        NULL,                           /* config_output */
1135        NULL,                           /* start_output */
1136        rtclient_fin_input,             /* fin_input */
1137        NULL,                           /* fin_output */
1138        rtclient_read_packet,           /* read_packet */
1139        NULL,                           /* fin_packet */
1140        NULL,                           /* write_packet */
1141        erf_get_link_type,              /* get_link_type */
1142        erf_get_direction,              /* get_direction */
1143        erf_set_direction,              /* set_direction */
1144        erf_get_erf_timestamp,          /* get_erf_timestamp */
1145        NULL,                           /* get_timeval */
1146        NULL,                           /* get_seconds */
1147        NULL,                           /* seek_erf */
1148        NULL,                           /* seek_timeval */
1149        NULL,                           /* seek_seconds */
1150        erf_get_capture_length,         /* get_capture_length */
1151        erf_get_wire_length,            /* get_wire_length */
1152        erf_get_framing_length,         /* get_framing_length */
1153        erf_set_capture_length,         /* set_capture_length */
1154        rtclient_get_fd,                /* get_fd */
1155        trace_event_device,             /* trace_event */
1156        rtclient_help,                  /* help */
1157        NULL                            /* next pointer */
1158};
1159
1160void CONSTRUCTOR erf_constructor() {
1161        register_format(&rtclient);
1162        register_format(&erf);
1163#ifdef HAVE_DAG
1164        register_format(&dag);
1165#endif
1166}
Note: See TracBrowser for help on using the repository browser.