source: lib/format_erf.c @ cd7eec7

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since cd7eec7 was cd7eec7, checked in by Shane Alcock <salcock@…>, 15 years ago

Added a new format for reading and writing DUCK packets
Added corresponding test cases for DUCK
Removed references to RT_DUCK_* from format_rt
Added a configuration option for meta-data frequency (used solely for DUCK frequency at the moment) and updated other formats to ignore the option

  • Property mode set to 100644
File size: 28.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30#define _GNU_SOURCE
31
32#include "config.h"
33#include "common.h"
34#include "libtrace.h"
35#include "libtrace_int.h"
36#include "format_helper.h"
37#include "parse_cmd.h"
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45
46#if HAVE_DAG
47#include <sys/mman.h>
48#endif
49
50#ifdef WIN32
51#  include <io.h>
52#  include <share.h>
53#  define PATH_MAX _MAX_PATH
54#  define snprintf sprintf_s
55#else
56#  include <netdb.h>
57#  ifndef PATH_MAX
58#       define PATH_MAX 4096
59#  endif
60#endif
61
62
63#define COLLECTOR_PORT 3435
64
65static struct libtrace_format_t erf;
66static struct libtrace_format_t rtclient;
67#if HAVE_DAG
68static struct libtrace_format_t dag;
69#endif
70
71#define DATA(x) ((struct erf_format_data_t *)x->format_data)
72#define DATAOUT(x) ((struct erf_format_data_out_t *)x->format_data)
73
74#define CONNINFO DATA(libtrace)->conn_info
75#define INPUT DATA(libtrace)->input
76#define OUTPUT DATAOUT(libtrace)->output
77#if HAVE_DAG
78#define DAG DATA(libtrace)->dag
79#define DUCK DATA(libtrace)->duck
80#endif
81#define OPTIONS DATAOUT(libtrace)->options
82struct erf_format_data_t {
83        union {
84                struct {
85                        char *hostname;
86                        short port;
87                } rt;
88        } conn_info;
89       
90        union {
91                int fd;
92                libtrace_io_t *file;
93        } input;
94
95        struct {
96                enum { INDEX_UNKNOWN=0, INDEX_NONE, INDEX_EXISTS } exists;
97                libtrace_io_t *index;
98                off_t index_len;
99        } seek;
100
101#if HAVE_DAG
102        struct {
103                uint32_t last_duck;     
104                uint32_t duck_freq;
105                uint32_t last_pkt;
106                libtrace_t *dummy_duck;
107        } duck;
108       
109        struct {
110                void *buf; 
111                unsigned bottom;
112                unsigned top;
113                unsigned diff;
114                unsigned curr;
115                unsigned offset;
116        } dag;
117#endif
118};
119
120struct erf_format_data_out_t {
121        union {
122                struct {
123                        char *hostname;
124                        short port;
125                } rt;
126                char *path;
127        } conn_info;
128
129        union {
130                struct {
131                        int level;
132                        int fileflag;
133                } erf;
134               
135        } options;
136       
137        union {
138                int fd;
139                struct rtserver_t * rtserver;
140                libtrace_io_t *file;
141        } output;
142};
143
144/** Structure holding status information for a packet */
145typedef struct libtrace_packet_status {
146        uint8_t type;
147        uint8_t reserved;
148        uint16_t message;
149} libtrace_packet_status_t;
150
151typedef struct erf_index_t {
152        uint64_t timestamp;
153        uint64_t offset; 
154} erf_index_t;
155
156#ifdef HAVE_DAG
157static int dag_init_input(libtrace_t *libtrace) {
158        struct stat buf;
159
160        libtrace->format_data = (struct erf_format_data_t *)
161                malloc(sizeof(struct erf_format_data_t));
162        if (stat(libtrace->uridata, &buf) == -1) {
163                trace_set_err(libtrace,errno,"stat(%s)",libtrace->uridata);
164                return -1;
165        } 
166        if (S_ISCHR(buf.st_mode)) {
167                /* DEVICE */
168                if((INPUT.fd = dag_open(libtrace->uridata)) < 0) {
169                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
170                                        libtrace->uridata);
171                        return -1;
172                }
173                if((DAG.buf = (void *)dag_mmap(INPUT.fd)) == MAP_FAILED) {
174                        trace_set_err(libtrace,errno,"Cannot mmap DAG %s",
175                                        libtrace->uridata);
176                        return -1;
177                }
178        } else {
179                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
180                                libtrace->uridata);
181                return -1;
182        }
183
184        DUCK.last_duck = 0;
185        DUCK.duck_freq = 0; 
186        DUCK.last_pkt = 0;
187        DUCK.dummy_duck = NULL;
188       
189        return 0;
190}
191
192static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
193                                void *data) {
194        switch(option) {
195                case TRACE_META_FREQ:
196                        DUCK.duck_freq = *(int *)data;
197                        return 0;
198                case TRACE_OPTION_SNAPLEN:
199                        /* Surely we can set this?? Fall through for now*/
200               
201                case TRACE_OPTION_PROMISC:
202                        /* DAG already operates in a promisc fashion */
203
204                case TRACE_OPTION_FILTER:
205
206                default:
207                        trace_set_err(libtrace, TRACE_ERR_UNKNOWN_OPTION,
208                                        "Unknown or unsupported option: %i",
209                                        option);
210                        return -1;
211        }
212        assert (0);
213}
214#endif
215
216/* Dag erf ether packets have a 2 byte padding before the packet
217 * so that the ip header is aligned on a 32 bit boundary.
218 */
219static int erf_get_padding(const libtrace_packet_t *packet)
220{
221        dag_record_t *erfptr = (dag_record_t *)packet->header;
222        switch(erfptr->type) {
223                case TYPE_ETH:          return 2;
224                default:                return 0;
225        }
226}
227
228static int erf_get_framing_length(const libtrace_packet_t *packet)
229{
230        return dag_record_size + erf_get_padding(packet);
231}
232
233
234static int erf_init_input(libtrace_t *libtrace) 
235{
236        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
237       
238        INPUT.file = 0;
239
240        return 0; /* success */
241}
242
243static int erf_start_input(libtrace_t *libtrace)
244{
245        if (INPUT.file)
246                return 0; /* success */
247
248        INPUT.file = trace_open_file(libtrace);
249
250        if (!INPUT.file)
251                return -1;
252
253        return 0; /* success */
254}
255
256/* Binary search through the index to find the closest point before
257 * the packet.  Consider in future having a btree index perhaps?
258 */
259static int erf_fast_seek_start(libtrace_t *libtrace,uint64_t erfts)
260{
261        size_t max_off = DATA(libtrace)->seek.index_len/sizeof(erf_index_t);
262        size_t min_off = 0;
263        off_t current;
264        erf_index_t record;
265        do {
266                current=(max_off+min_off)>>2;
267
268                libtrace_io_seek(DATA(libtrace)->seek.index,
269                                current*sizeof(record),
270                                SEEK_SET);
271                libtrace_io_read(DATA(libtrace)->seek.index,
272                                &record,sizeof(record));
273                if (record.timestamp < erfts) {
274                        min_off=current;
275                }
276                if (record.timestamp > erfts) {
277                        max_off=current;
278                }
279                if (record.timestamp == erfts)
280                        break;
281        } while(min_off<max_off);
282
283        /* If we've passed it, seek backwards.  This loop shouldn't
284         * execute more than twice.
285         */
286        do {
287                libtrace_io_seek(DATA(libtrace)->seek.index,
288                                current*sizeof(record),SEEK_SET);
289                libtrace_io_read(DATA(libtrace)->seek.index,
290                                &record,sizeof(record));
291                current--;
292        } while(record.timestamp>erfts);
293
294        /* We've found our location in the trace, now use it. */
295        libtrace_io_seek(INPUT.file,record.offset,SEEK_SET);
296
297        return 0; /* success */
298}
299
300/* There is no index.  Seek through the entire trace from the start, nice
301 * and slowly.
302 */
303static int erf_slow_seek_start(libtrace_t *libtrace,uint64_t erfts)
304{
305        if (INPUT.file) {
306                libtrace_io_close(INPUT.file);
307        }
308        INPUT.file = trace_open_file(libtrace);
309        if (!INPUT.file)
310                return -1;
311        return 0;
312}
313
314static int erf_seek_erf(libtrace_t *libtrace,uint64_t erfts)
315{
316        libtrace_packet_t *packet;
317        off_t off = 0;
318
319        if (DATA(libtrace)->seek.exists==INDEX_UNKNOWN) {
320                char buffer[PATH_MAX];
321                snprintf(buffer,sizeof(buffer),"%s.idx",libtrace->uridata);
322                DATA(libtrace)->seek.index=libtrace_io_open(buffer,"r");
323                if (DATA(libtrace)->seek.index) {
324                        DATA(libtrace)->seek.exists=INDEX_EXISTS;
325                }
326                else {
327                        DATA(libtrace)->seek.exists=INDEX_NONE;
328                }
329        }
330
331        /* If theres an index, use it to find the nearest packet that isn't
332         * after the time we're looking for.  If there is no index we need
333         * to seek slowly through the trace from the beginning.  Sigh.
334         */
335        switch(DATA(libtrace)->seek.exists) {
336                case INDEX_EXISTS:
337                        erf_fast_seek_start(libtrace,erfts);
338                        break;
339                case INDEX_NONE:
340                        erf_slow_seek_start(libtrace,erfts);
341                        break;
342                case INDEX_UNKNOWN:
343                        assert(0);
344                        break;
345        }
346
347        /* Now seek forward looking for the correct timestamp */
348        packet=trace_create_packet();
349        do {
350                trace_read_packet(libtrace,packet);
351                if (trace_get_erf_timestamp(packet)==erfts)
352                        break;
353                off=libtrace_io_tell(INPUT.file);
354        } while(trace_get_erf_timestamp(packet)<erfts);
355
356        libtrace_io_seek(INPUT.file,off,SEEK_SET);
357
358        return 0;
359}
360
361static int rtclient_init_input(libtrace_t *libtrace) {
362        char *scan;
363        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
364
365        if (strlen(libtrace->uridata) == 0) {
366                CONNINFO.rt.hostname = 
367                        strdup("localhost");
368                CONNINFO.rt.port = 
369                        COLLECTOR_PORT;
370        } else {
371                if ((scan = strchr(libtrace->uridata,':')) == NULL) {
372                        CONNINFO.rt.hostname = 
373                                strdup(libtrace->uridata);
374                        CONNINFO.rt.port =
375                                COLLECTOR_PORT;
376                } else {
377                        CONNINFO.rt.hostname = 
378                                (char *)strndup(libtrace->uridata,
379                                                (scan - libtrace->uridata));
380                        CONNINFO.rt.port = 
381                                atoi(++scan);
382                }
383        }
384
385        return 0; /* success */
386}
387
388static int rtclient_start_input(libtrace_t *libtrace)
389{
390        struct hostent *he;
391        struct sockaddr_in remote;
392        if ((he=gethostbyname(CONNINFO.rt.hostname)) == NULL) { 
393                trace_set_err(libtrace,TRACE_ERR_INIT_FAILED,"failed to resolve %s",
394                                CONNINFO.rt.hostname);
395                return -1;
396        } 
397        if ((INPUT.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
398                trace_set_err(libtrace,errno,"socket(AF_INET,SOCK_STREAM)");
399                return -1;
400        }
401
402        remote.sin_family = AF_INET;   
403        remote.sin_port = htons(CONNINFO.rt.port);
404        remote.sin_addr = *((struct in_addr *)he->h_addr);
405        memset(&(remote.sin_zero), 0, 8);
406
407        if (connect(INPUT.fd, (struct sockaddr *)&remote,
408                                sizeof(struct sockaddr)) == -1) {
409                trace_set_err(libtrace,errno,"connect(%s)",
410                                CONNINFO.rt.hostname);
411                return -1;
412        }
413        return 0; /* success */
414}
415
416static int rtclient_pause_input(libtrace_t *libtrace)
417{
418        close(INPUT.fd);
419        return 0; /* success */
420}
421
422static int erf_init_output(libtrace_out_t *libtrace) {
423        libtrace->format_data = malloc(sizeof(struct erf_format_data_out_t));
424
425        OPTIONS.erf.level = 0;
426        OPTIONS.erf.fileflag = O_CREAT | O_WRONLY;
427        OUTPUT.file = 0;
428
429        return 0;
430}
431
432static int erf_config_output(libtrace_out_t *libtrace, trace_option_output_t option,
433                void *value) {
434
435        switch (option) {
436                case TRACE_OPTION_OUTPUT_COMPRESS:
437                        OPTIONS.erf.level = *(int*)value;
438                        return 0;
439                case TRACE_OPTION_OUTPUT_FILEFLAGS:
440                        OPTIONS.erf.fileflag = *(int*)value;
441                        return 0;
442                default:
443                        /* Unknown option */
444                        trace_set_err_out(libtrace,TRACE_ERR_UNKNOWN_OPTION,
445                                        "Unknown option");
446                        return -1;
447        }
448}
449
450
451#ifdef HAVE_DAG
452static int dag_pause_input(libtrace_t *libtrace) {
453        dag_stop(INPUT.fd);
454        return 0; /* success */
455}
456
457static int dag_fin_input(libtrace_t *libtrace) {
458        /* dag pause input implicitly called to cleanup before this */
459        dag_close(INPUT.fd);
460        if (DUCK.dummy_duck)
461                trace_destroy_dead(DUCK.dummy_duck);
462        free(libtrace->format_data);
463        return 0; /* success */
464}
465#endif
466
467static int rtclient_fin_input(libtrace_t *libtrace) {
468        free(CONNINFO.rt.hostname);
469        close(INPUT.fd);
470        free(libtrace->format_data);
471        return 0;
472}
473
474static int erf_fin_input(libtrace_t *libtrace) {
475        libtrace_io_close(INPUT.file);
476        free(libtrace->format_data);
477        return 0;
478}
479
480static int erf_fin_output(libtrace_out_t *libtrace) {
481        libtrace_io_close(OUTPUT.file);
482        free(libtrace->format_data);
483        return 0;
484}
485 
486#if HAVE_DAG
487#if DAG_VERSION_2_4
488static int dag_get_duckinfo(libtrace_t *libtrace, 
489                                libtrace_packet_t *packet) {
490        dag_inf lt_dag_inf;
491       
492        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
493                        !packet->buffer) {
494                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
495                packet->buf_control = TRACE_CTRL_PACKET;
496                if (!packet->buffer) {
497                        trace_set_err(libtrace, errno,
498                                        "Cannot allocate packet buffer");
499                        return -1;
500                }
501        }
502       
503        packet->header = 0;
504        packet->payload = packet->buffer;
505       
506        if ((ioctl(INPUT.fd, DAG_IOINF, &lt_dag_inf) < 0)) {
507                trace_set_err(libtrace, errno,
508                                "Error using DAG_IOINF");
509                return -1;
510        }
511        if (!IsDUCK(&lt_dag_inf)) {
512                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
513                return 0;
514        }
515
516        if ((ioctl(INPUT.fd, DAG_IOGETDUCK, (duck_inf *)packet->payload) 
517                                < 0)) {
518                trace_set_err(libtrace, errno, "Error using DAG_IOGETDUCK");
519                return -1;
520        }
521
522        packet->type = RT_DUCK_2_4;
523        packet->size = sizeof(duck_inf);
524        if (!DUCK.dummy_duck) 
525                DUCK.dummy_duck = trace_create_dead("duck:dummy");
526        packet->trace = DUCK.dummy_duck;
527        return packet->size;
528}       
529#else
530static int dag_get_duckinfo(libtrace_t *libtrace, 
531                                libtrace_packet_t *packet) {
532        daginf_t lt_dag_inf;
533       
534        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
535                        !packet->buffer) {
536                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
537                packet->buf_control = TRACE_CTRL_PACKET;
538                if (!packet->buffer) {
539                        trace_set_err(libtrace, errno,
540                                        "Cannot allocate packet buffer");
541                        return -1;
542                }
543        }
544       
545        packet->header = 0;
546        packet->payload = packet->buffer;
547       
548        if ((ioctl(INPUT.fd, DAGIOCINFO, &lt_dag_inf) < 0)) {
549                trace_set_err(libtrace, errno,
550                                "Error using DAGIOCINFO");
551                return -1;
552        }
553        if (!IsDUCK(&lt_dag_inf)) {
554                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
555                return 0;
556        }
557
558        if ((ioctl(INPUT.fd, DAGIOCDUCK, (duckinf_t *)packet->payload) 
559                                < 0)) {
560                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
561                return -1;
562        }
563
564        packet->type = RT_DUCK_2_5;
565        packet->size = sizeof(duckinf_t);
566        if (!DUCK.dummy_duck) 
567                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
568        packet->trace = DUCK.dummy_duck;       
569        return packet->size;
570}       
571#endif
572
573static int dag_read(libtrace_t *libtrace, int block_flag) {
574
575        if (DAG.diff != 0) 
576                return DAG.diff;
577
578        DAG.bottom = DAG.top;
579
580        DAG.top = dag_offset(
581                        INPUT.fd,
582                        &(DAG.bottom),
583                        block_flag);
584
585        DAG.diff = DAG.top - DAG.bottom;
586
587        DAG.offset = 0;
588        return DAG.diff;
589}
590
591/* FIXME: dag_read_packet shouldn't update the pointers, dag_fin_packet
592 * should do that.
593 */
594static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
595        int numbytes;
596        int size;
597        struct timeval tv;
598        dag_record_t *erfptr;
599
600        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 
601                        DUCK.duck_freq != 0) {
602                size = dag_get_duckinfo(libtrace, packet);
603                DUCK.last_duck = DUCK.last_pkt;
604                if (size != 0) {
605                        return size;
606                }
607                /* No DUCK support, so don't waste our time anymore */
608                DUCK.duck_freq = 0;
609        }
610       
611        if (packet->buf_control == TRACE_CTRL_PACKET) {
612                packet->buf_control = TRACE_CTRL_EXTERNAL;
613                free(packet->buffer);
614                packet->buffer = 0;
615        }
616       
617        packet->type = RT_DATA_ERF;
618       
619        if ((numbytes = dag_read(libtrace,0)) < 0) 
620                return numbytes;
621        assert(numbytes>0);
622
623        /*DAG always gives us whole packets */
624        erfptr = (dag_record_t *) ((char *)DAG.buf + 
625                        (DAG.bottom + DAG.offset));
626        size = ntohs(erfptr->rlen);
627
628        assert( size >= dag_record_size );
629        assert( size < LIBTRACE_PACKET_BUFSIZE);
630       
631        packet->buffer = erfptr;
632        packet->header = erfptr;
633        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
634                packet->payload = NULL;
635        } else {
636                packet->payload = (char*)packet->buffer
637                        + erf_get_framing_length(packet);
638        }
639
640        DAG.offset += size;
641        DAG.diff -= size;
642
643        packet->size = size;
644        tv = trace_get_timeval(packet);
645        DUCK.last_pkt = tv.tv_sec;
646       
647        return (size);
648}
649
650static int dag_start_input(libtrace_t *libtrace) {
651        if(dag_start(INPUT.fd) < 0) {
652                trace_set_err(libtrace,errno,"Cannot start DAG %s",
653                                libtrace->uridata);
654                return -1;
655        }
656        /* dags appear to have a bug where if you call dag_start after
657         * calling dag_stop, and at least one packet has arrived, bad things
658         * happen.  flush the memory hole
659         */
660        while(dag_read(libtrace,1)!=0)
661                DAG.diff=0;
662        return 0;
663}
664#endif
665
666static int erf_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
667        int numbytes;
668        int size;
669        void *buffer2 = packet->buffer;
670        int rlen;
671
672        if (!packet->buffer || packet->buf_control == TRACE_CTRL_EXTERNAL) {
673                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
674                packet->buf_control = TRACE_CTRL_PACKET;
675                if (!packet->buffer) {
676                        trace_set_err(libtrace, errno, 
677                                        "Cannot allocate memory");
678                        return -1;
679                }
680        }
681
682       
683       
684        packet->header = packet->buffer;
685        packet->type = RT_DATA_ERF;
686
687        if ((numbytes=libtrace_io_read(INPUT.file,
688                                        packet->buffer,
689                                        dag_record_size)) == -1) {
690                trace_set_err(libtrace,errno,"read(%s)",
691                                libtrace->uridata);
692                return -1;
693        }
694        if (numbytes == 0) {
695                return 0;
696        }
697
698        rlen = ntohs(((dag_record_t *)packet->buffer)->rlen);
699        buffer2 = (char*)packet->buffer + dag_record_size;
700        size = rlen - dag_record_size;
701
702        assert(size < LIBTRACE_PACKET_BUFSIZE && size >= dag_record_size);
703
704        /* Unknown/corrupt */
705        assert(((dag_record_t *)packet->buffer)->type < 10);
706       
707        /* read in the rest of the packet */
708        if ((numbytes=libtrace_io_read(INPUT.file,
709                                        buffer2,
710                                        size)) != size) {
711                trace_set_err(libtrace,errno, "read(%s)", libtrace->uridata);
712                return -1;
713        }
714        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
715                packet->payload = NULL;
716        } else {
717                packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
718        }
719        return rlen;
720}
721
722static int rtclient_read(libtrace_t *libtrace, void *buffer, size_t len) {
723        int numbytes;
724
725        while(1) {
726#ifndef MSG_NOSIGNAL
727#  define MSG_NOSIGNAL 0
728#endif
729                if ((numbytes = recv(INPUT.fd,
730                                                buffer,
731                                                len,
732                                                MSG_NOSIGNAL)) == -1) {
733                        if (errno == EINTR) {
734                                /*ignore EINTR in case
735                                 *a caller is using signals
736                                 */
737                                continue;
738                        }
739                        trace_set_err(libtrace,errno,"recv(%s)",
740                                        libtrace->uridata);
741                        return -1;
742                }
743                break;
744
745        }
746        return numbytes;
747}
748
749static int rtclient_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
750        int numbytes = 0;
751        char buf[RP_BUFSIZE];
752        int read_required = 0;
753       
754        void *buffer = 0;
755
756        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
757                packet->buf_control = TRACE_CTRL_PACKET;
758                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
759        }
760
761        buffer = packet->buffer;
762        packet->header = packet->buffer;
763       
764        packet->type = RT_DATA_ERF;
765
766       
767        do {
768                libtrace_packet_status_t status;
769                int size;
770                if (tracefifo_out_available(libtrace->fifo) == 0 
771                                || read_required) {
772                        if ((numbytes = rtclient_read(
773                                        libtrace,buf,RP_BUFSIZE))<=0) {
774                                return numbytes;
775                        }
776                        tracefifo_write(libtrace->fifo,buf,numbytes);
777                        read_required = 0;
778                }
779                /* Read status byte */
780                if (tracefifo_out_read(libtrace->fifo,
781                                &status, sizeof(uint32_t)) == 0) {
782                        read_required = 1;
783                        continue;
784                }
785                tracefifo_out_update(libtrace->fifo,sizeof(uint32_t));
786                /* Read in packet size */
787                if (tracefifo_out_read(libtrace->fifo,
788                                &size, sizeof(uint32_t)) == 0) {
789                        tracefifo_out_reset(libtrace->fifo);
790                        read_required = 1;
791                        continue;
792                }
793                tracefifo_out_update(libtrace->fifo, sizeof(uint32_t));
794               
795                if (status.type == 2 /* RT_MSG */) {
796                        /* Need to skip this packet as it is a message packet */
797                        tracefifo_out_update(libtrace->fifo, size);
798                        tracefifo_ack_update(libtrace->fifo, size + 
799                                        sizeof(uint32_t) + 
800                                        sizeof(libtrace_packet_status_t));
801                        continue;
802                }
803               
804                /* read in the full packet */
805                if ((numbytes = tracefifo_out_read(libtrace->fifo, 
806                                                buffer, size)) == 0) {
807                        tracefifo_out_reset(libtrace->fifo);
808                        read_required = 1;
809                        continue;
810                }
811
812                /* got in our whole packet, so... */
813                tracefifo_out_update(libtrace->fifo,size);
814
815                tracefifo_ack_update(libtrace->fifo,size + 
816                                sizeof(uint32_t) + 
817                                sizeof(libtrace_packet_status_t));
818
819                if (((dag_record_t *)buffer)->flags.rxerror == 1) {
820                        packet->payload = NULL;
821                } else {
822                        packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
823                }
824                return numbytes;
825        } while(1);
826}
827
828static int erf_dump_packet(libtrace_out_t *libtrace,
829                dag_record_t *erfptr, int pad, void *buffer, size_t size) {
830        int numbytes = 0;
831        assert(size<=65536);
832
833        if ((numbytes = libtrace_io_write(OUTPUT.file, erfptr, dag_record_size + pad)) != dag_record_size+pad) {
834                trace_set_err_out(libtrace,errno,
835                                "write(%s)",libtrace->uridata);
836                return -1;
837        }
838
839        if ((numbytes=libtrace_io_write(OUTPUT.file, buffer, size)) != (int)size) {
840                trace_set_err_out(libtrace,errno,
841                                "write(%s)",libtrace->uridata);
842                return -1;
843        }
844
845        return numbytes + pad + dag_record_size;
846}
847
848static int erf_start_output(libtrace_out_t *libtrace)
849{
850        OUTPUT.file = trace_open_file_out(libtrace,
851                        OPTIONS.erf.level,
852                        OPTIONS.erf.fileflag);
853        if (!OUTPUT.file) {
854                return -1;
855        }
856        return 0;
857}
858               
859static int erf_write_packet(libtrace_out_t *libtrace, 
860                const libtrace_packet_t *packet) 
861{
862        int numbytes = 0;
863        int pad = 0;
864        dag_record_t *dag_hdr = (dag_record_t *)packet->header;
865        void *payload = packet->payload;
866
867        assert(OUTPUT.file);
868
869        if (!packet->header) {
870                //trace_set_err_output(libtrace, TRACE_ERR_BAD_PACKET,
871                //              "Packet has no header - probably an RT packet");
872                return -1;
873        }
874       
875        pad = erf_get_padding(packet);
876
877        /* If we've had an rxerror, we have no payload to write - fix rlen to
878         * be the correct length */
879        if (payload == NULL) {
880                dag_hdr->rlen = htons(dag_record_size + pad);
881        } 
882       
883        if (packet->trace->format == &erf 
884#if HAVE_DAG
885                        || packet->trace->format == &dag
886#endif
887                        ) {
888                numbytes = erf_dump_packet(libtrace,
889                                (dag_record_t *)packet->header,
890                                pad,
891                                payload,
892                                trace_get_capture_length(packet)
893                                );
894        } else {
895                dag_record_t erfhdr;
896                int type;
897                /* convert format - build up a new erf header */
898                /* Timestamp */
899                erfhdr.ts = trace_get_erf_timestamp(packet);
900                type=libtrace_to_erf_type(trace_get_link_type(packet));
901                if (type==(char)-1) {
902                        trace_set_err_out(libtrace,TRACE_ERR_NO_CONVERSION,
903                                        "No erf type for packet");
904                        return -1;
905                }
906                erfhdr.type = type;
907                /* Flags. Can't do this */
908                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
909                /* Packet length (rlen includes format overhead) */
910                erfhdr.rlen = trace_get_capture_length(packet) 
911                        + erf_get_framing_length(packet);
912                /* loss counter. Can't do this */
913                erfhdr.lctr = 0;
914                /* Wire length */
915                erfhdr.wlen = trace_get_wire_length(packet);
916
917                /* Write it out */
918                numbytes = erf_dump_packet(libtrace,
919                                &erfhdr,
920                                pad,
921                                payload,
922                                trace_get_capture_length(packet));
923        }
924        return numbytes;
925}
926
927static libtrace_linktype_t erf_get_link_type(const libtrace_packet_t *packet) {
928        dag_record_t *erfptr = 0;
929        erfptr = (dag_record_t *)packet->header;
930        return erf_type_to_libtrace(erfptr->type);
931}
932
933static libtrace_direction_t erf_get_direction(const libtrace_packet_t *packet) {
934        dag_record_t *erfptr = 0;
935        erfptr = (dag_record_t *)packet->header;
936        return erfptr->flags.iface;
937}
938
939static libtrace_direction_t erf_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
940        dag_record_t *erfptr = 0;
941        erfptr = (dag_record_t *)packet->header;
942        erfptr->flags.iface = direction;
943        return erfptr->flags.iface;
944}
945
946static uint64_t erf_get_erf_timestamp(const libtrace_packet_t *packet) {
947        dag_record_t *erfptr = 0;
948        erfptr = (dag_record_t *)packet->header;
949        return erfptr->ts;
950}
951
952static int erf_get_capture_length(const libtrace_packet_t *packet) {
953        dag_record_t *erfptr = 0;
954        erfptr = (dag_record_t *)packet->header;
955        return (ntohs(erfptr->rlen) - erf_get_framing_length(packet));
956}
957
958static int erf_get_wire_length(const libtrace_packet_t *packet) {
959        dag_record_t *erfptr = 0;
960        erfptr = (dag_record_t *)packet->header;
961        return ntohs(erfptr->wlen);
962}
963
964static size_t erf_set_capture_length(libtrace_packet_t *packet, size_t size) {
965        dag_record_t *erfptr = 0;
966        assert(packet);
967        if(size  > trace_get_capture_length(packet)) {
968                /* can't make a packet larger */
969                return trace_get_capture_length(packet);
970        }
971        erfptr = (dag_record_t *)packet->header;
972        erfptr->rlen = htons(size + erf_get_framing_length(packet));
973        return trace_get_capture_length(packet);
974}
975
976static int rtclient_get_fd(const libtrace_t *libtrace) {
977        return INPUT.fd;
978}
979
980#ifdef HAVE_DAG
981libtrace_eventobj_t trace_event_dag(libtrace_t *trace, libtrace_packet_t *packet) {
982        libtrace_eventobj_t event = {0,0,0.0,0};
983        int dag_fd;
984        int data;
985
986        if (trace->format->get_fd) {
987                dag_fd = trace->format->get_fd(trace);
988        } else {
989                dag_fd = 0;
990        }
991       
992        data = dag_read(trace, DAGF_NONBLOCK);
993
994        if (data > 0) {
995                event.size = trace_read_packet(trace,packet);
996                event.type = TRACE_EVENT_PACKET;
997                return event;
998        }
999        event.type = TRACE_EVENT_SLEEP;
1000        event.seconds = 0.0001;
1001        return event;
1002}
1003#endif
1004
1005#if HAVE_DAG
1006static void dag_help() {
1007        printf("dag format module: $Revision$\n");
1008        printf("Supported input URIs:\n");
1009        printf("\tdag:/dev/dagn\n");
1010        printf("\n");
1011        printf("\te.g.: dag:/dev/dag0\n");
1012        printf("\n");
1013        printf("Supported output URIs:\n");
1014        printf("\tnone\n");
1015        printf("\n");
1016}
1017#endif
1018
1019static void erf_help() {
1020        printf("erf format module: $Revision$\n");
1021        printf("Supported input URIs:\n");
1022        printf("\terf:/path/to/file\t(uncompressed)\n");
1023        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1024        printf("\terf:-\t(stdin, either compressed or not)\n");
1025        printf("\terf:/path/to/socket\n");
1026        printf("\n");
1027        printf("\te.g.: erf:/tmp/trace\n");
1028        printf("\n");
1029        printf("Supported output URIs:\n");
1030        printf("\terf:path/to/file\t(uncompressed)\n");
1031        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1032        printf("\terf:-\t(stdout, either compressed or not)\n");
1033        printf("\n");
1034        printf("\te.g.: erf:/tmp/trace\n");
1035        printf("\n");
1036        printf("Supported output options:\n");
1037        printf("\t-z\tSpecify the gzip compression, ranging from 0 (uncompressed) to 9 - defaults to 1\n");
1038        printf("\n");
1039
1040       
1041}
1042
1043static void rtclient_help() {
1044        printf("rtclient format module: $Revision$\n");
1045        printf("DEPRECATED - use rt module instead\n");
1046        printf("Supported input URIs:\n");
1047        printf("\trtclient:host:port\n");
1048        printf("\n");
1049        printf("\te.g.:rtclient:localhost:3435\n");
1050        printf("\n");
1051        printf("Supported output URIs:\n");
1052        printf("\tnone\n");
1053        printf("\n");
1054}       
1055
1056static struct libtrace_format_t erf = {
1057        "erf",
1058        "$Id$",
1059        TRACE_FORMAT_ERF,
1060        erf_init_input,                 /* init_input */       
1061        NULL,                           /* config_input */
1062        erf_start_input,                /* start_input */
1063        NULL,                           /* pause_input */
1064        erf_init_output,                /* init_output */
1065        erf_config_output,              /* config_output */
1066        erf_start_output,               /* start_output */
1067        erf_fin_input,                  /* fin_input */
1068        erf_fin_output,                 /* fin_output */
1069        erf_read_packet,                /* read_packet */
1070        NULL,                           /* fin_packet */
1071        erf_write_packet,               /* write_packet */
1072        erf_get_link_type,              /* get_link_type */
1073        erf_get_direction,              /* get_direction */
1074        erf_set_direction,              /* set_direction */
1075        erf_get_erf_timestamp,          /* get_erf_timestamp */
1076        NULL,                           /* get_timeval */
1077        NULL,                           /* get_seconds */
1078        erf_seek_erf,                   /* seek_erf */
1079        NULL,                           /* seek_timeval */
1080        NULL,                           /* seek_seconds */
1081        erf_get_capture_length,         /* get_capture_length */
1082        erf_get_wire_length,            /* get_wire_length */
1083        erf_get_framing_length,         /* get_framing_length */
1084        erf_set_capture_length,         /* set_capture_length */
1085        NULL,                           /* get_fd */
1086        trace_event_trace,              /* trace_event */
1087        erf_help,                       /* help */
1088        NULL                            /* next pointer */
1089};
1090
1091#ifdef HAVE_DAG
1092static struct libtrace_format_t dag = {
1093        "dag",
1094        "$Id$",
1095        TRACE_FORMAT_ERF,
1096        dag_init_input,                 /* init_input */       
1097        dag_config_input,               /* config_input */
1098        dag_start_input,                /* start_input */
1099        dag_pause_input,                /* pause_input */
1100        NULL,                           /* init_output */
1101        NULL,                           /* config_output */
1102        NULL,                           /* start_output */
1103        dag_fin_input,                  /* fin_input */
1104        NULL,                           /* fin_output */
1105        dag_read_packet,                /* read_packet */
1106        NULL,                           /* fin_packet */
1107        NULL,                           /* write_packet */
1108        erf_get_link_type,              /* get_link_type */
1109        erf_get_direction,              /* get_direction */
1110        erf_set_direction,              /* set_direction */
1111        erf_get_erf_timestamp,          /* get_erf_timestamp */
1112        NULL,                           /* get_timeval */
1113        NULL,                           /* get_seconds */
1114        NULL,                           /* seek_erf */
1115        NULL,                           /* seek_timeval */
1116        NULL,                           /* seek_seconds */
1117        erf_get_capture_length,         /* get_capture_length */
1118        erf_get_wire_length,            /* get_wire_length */
1119        erf_get_framing_length,         /* get_framing_length */
1120        erf_set_capture_length,         /* set_capture_length */
1121        NULL,                           /* get_fd */
1122        trace_event_dag,                /* trace_event */
1123        dag_help,                       /* help */
1124        NULL                            /* next pointer */
1125};
1126#endif
1127
1128static struct libtrace_format_t rtclient = {
1129        "rtclient",
1130        "$Id$",
1131        TRACE_FORMAT_ERF,
1132        rtclient_init_input,            /* init_input */       
1133        NULL,                           /* config_input */
1134        rtclient_start_input,           /* start_input */
1135        rtclient_pause_input,           /* pause_input */
1136        NULL,                           /* init_output */
1137        NULL,                           /* config_output */
1138        NULL,                           /* start_output */
1139        rtclient_fin_input,             /* fin_input */
1140        NULL,                           /* fin_output */
1141        rtclient_read_packet,           /* read_packet */
1142        NULL,                           /* fin_packet */
1143        NULL,                           /* write_packet */
1144        erf_get_link_type,              /* get_link_type */
1145        erf_get_direction,              /* get_direction */
1146        erf_set_direction,              /* set_direction */
1147        erf_get_erf_timestamp,          /* get_erf_timestamp */
1148        NULL,                           /* get_timeval */
1149        NULL,                           /* get_seconds */
1150        NULL,                           /* seek_erf */
1151        NULL,                           /* seek_timeval */
1152        NULL,                           /* seek_seconds */
1153        erf_get_capture_length,         /* get_capture_length */
1154        erf_get_wire_length,            /* get_wire_length */
1155        erf_get_framing_length,         /* get_framing_length */
1156        erf_set_capture_length,         /* set_capture_length */
1157        rtclient_get_fd,                /* get_fd */
1158        trace_event_device,             /* trace_event */
1159        rtclient_help,                  /* help */
1160        NULL                            /* next pointer */
1161};
1162
1163void CONSTRUCTOR erf_constructor() {
1164        register_format(&rtclient);
1165        register_format(&erf);
1166#ifdef HAVE_DAG
1167        register_format(&dag);
1168#endif
1169}
Note: See TracBrowser for help on using the repository browser.