source: lib/format_erf.c @ da34e20

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since da34e20 was da34e20, checked in by Shane Alcock <salcock@…>, 15 years ago

dag_read_packet now returns a DUCK packet occasionally in place of a regular packet - changing the DUCK frequency will *soon* be a config option
wag_start and wag_pause will now turn the radio on and off, respectively
Added DUCK structures to rt_protocol.h for use with RT_DUCK_* packets
Updated format_rt to deal with RT_DUCK properly
Removed erroneous free(packet->buffer) when packet is not under our control in format_pcap.
Added check for DAG 2.4 in configure

  • Property mode set to 100644
File size: 28.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30#define _GNU_SOURCE
31
32#include "config.h"
33#include "common.h"
34#include "libtrace.h"
35#include "libtrace_int.h"
36#include "format_helper.h"
37#include "parse_cmd.h"
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45
46#if HAVE_DAG
47#include <sys/mman.h>
48#endif
49
50#ifdef WIN32
51#  include <io.h>
52#  include <share.h>
53#  define PATH_MAX _MAX_PATH
54#  define snprintf sprintf_s
55#else
56#  include <netdb.h>
57#  ifndef PATH_MAX
58#       define PATH_MAX 4096
59#  endif
60#endif
61
62
63#define COLLECTOR_PORT 3435
64
65static struct libtrace_format_t erf;
66static struct libtrace_format_t rtclient;
67#if HAVE_DAG
68static struct libtrace_format_t dag;
69#endif
70
71#define DATA(x) ((struct erf_format_data_t *)x->format_data)
72#define DATAOUT(x) ((struct erf_format_data_out_t *)x->format_data)
73
74#define CONNINFO DATA(libtrace)->conn_info
75#define INPUT DATA(libtrace)->input
76#define OUTPUT DATAOUT(libtrace)->output
77#if HAVE_DAG
78#define DAG DATA(libtrace)->dag
79#define DUCK DATA(libtrace)->duck
80#endif
81#define OPTIONS DATAOUT(libtrace)->options
82struct erf_format_data_t {
83        union {
84                struct {
85                        char *hostname;
86                        short port;
87                } rt;
88        } conn_info;
89       
90        union {
91                int fd;
92                libtrace_io_t *file;
93        } input;
94
95        struct {
96                enum { INDEX_UNKNOWN=0, INDEX_NONE, INDEX_EXISTS } exists;
97                libtrace_io_t *index;
98                off_t index_len;
99        } seek;
100
101#if HAVE_DAG
102        struct {
103                uint32_t last_duck;     
104                uint32_t duck_freq;
105                uint32_t last_pkt;
106                libtrace_t *dummy_rt;
107        } duck;
108       
109        struct {
110                void *buf; 
111                unsigned bottom;
112                unsigned top;
113                unsigned diff;
114                unsigned curr;
115                unsigned offset;
116        } dag;
117#endif
118};
119
120struct erf_format_data_out_t {
121        union {
122                struct {
123                        char *hostname;
124                        short port;
125                } rt;
126                char *path;
127        } conn_info;
128
129        union {
130                struct {
131                        int level;
132                        int fileflag;
133                } erf;
134               
135        } options;
136       
137        union {
138                int fd;
139                struct rtserver_t * rtserver;
140                libtrace_io_t *file;
141        } output;
142};
143
144/** Structure holding status information for a packet */
145typedef struct libtrace_packet_status {
146        uint8_t type;
147        uint8_t reserved;
148        uint16_t message;
149} libtrace_packet_status_t;
150
151typedef struct erf_index_t {
152        uint64_t timestamp;
153        uint64_t offset; 
154} erf_index_t;
155
156#ifdef HAVE_DAG
157static int dag_init_input(libtrace_t *libtrace) {
158        struct stat buf;
159
160        libtrace->format_data = (struct erf_format_data_t *)
161                malloc(sizeof(struct erf_format_data_t));
162        if (stat(libtrace->uridata, &buf) == -1) {
163                trace_set_err(libtrace,errno,"stat(%s)",libtrace->uridata);
164                return -1;
165        } 
166        if (S_ISCHR(buf.st_mode)) {
167                /* DEVICE */
168                if((INPUT.fd = dag_open(libtrace->uridata)) < 0) {
169                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
170                                        libtrace->uridata);
171                        return -1;
172                }
173                if((DAG.buf = (void *)dag_mmap(INPUT.fd)) == MAP_FAILED) {
174                        trace_set_err(libtrace,errno,"Cannot mmap DAG %s",
175                                        libtrace->uridata);
176                        return -1;
177                }
178        } else {
179                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
180                                libtrace->uridata);
181                return -1;
182        }
183
184        DUCK.last_duck = 0;
185        DUCK.duck_freq = 60;  /** 5 minutes */
186        DUCK.last_pkt = 0;
187        DUCK.dummy_rt = NULL;
188       
189        return 0;
190}
191
192#endif
193
194/* Dag erf ether packets have a 2 byte padding before the packet
195 * so that the ip header is aligned on a 32 bit boundary.
196 */
197static int erf_get_padding(const libtrace_packet_t *packet)
198{
199        dag_record_t *erfptr = (dag_record_t *)packet->header;
200        switch(erfptr->type) {
201                case TYPE_ETH:          return 2;
202                default:                return 0;
203        }
204}
205
206static int erf_get_framing_length(const libtrace_packet_t *packet)
207{
208        return dag_record_size + erf_get_padding(packet);
209}
210
211
212static int erf_init_input(libtrace_t *libtrace) 
213{
214        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
215       
216        INPUT.file = 0;
217
218        return 0; /* success */
219}
220
221static int erf_start_input(libtrace_t *libtrace)
222{
223        if (INPUT.file)
224                return 0; /* success */
225
226        INPUT.file = trace_open_file(libtrace);
227
228        if (!INPUT.file)
229                return -1;
230
231        return 0; /* success */
232}
233
234/* Binary search through the index to find the closest point before
235 * the packet.  Consider in future having a btree index perhaps?
236 */
237static int erf_fast_seek_start(libtrace_t *libtrace,uint64_t erfts)
238{
239        size_t max_off = DATA(libtrace)->seek.index_len/sizeof(erf_index_t);
240        size_t min_off = 0;
241        off_t current;
242        erf_index_t record;
243        do {
244                current=(max_off+min_off)>>2;
245
246                libtrace_io_seek(DATA(libtrace)->seek.index,
247                                current*sizeof(record),
248                                SEEK_SET);
249                libtrace_io_read(DATA(libtrace)->seek.index,
250                                &record,sizeof(record));
251                if (record.timestamp < erfts) {
252                        min_off=current;
253                }
254                if (record.timestamp > erfts) {
255                        max_off=current;
256                }
257                if (record.timestamp == erfts)
258                        break;
259        } while(min_off<max_off);
260
261        /* If we've passed it, seek backwards.  This loop shouldn't
262         * execute more than twice.
263         */
264        do {
265                libtrace_io_seek(DATA(libtrace)->seek.index,
266                                current*sizeof(record),SEEK_SET);
267                libtrace_io_read(DATA(libtrace)->seek.index,
268                                &record,sizeof(record));
269                current--;
270        } while(record.timestamp>erfts);
271
272        /* We've found our location in the trace, now use it. */
273        libtrace_io_seek(INPUT.file,record.offset,SEEK_SET);
274
275        return 0; /* success */
276}
277
278/* There is no index.  Seek through the entire trace from the start, nice
279 * and slowly.
280 */
281static int erf_slow_seek_start(libtrace_t *libtrace,uint64_t erfts)
282{
283        if (INPUT.file) {
284                libtrace_io_close(INPUT.file);
285        }
286        INPUT.file = trace_open_file(libtrace);
287        if (!INPUT.file)
288                return -1;
289        return 0;
290}
291
292static int erf_seek_erf(libtrace_t *libtrace,uint64_t erfts)
293{
294        libtrace_packet_t *packet;
295        off_t off = 0;
296
297        if (DATA(libtrace)->seek.exists==INDEX_UNKNOWN) {
298                char buffer[PATH_MAX];
299                snprintf(buffer,sizeof(buffer),"%s.idx",libtrace->uridata);
300                DATA(libtrace)->seek.index=libtrace_io_open(buffer,"r");
301                if (DATA(libtrace)->seek.index) {
302                        DATA(libtrace)->seek.exists=INDEX_EXISTS;
303                }
304                else {
305                        DATA(libtrace)->seek.exists=INDEX_NONE;
306                }
307        }
308
309        /* If theres an index, use it to find the nearest packet that isn't
310         * after the time we're looking for.  If there is no index we need
311         * to seek slowly through the trace from the beginning.  Sigh.
312         */
313        switch(DATA(libtrace)->seek.exists) {
314                case INDEX_EXISTS:
315                        erf_fast_seek_start(libtrace,erfts);
316                        break;
317                case INDEX_NONE:
318                        erf_slow_seek_start(libtrace,erfts);
319                        break;
320                case INDEX_UNKNOWN:
321                        assert(0);
322                        break;
323        }
324
325        /* Now seek forward looking for the correct timestamp */
326        packet=trace_create_packet();
327        do {
328                trace_read_packet(libtrace,packet);
329                if (trace_get_erf_timestamp(packet)==erfts)
330                        break;
331                off=libtrace_io_tell(INPUT.file);
332        } while(trace_get_erf_timestamp(packet)<erfts);
333
334        libtrace_io_seek(INPUT.file,off,SEEK_SET);
335
336        return 0;
337}
338
339static int rtclient_init_input(libtrace_t *libtrace) {
340        char *scan;
341        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
342
343        if (strlen(libtrace->uridata) == 0) {
344                CONNINFO.rt.hostname = 
345                        strdup("localhost");
346                CONNINFO.rt.port = 
347                        COLLECTOR_PORT;
348        } else {
349                if ((scan = strchr(libtrace->uridata,':')) == NULL) {
350                        CONNINFO.rt.hostname = 
351                                strdup(libtrace->uridata);
352                        CONNINFO.rt.port =
353                                COLLECTOR_PORT;
354                } else {
355                        CONNINFO.rt.hostname = 
356                                (char *)strndup(libtrace->uridata,
357                                                (scan - libtrace->uridata));
358                        CONNINFO.rt.port = 
359                                atoi(++scan);
360                }
361        }
362
363        return 0; /* success */
364}
365
366static int rtclient_start_input(libtrace_t *libtrace)
367{
368        struct hostent *he;
369        struct sockaddr_in remote;
370        if ((he=gethostbyname(CONNINFO.rt.hostname)) == NULL) { 
371                trace_set_err(libtrace,TRACE_ERR_INIT_FAILED,"failed to resolve %s",
372                                CONNINFO.rt.hostname);
373                return -1;
374        } 
375        if ((INPUT.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
376                trace_set_err(libtrace,errno,"socket(AF_INET,SOCK_STREAM)");
377                return -1;
378        }
379
380        remote.sin_family = AF_INET;   
381        remote.sin_port = htons(CONNINFO.rt.port);
382        remote.sin_addr = *((struct in_addr *)he->h_addr);
383        memset(&(remote.sin_zero), 0, 8);
384
385        if (connect(INPUT.fd, (struct sockaddr *)&remote,
386                                sizeof(struct sockaddr)) == -1) {
387                trace_set_err(libtrace,errno,"connect(%s)",
388                                CONNINFO.rt.hostname);
389                return -1;
390        }
391        return 0; /* success */
392}
393
394static int rtclient_pause_input(libtrace_t *libtrace)
395{
396        close(INPUT.fd);
397        return 0; /* success */
398}
399
400static int erf_init_output(libtrace_out_t *libtrace) {
401        libtrace->format_data = malloc(sizeof(struct erf_format_data_out_t));
402
403        OPTIONS.erf.level = 0;
404        OPTIONS.erf.fileflag = O_CREAT | O_WRONLY;
405        OUTPUT.file = 0;
406
407        return 0;
408}
409
410static int erf_config_output(libtrace_out_t *libtrace, trace_option_output_t option,
411                void *value) {
412
413        switch (option) {
414                case TRACE_OPTION_OUTPUT_COMPRESS:
415                        OPTIONS.erf.level = *(int*)value;
416                        return 0;
417                case TRACE_OPTION_OUTPUT_FILEFLAGS:
418                        OPTIONS.erf.fileflag = *(int*)value;
419                        return 0;
420                default:
421                        /* Unknown option */
422                        trace_set_err_out(libtrace,TRACE_ERR_UNKNOWN_OPTION,
423                                        "Unknown option");
424                        return -1;
425        }
426}
427
428
429#ifdef HAVE_DAG
430static int dag_pause_input(libtrace_t *libtrace) {
431        dag_stop(INPUT.fd);
432        return 0; /* success */
433}
434
435static int dag_fin_input(libtrace_t *libtrace) {
436        /* dag pause input implicitly called to cleanup before this */
437        dag_close(INPUT.fd);
438        if (DUCK.dummy_rt)
439                trace_destroy_dead(DUCK.dummy_rt);
440        free(libtrace->format_data);
441        return 0; /* success */
442}
443#endif
444
445static int rtclient_fin_input(libtrace_t *libtrace) {
446        free(CONNINFO.rt.hostname);
447        close(INPUT.fd);
448        free(libtrace->format_data);
449        return 0;
450}
451
452static int erf_fin_input(libtrace_t *libtrace) {
453        libtrace_io_close(INPUT.file);
454        free(libtrace->format_data);
455        return 0;
456}
457
458static int erf_fin_output(libtrace_out_t *libtrace) {
459        libtrace_io_close(OUTPUT.file);
460        free(libtrace->format_data);
461        return 0;
462}
463 
464#if HAVE_DAG
465#if DAG_VERSION_2_4
466static int dag_get_duckinfo(libtrace_t *libtrace, 
467                                libtrace_packet_t *packet) {
468        dag_inf lt_dag_inf;
469       
470        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
471                        !packet->buffer) {
472                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
473                packet->buf_control = TRACE_CTRL_PACKET;
474                if (!packet->buffer) {
475                        trace_set_err(libtrace, errno,
476                                        "Cannot allocate packet buffer");
477                        return -1;
478                }
479        }
480       
481        packet->header = 0;
482        packet->payload = packet->buffer;
483       
484        if ((ioctl(INPUT.fd, DAG_IOINF, &lt_dag_inf) < 0)) {
485                trace_set_err(libtrace, errno,
486                                "Error using DAG_IOINF");
487                return -1;
488        }
489        if (!IsDUCK(&lt_dag_inf)) {
490                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
491                return 0;
492        }
493
494        if ((ioctl(INPUT.fd, DAG_IOGETDUCK, (duck_inf *)packet->payload) 
495                                < 0)) {
496                trace_set_err(libtrace, errno, "Error using DAG_IOGETDUCK");
497                return -1;
498        }
499
500        packet->type = RT_DUCK_2_4;
501        packet->size = sizeof(duck_inf);
502        if (!DUCK.dummy_rt) 
503                DUCK.dummy_rt = trace_create_dead("rt:localhost:3434");
504        packet->trace = DUCK.dummy_rt; 
505}       
506#else
507static int dag_get_duckinfo(libtrace_t *libtrace, 
508                                libtrace_packet_t *packet) {
509        daginf_t lt_dag_inf;
510       
511        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
512                        !packet->buffer) {
513                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
514                packet->buf_control = TRACE_CTRL_PACKET;
515                if (!packet->buffer) {
516                        trace_set_err(libtrace, errno,
517                                        "Cannot allocate packet buffer");
518                        return -1;
519                }
520        }
521       
522        packet->header = 0;
523        packet->payload = packet->buffer;
524       
525        if ((ioctl(INPUT.fd, DAGIOCINFO, &lt_dag_inf) < 0)) {
526                trace_set_err(libtrace, errno,
527                                "Error using DAGIOCINFO");
528                return -1;
529        }
530        if (!IsDUCK(&lt_dag_inf)) {
531                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
532                return 0;
533        }
534
535        if ((ioctl(INPUT.fd, DAGIOCDUCK, (duckinf_t *)packet->payload) 
536                                < 0)) {
537                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
538                return -1;
539        }
540
541        packet->type = RT_DUCK_2_5;
542        packet->size = sizeof(duckinf_t);
543        if (!DUCK.dummy_rt) 
544                DUCK.dummy_rt = trace_create_dead("rt:localhost:3434");
545        packet->trace = DUCK.dummy_rt; 
546}       
547#endif
548
549static int dag_read(libtrace_t *libtrace, int block_flag) {
550
551        if (DAG.diff != 0) 
552                return DAG.diff;
553
554        DAG.bottom = DAG.top;
555
556        DAG.top = dag_offset(
557                        INPUT.fd,
558                        &(DAG.bottom),
559                        block_flag);
560
561        DAG.diff = DAG.top - DAG.bottom;
562
563        DAG.offset = 0;
564        return DAG.diff;
565}
566
567/* FIXME: dag_read_packet shouldn't update the pointers, dag_fin_packet
568 * should do that.
569 */
570static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
571        int numbytes;
572        int size;
573        struct timeval tv;
574        dag_record_t *erfptr;
575
576        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 
577                        DUCK.duck_freq != 0) {
578                size = dag_get_duckinfo(libtrace, packet);
579                DUCK.last_duck = DUCK.last_pkt;
580                if (size != 0) {
581                        return size;
582                }
583                /* No DUCK support, so don't waste our time anymore */
584                DUCK.duck_freq = 0;
585        }
586       
587        if (packet->buf_control == TRACE_CTRL_PACKET) {
588                packet->buf_control = TRACE_CTRL_EXTERNAL;
589                free(packet->buffer);
590                packet->buffer = 0;
591        }
592       
593        packet->type = RT_DATA_ERF;
594       
595        if ((numbytes = dag_read(libtrace,0)) < 0) 
596                return numbytes;
597        assert(numbytes>0);
598
599        /*DAG always gives us whole packets */
600        erfptr = (dag_record_t *) ((char *)DAG.buf + 
601                        (DAG.bottom + DAG.offset));
602        size = ntohs(erfptr->rlen);
603
604        assert( size >= dag_record_size );
605        assert( size < LIBTRACE_PACKET_BUFSIZE);
606       
607        packet->buffer = erfptr;
608        packet->header = erfptr;
609        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
610                packet->payload = NULL;
611        } else {
612                packet->payload = (char*)packet->buffer
613                        + erf_get_framing_length(packet);
614        }
615
616        DAG.offset += size;
617        DAG.diff -= size;
618
619        packet->size = size;
620        tv = trace_get_timeval(packet);
621        DUCK.last_pkt = tv.tv_sec;
622       
623        return (size);
624}
625
626static int dag_start_input(libtrace_t *libtrace) {
627        if(dag_start(INPUT.fd) < 0) {
628                trace_set_err(libtrace,errno,"Cannot start DAG %s",
629                                libtrace->uridata);
630                return -1;
631        }
632        /* dags appear to have a bug where if you call dag_start after
633         * calling dag_stop, and at least one packet has arrived, bad things
634         * happen.  flush the memory hole
635         */
636        while(dag_read(libtrace,1)!=0)
637                DAG.diff=0;
638        return 0;
639}
640#endif
641
642static int erf_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
643        int numbytes;
644        int size;
645        void *buffer2 = packet->buffer;
646        int rlen;
647
648        if (!packet->buffer || packet->buf_control == TRACE_CTRL_EXTERNAL) {
649                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
650                packet->buf_control = TRACE_CTRL_PACKET;
651                if (!packet->buffer) {
652                        trace_set_err(libtrace, errno, 
653                                        "Cannot allocate memory");
654                        return -1;
655                }
656        }
657
658       
659       
660        packet->header = packet->buffer;
661        packet->type = RT_DATA_ERF;
662
663        if ((numbytes=libtrace_io_read(INPUT.file,
664                                        packet->buffer,
665                                        dag_record_size)) == -1) {
666                trace_set_err(libtrace,errno,"read(%s)",
667                                libtrace->uridata);
668                return -1;
669        }
670        if (numbytes == 0) {
671                return 0;
672        }
673
674        rlen = ntohs(((dag_record_t *)packet->buffer)->rlen);
675        buffer2 = (char*)packet->buffer + dag_record_size;
676        size = rlen - dag_record_size;
677
678        assert(size < LIBTRACE_PACKET_BUFSIZE && size >= dag_record_size);
679
680        /* Unknown/corrupt */
681        assert(((dag_record_t *)packet->buffer)->type < 10);
682       
683        /* read in the rest of the packet */
684        if ((numbytes=libtrace_io_read(INPUT.file,
685                                        buffer2,
686                                        size)) != size) {
687                trace_set_err(libtrace,errno, "read(%s)", libtrace->uridata);
688                return -1;
689        }
690        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
691                packet->payload = NULL;
692        } else {
693                packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
694        }
695        return rlen;
696}
697
698static int rtclient_read(libtrace_t *libtrace, void *buffer, size_t len) {
699        int numbytes;
700
701        while(1) {
702#ifndef MSG_NOSIGNAL
703#  define MSG_NOSIGNAL 0
704#endif
705                if ((numbytes = recv(INPUT.fd,
706                                                buffer,
707                                                len,
708                                                MSG_NOSIGNAL)) == -1) {
709                        if (errno == EINTR) {
710                                /*ignore EINTR in case
711                                 *a caller is using signals
712                                 */
713                                continue;
714                        }
715                        trace_set_err(libtrace,errno,"recv(%s)",
716                                        libtrace->uridata);
717                        return -1;
718                }
719                break;
720
721        }
722        return numbytes;
723}
724
725static int rtclient_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
726        int numbytes = 0;
727        char buf[RP_BUFSIZE];
728        int read_required = 0;
729       
730        void *buffer = 0;
731
732        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
733                packet->buf_control = TRACE_CTRL_PACKET;
734                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
735        }
736
737        buffer = packet->buffer;
738        packet->header = packet->buffer;
739       
740        packet->type = RT_DATA_ERF;
741
742       
743        do {
744                libtrace_packet_status_t status;
745                int size;
746                if (tracefifo_out_available(libtrace->fifo) == 0 
747                                || read_required) {
748                        if ((numbytes = rtclient_read(
749                                        libtrace,buf,RP_BUFSIZE))<=0) {
750                                return numbytes;
751                        }
752                        tracefifo_write(libtrace->fifo,buf,numbytes);
753                        read_required = 0;
754                }
755                /* Read status byte */
756                if (tracefifo_out_read(libtrace->fifo,
757                                &status, sizeof(uint32_t)) == 0) {
758                        read_required = 1;
759                        continue;
760                }
761                tracefifo_out_update(libtrace->fifo,sizeof(uint32_t));
762                /* Read in packet size */
763                if (tracefifo_out_read(libtrace->fifo,
764                                &size, sizeof(uint32_t)) == 0) {
765                        tracefifo_out_reset(libtrace->fifo);
766                        read_required = 1;
767                        continue;
768                }
769                tracefifo_out_update(libtrace->fifo, sizeof(uint32_t));
770               
771                if (status.type == 2 /* RT_MSG */) {
772                        /* Need to skip this packet as it is a message packet */
773                        tracefifo_out_update(libtrace->fifo, size);
774                        tracefifo_ack_update(libtrace->fifo, size + 
775                                        sizeof(uint32_t) + 
776                                        sizeof(libtrace_packet_status_t));
777                        continue;
778                }
779               
780                /* read in the full packet */
781                if ((numbytes = tracefifo_out_read(libtrace->fifo, 
782                                                buffer, size)) == 0) {
783                        tracefifo_out_reset(libtrace->fifo);
784                        read_required = 1;
785                        continue;
786                }
787
788                /* got in our whole packet, so... */
789                tracefifo_out_update(libtrace->fifo,size);
790
791                tracefifo_ack_update(libtrace->fifo,size + 
792                                sizeof(uint32_t) + 
793                                sizeof(libtrace_packet_status_t));
794
795                if (((dag_record_t *)buffer)->flags.rxerror == 1) {
796                        packet->payload = NULL;
797                } else {
798                        packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
799                }
800                return numbytes;
801        } while(1);
802}
803
804static int erf_dump_packet(libtrace_out_t *libtrace,
805                dag_record_t *erfptr, int pad, void *buffer, size_t size) {
806        int numbytes = 0;
807        assert(size<=65536);
808
809        if ((numbytes = libtrace_io_write(OUTPUT.file, erfptr, dag_record_size + pad)) != dag_record_size+pad) {
810                trace_set_err_out(libtrace,errno,
811                                "write(%s)",libtrace->uridata);
812                return -1;
813        }
814
815        if ((numbytes=libtrace_io_write(OUTPUT.file, buffer, size)) != (int)size) {
816                trace_set_err_out(libtrace,errno,
817                                "write(%s)",libtrace->uridata);
818                return -1;
819        }
820
821        return numbytes + pad + dag_record_size;
822}
823
824static int erf_start_output(libtrace_out_t *libtrace)
825{
826        OUTPUT.file = trace_open_file_out(libtrace,
827                        OPTIONS.erf.level,
828                        OPTIONS.erf.fileflag);
829        if (!OUTPUT.file) {
830                return -1;
831        }
832        return 0;
833}
834               
835static int erf_write_packet(libtrace_out_t *libtrace, 
836                const libtrace_packet_t *packet) 
837{
838        int numbytes = 0;
839        int pad = 0;
840        dag_record_t *dag_hdr = (dag_record_t *)packet->header;
841        void *payload = packet->payload;
842
843        assert(OUTPUT.file);
844
845        pad = erf_get_padding(packet);
846
847        /* If we've had an rxerror, we have no payload to write - fix rlen to
848         * be the correct length */
849        if (payload == NULL) {
850                dag_hdr->rlen = htons(dag_record_size + pad);
851        } 
852       
853        if (packet->trace->format == &erf 
854#if HAVE_DAG
855                        || packet->trace->format == &dag
856#endif
857                        ) {
858                numbytes = erf_dump_packet(libtrace,
859                                (dag_record_t *)packet->header,
860                                pad,
861                                payload,
862                                trace_get_capture_length(packet)
863                                );
864        } else {
865                dag_record_t erfhdr;
866                int type;
867                /* convert format - build up a new erf header */
868                /* Timestamp */
869                erfhdr.ts = trace_get_erf_timestamp(packet);
870                type=libtrace_to_erf_type(trace_get_link_type(packet));
871                if (type==(char)-1) {
872                        trace_set_err_out(libtrace,TRACE_ERR_NO_CONVERSION,
873                                        "No erf type for packet");
874                        return -1;
875                }
876                erfhdr.type = type;
877                /* Flags. Can't do this */
878                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
879                /* Packet length (rlen includes format overhead) */
880                erfhdr.rlen = trace_get_capture_length(packet) 
881                        + erf_get_framing_length(packet);
882                /* loss counter. Can't do this */
883                erfhdr.lctr = 0;
884                /* Wire length */
885                erfhdr.wlen = trace_get_wire_length(packet);
886
887                /* Write it out */
888                numbytes = erf_dump_packet(libtrace,
889                                &erfhdr,
890                                pad,
891                                payload,
892                                trace_get_capture_length(packet));
893        }
894        return numbytes;
895}
896
897static libtrace_linktype_t erf_get_link_type(const libtrace_packet_t *packet) {
898        dag_record_t *erfptr = 0;
899        erfptr = (dag_record_t *)packet->header;
900        return erf_type_to_libtrace(erfptr->type);
901}
902
903static libtrace_direction_t erf_get_direction(const libtrace_packet_t *packet) {
904        dag_record_t *erfptr = 0;
905        erfptr = (dag_record_t *)packet->header;
906        return erfptr->flags.iface;
907}
908
909static libtrace_direction_t erf_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
910        dag_record_t *erfptr = 0;
911        erfptr = (dag_record_t *)packet->header;
912        erfptr->flags.iface = direction;
913        return erfptr->flags.iface;
914}
915
916static uint64_t erf_get_erf_timestamp(const libtrace_packet_t *packet) {
917        dag_record_t *erfptr = 0;
918        erfptr = (dag_record_t *)packet->header;
919        return erfptr->ts;
920}
921
922static int erf_get_capture_length(const libtrace_packet_t *packet) {
923        dag_record_t *erfptr = 0;
924        erfptr = (dag_record_t *)packet->header;
925        return (ntohs(erfptr->rlen) - erf_get_framing_length(packet));
926}
927
928static int erf_get_wire_length(const libtrace_packet_t *packet) {
929        dag_record_t *erfptr = 0;
930        erfptr = (dag_record_t *)packet->header;
931        return ntohs(erfptr->wlen);
932}
933
934static size_t erf_set_capture_length(libtrace_packet_t *packet, size_t size) {
935        dag_record_t *erfptr = 0;
936        assert(packet);
937        if(size  > trace_get_capture_length(packet)) {
938                /* can't make a packet larger */
939                return trace_get_capture_length(packet);
940        }
941        erfptr = (dag_record_t *)packet->header;
942        erfptr->rlen = htons(size + erf_get_framing_length(packet));
943        return trace_get_capture_length(packet);
944}
945
946static int rtclient_get_fd(const libtrace_t *libtrace) {
947        return INPUT.fd;
948}
949
950#ifdef HAVE_DAG
951libtrace_eventobj_t trace_event_dag(libtrace_t *trace, libtrace_packet_t *packet) {
952        libtrace_eventobj_t event = {0,0,0.0,0};
953        int dag_fd;
954        int data;
955
956        if (trace->format->get_fd) {
957                dag_fd = trace->format->get_fd(trace);
958        } else {
959                dag_fd = 0;
960        }
961       
962        data = dag_read(trace, DAGF_NONBLOCK);
963
964        if (data > 0) {
965                event.size = trace_read_packet(trace,packet);
966                event.type = TRACE_EVENT_PACKET;
967                return event;
968        }
969        event.type = TRACE_EVENT_SLEEP;
970        event.seconds = 0.0001;
971        return event;
972}
973#endif
974
975#if HAVE_DAG
976static void dag_help() {
977        printf("dag format module: $Revision$\n");
978        printf("Supported input URIs:\n");
979        printf("\tdag:/dev/dagn\n");
980        printf("\n");
981        printf("\te.g.: dag:/dev/dag0\n");
982        printf("\n");
983        printf("Supported output URIs:\n");
984        printf("\tnone\n");
985        printf("\n");
986}
987#endif
988
989static void erf_help() {
990        printf("erf format module: $Revision$\n");
991        printf("Supported input URIs:\n");
992        printf("\terf:/path/to/file\t(uncompressed)\n");
993        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
994        printf("\terf:-\t(stdin, either compressed or not)\n");
995        printf("\terf:/path/to/socket\n");
996        printf("\n");
997        printf("\te.g.: erf:/tmp/trace\n");
998        printf("\n");
999        printf("Supported output URIs:\n");
1000        printf("\terf:path/to/file\t(uncompressed)\n");
1001        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1002        printf("\terf:-\t(stdout, either compressed or not)\n");
1003        printf("\n");
1004        printf("\te.g.: erf:/tmp/trace\n");
1005        printf("\n");
1006        printf("Supported output options:\n");
1007        printf("\t-z\tSpecify the gzip compression, ranging from 0 (uncompressed) to 9 - defaults to 1\n");
1008        printf("\n");
1009
1010       
1011}
1012
1013static void rtclient_help() {
1014        printf("rtclient format module: $Revision$\n");
1015        printf("DEPRECATED - use rt module instead\n");
1016        printf("Supported input URIs:\n");
1017        printf("\trtclient:host:port\n");
1018        printf("\n");
1019        printf("\te.g.:rtclient:localhost:3435\n");
1020        printf("\n");
1021        printf("Supported output URIs:\n");
1022        printf("\tnone\n");
1023        printf("\n");
1024}       
1025
1026static struct libtrace_format_t erf = {
1027        "erf",
1028        "$Id$",
1029        TRACE_FORMAT_ERF,
1030        erf_init_input,                 /* init_input */       
1031        NULL,                           /* config_input */
1032        erf_start_input,                /* start_input */
1033        NULL,                           /* pause_input */
1034        erf_init_output,                /* init_output */
1035        erf_config_output,              /* config_output */
1036        erf_start_output,               /* start_output */
1037        erf_fin_input,                  /* fin_input */
1038        erf_fin_output,                 /* fin_output */
1039        erf_read_packet,                /* read_packet */
1040        NULL,                           /* fin_packet */
1041        erf_write_packet,               /* write_packet */
1042        erf_get_link_type,              /* get_link_type */
1043        erf_get_direction,              /* get_direction */
1044        erf_set_direction,              /* set_direction */
1045        erf_get_erf_timestamp,          /* get_erf_timestamp */
1046        NULL,                           /* get_timeval */
1047        NULL,                           /* get_seconds */
1048        erf_seek_erf,                   /* seek_erf */
1049        NULL,                           /* seek_timeval */
1050        NULL,                           /* seek_seconds */
1051        erf_get_capture_length,         /* get_capture_length */
1052        erf_get_wire_length,            /* get_wire_length */
1053        erf_get_framing_length,         /* get_framing_length */
1054        erf_set_capture_length,         /* set_capture_length */
1055        NULL,                           /* get_fd */
1056        trace_event_trace,              /* trace_event */
1057        erf_help,                       /* help */
1058        NULL                            /* next pointer */
1059};
1060
1061#ifdef HAVE_DAG
1062static struct libtrace_format_t dag = {
1063        "dag",
1064        "$Id$",
1065        TRACE_FORMAT_ERF,
1066        dag_init_input,                 /* init_input */       
1067        NULL,                           /* config_input */
1068        dag_start_input,                /* start_input */
1069        dag_pause_input,                /* pause_input */
1070        NULL,                           /* init_output */
1071        NULL,                           /* config_output */
1072        NULL,                           /* start_output */
1073        dag_fin_input,                  /* fin_input */
1074        NULL,                           /* fin_output */
1075        dag_read_packet,                /* read_packet */
1076        NULL,                           /* fin_packet */
1077        NULL,                           /* write_packet */
1078        erf_get_link_type,              /* get_link_type */
1079        erf_get_direction,              /* get_direction */
1080        erf_set_direction,              /* set_direction */
1081        erf_get_erf_timestamp,          /* get_erf_timestamp */
1082        NULL,                           /* get_timeval */
1083        NULL,                           /* get_seconds */
1084        NULL,                           /* seek_erf */
1085        NULL,                           /* seek_timeval */
1086        NULL,                           /* seek_seconds */
1087        erf_get_capture_length,         /* get_capture_length */
1088        erf_get_wire_length,            /* get_wire_length */
1089        erf_get_framing_length,         /* get_framing_length */
1090        erf_set_capture_length,         /* set_capture_length */
1091        NULL,                           /* get_fd */
1092        trace_event_dag,                /* trace_event */
1093        dag_help,                       /* help */
1094        NULL                            /* next pointer */
1095};
1096#endif
1097
1098static struct libtrace_format_t rtclient = {
1099        "rtclient",
1100        "$Id$",
1101        TRACE_FORMAT_ERF,
1102        rtclient_init_input,            /* init_input */       
1103        NULL,                           /* config_input */
1104        rtclient_start_input,           /* start_input */
1105        rtclient_pause_input,           /* pause_input */
1106        NULL,                           /* init_output */
1107        NULL,                           /* config_output */
1108        NULL,                           /* start_output */
1109        rtclient_fin_input,             /* fin_input */
1110        NULL,                           /* fin_output */
1111        rtclient_read_packet,           /* read_packet */
1112        NULL,                           /* fin_packet */
1113        NULL,                           /* write_packet */
1114        erf_get_link_type,              /* get_link_type */
1115        erf_get_direction,              /* get_direction */
1116        erf_set_direction,              /* set_direction */
1117        erf_get_erf_timestamp,          /* get_erf_timestamp */
1118        NULL,                           /* get_timeval */
1119        NULL,                           /* get_seconds */
1120        NULL,                           /* seek_erf */
1121        NULL,                           /* seek_timeval */
1122        NULL,                           /* seek_seconds */
1123        erf_get_capture_length,         /* get_capture_length */
1124        erf_get_wire_length,            /* get_wire_length */
1125        erf_get_framing_length,         /* get_framing_length */
1126        erf_set_capture_length,         /* set_capture_length */
1127        rtclient_get_fd,                /* get_fd */
1128        trace_event_device,             /* trace_event */
1129        rtclient_help,                  /* help */
1130        NULL                            /* next pointer */
1131};
1132
1133void CONSTRUCTOR erf_constructor() {
1134        register_format(&rtclient);
1135        register_format(&erf);
1136#ifdef HAVE_DAG
1137        register_format(&dag);
1138#endif
1139}
Note: See TracBrowser for help on using the repository browser.