source: lib/format_erf.c @ 0e9b9f8

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 0e9b9f8 was 0e9b9f8, checked in by Shane Alcock <salcock@…>, 15 years ago

DAG 2.5 deprecated a few functions so decided to explicitly call the stream functions.

  • Property mode set to 100644
File size: 29.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30#define _GNU_SOURCE
31
32#include "config.h"
33#include "common.h"
34#include "libtrace.h"
35#include "libtrace_int.h"
36#include "format_helper.h"
37#include "parse_cmd.h"
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45
46#if HAVE_DAG
47#include <sys/mman.h>
48#endif
49
50#ifdef WIN32
51#  include <io.h>
52#  include <share.h>
53#  define PATH_MAX _MAX_PATH
54#  define snprintf sprintf_s
55#else
56#  include <netdb.h>
57#  ifndef PATH_MAX
58#       define PATH_MAX 4096
59#  endif
60#  include <sys/ioctl.h>
61#endif
62
63
64#define COLLECTOR_PORT 3435
65
66static struct libtrace_format_t erf;
67static struct libtrace_format_t rtclient;
68#if HAVE_DAG
69static struct libtrace_format_t dag;
70#endif
71
72#define DATA(x) ((struct erf_format_data_t *)x->format_data)
73#define DATAOUT(x) ((struct erf_format_data_out_t *)x->format_data)
74
75#define CONNINFO DATA(libtrace)->conn_info
76#define INPUT DATA(libtrace)->input
77#define OUTPUT DATAOUT(libtrace)->output
78#if HAVE_DAG
79#define DAG DATA(libtrace)->dag
80#define DUCK DATA(libtrace)->duck
81#endif
82#define OPTIONS DATAOUT(libtrace)->options
83struct erf_format_data_t {
84        union {
85                struct {
86                        char *hostname;
87                        short port;
88                } rt;
89        } conn_info;
90       
91        union {
92                int fd;
93                libtrace_io_t *file;
94        } input;
95
96        struct {
97                enum { INDEX_UNKNOWN=0, INDEX_NONE, INDEX_EXISTS } exists;
98                libtrace_io_t *index;
99                off_t index_len;
100        } seek;
101
102#if HAVE_DAG
103        struct {
104                uint32_t last_duck;     
105                uint32_t duck_freq;
106                uint32_t last_pkt;
107                libtrace_t *dummy_duck;
108        } duck;
109       
110        struct {
111                void *buf; 
112                unsigned bottom;
113                unsigned top;
114                unsigned diff;
115                unsigned curr;
116                unsigned offset;
117                unsigned int dagstream;
118        } dag;
119#endif
120};
121
122struct erf_format_data_out_t {
123        union {
124                struct {
125                        char *hostname;
126                        short port;
127                } rt;
128                char *path;
129        } conn_info;
130
131        union {
132                struct {
133                        int level;
134                        int fileflag;
135                } erf;
136               
137        } options;
138       
139        union {
140                int fd;
141                struct rtserver_t * rtserver;
142                libtrace_io_t *file;
143        } output;
144};
145
146/** Structure holding status information for a packet */
147typedef struct libtrace_packet_status {
148        uint8_t type;
149        uint8_t reserved;
150        uint16_t message;
151} libtrace_packet_status_t;
152
153typedef struct erf_index_t {
154        uint64_t timestamp;
155        uint64_t offset; 
156} erf_index_t;
157
158#ifdef HAVE_DAG
159static int dag_init_input(libtrace_t *libtrace) {
160        struct stat buf;
161
162        libtrace->format_data = (struct erf_format_data_t *)
163                malloc(sizeof(struct erf_format_data_t));
164        if (stat(libtrace->uridata, &buf) == -1) {
165                trace_set_err(libtrace,errno,"stat(%s)",libtrace->uridata);
166                return -1;
167        } 
168
169        DAG.dagstream = 0;
170       
171        if (S_ISCHR(buf.st_mode)) {
172                /* DEVICE */
173                if((INPUT.fd = dag_open(libtrace->uridata)) < 0) {
174                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
175                                        libtrace->uridata);
176                        return -1;
177                }
178                if((DAG.buf = (void *)dag_mmap(INPUT.fd)) == MAP_FAILED) {
179                        trace_set_err(libtrace,errno,"Cannot mmap DAG %s",
180                                        libtrace->uridata);
181                        return -1;
182                }
183        } else {
184                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
185                                libtrace->uridata);
186                return -1;
187        }
188
189        DUCK.last_duck = 0;
190        DUCK.duck_freq = 0; 
191        DUCK.last_pkt = 0;
192        DUCK.dummy_duck = NULL;
193       
194        return 0;
195}
196
197static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
198                                void *data) {
199        switch(option) {
200                case TRACE_META_FREQ:
201                        DUCK.duck_freq = *(int *)data;
202                        return 0;
203                case TRACE_OPTION_SNAPLEN:
204                        /* Surely we can set this?? Fall through for now*/
205               
206                case TRACE_OPTION_PROMISC:
207                        /* DAG already operates in a promisc fashion */
208
209                case TRACE_OPTION_FILTER:
210
211                default:
212                        trace_set_err(libtrace, TRACE_ERR_UNKNOWN_OPTION,
213                                        "Unknown or unsupported option: %i",
214                                        option);
215                        return -1;
216        }
217        assert (0);
218}
219#endif
220
221/* Dag erf ether packets have a 2 byte padding before the packet
222 * so that the ip header is aligned on a 32 bit boundary.
223 */
224static int erf_get_padding(const libtrace_packet_t *packet)
225{
226        dag_record_t *erfptr = (dag_record_t *)packet->header;
227        switch(erfptr->type) {
228                case TYPE_ETH:          return 2;
229                default:                return 0;
230        }
231}
232
233static int erf_get_framing_length(const libtrace_packet_t *packet)
234{
235        return dag_record_size + erf_get_padding(packet);
236}
237
238
239static int erf_init_input(libtrace_t *libtrace) 
240{
241        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
242       
243        INPUT.file = 0;
244
245        return 0; /* success */
246}
247
248static int erf_start_input(libtrace_t *libtrace)
249{
250        if (INPUT.file)
251                return 0; /* success */
252
253        INPUT.file = trace_open_file(libtrace);
254
255        if (!INPUT.file)
256                return -1;
257
258        return 0; /* success */
259}
260
261/* Binary search through the index to find the closest point before
262 * the packet.  Consider in future having a btree index perhaps?
263 */
264static int erf_fast_seek_start(libtrace_t *libtrace,uint64_t erfts)
265{
266        size_t max_off = DATA(libtrace)->seek.index_len/sizeof(erf_index_t);
267        size_t min_off = 0;
268        off_t current;
269        erf_index_t record;
270        do {
271                current=(max_off+min_off)>>2;
272
273                libtrace_io_seek(DATA(libtrace)->seek.index,
274                                current*sizeof(record),
275                                SEEK_SET);
276                libtrace_io_read(DATA(libtrace)->seek.index,
277                                &record,sizeof(record));
278                if (record.timestamp < erfts) {
279                        min_off=current;
280                }
281                if (record.timestamp > erfts) {
282                        max_off=current;
283                }
284                if (record.timestamp == erfts)
285                        break;
286        } while(min_off<max_off);
287
288        /* If we've passed it, seek backwards.  This loop shouldn't
289         * execute more than twice.
290         */
291        do {
292                libtrace_io_seek(DATA(libtrace)->seek.index,
293                                current*sizeof(record),SEEK_SET);
294                libtrace_io_read(DATA(libtrace)->seek.index,
295                                &record,sizeof(record));
296                current--;
297        } while(record.timestamp>erfts);
298
299        /* We've found our location in the trace, now use it. */
300        libtrace_io_seek(INPUT.file,record.offset,SEEK_SET);
301
302        return 0; /* success */
303}
304
305/* There is no index.  Seek through the entire trace from the start, nice
306 * and slowly.
307 */
308static int erf_slow_seek_start(libtrace_t *libtrace,uint64_t erfts)
309{
310        if (INPUT.file) {
311                libtrace_io_close(INPUT.file);
312        }
313        INPUT.file = trace_open_file(libtrace);
314        if (!INPUT.file)
315                return -1;
316        return 0;
317}
318
319static int erf_seek_erf(libtrace_t *libtrace,uint64_t erfts)
320{
321        libtrace_packet_t *packet;
322        off_t off = 0;
323
324        if (DATA(libtrace)->seek.exists==INDEX_UNKNOWN) {
325                char buffer[PATH_MAX];
326                snprintf(buffer,sizeof(buffer),"%s.idx",libtrace->uridata);
327                DATA(libtrace)->seek.index=libtrace_io_open(buffer,"rb");
328                if (DATA(libtrace)->seek.index) {
329                        DATA(libtrace)->seek.exists=INDEX_EXISTS;
330                }
331                else {
332                        DATA(libtrace)->seek.exists=INDEX_NONE;
333                }
334        }
335
336        /* If theres an index, use it to find the nearest packet that isn't
337         * after the time we're looking for.  If there is no index we need
338         * to seek slowly through the trace from the beginning.  Sigh.
339         */
340        switch(DATA(libtrace)->seek.exists) {
341                case INDEX_EXISTS:
342                        erf_fast_seek_start(libtrace,erfts);
343                        break;
344                case INDEX_NONE:
345                        erf_slow_seek_start(libtrace,erfts);
346                        break;
347                case INDEX_UNKNOWN:
348                        assert(0);
349                        break;
350        }
351
352        /* Now seek forward looking for the correct timestamp */
353        packet=trace_create_packet();
354        do {
355                trace_read_packet(libtrace,packet);
356                if (trace_get_erf_timestamp(packet)==erfts)
357                        break;
358                off=libtrace_io_tell(INPUT.file);
359        } while(trace_get_erf_timestamp(packet)<erfts);
360
361        libtrace_io_seek(INPUT.file,off,SEEK_SET);
362
363        return 0;
364}
365
366static int rtclient_init_input(libtrace_t *libtrace) {
367        char *scan;
368        libtrace->format_data = malloc(sizeof(struct erf_format_data_t));
369
370        if (strlen(libtrace->uridata) == 0) {
371                CONNINFO.rt.hostname = 
372                        strdup("localhost");
373                CONNINFO.rt.port = 
374                        COLLECTOR_PORT;
375        } else {
376                if ((scan = strchr(libtrace->uridata,':')) == NULL) {
377                        CONNINFO.rt.hostname = 
378                                strdup(libtrace->uridata);
379                        CONNINFO.rt.port =
380                                COLLECTOR_PORT;
381                } else {
382                        CONNINFO.rt.hostname = 
383                                (char *)strndup(libtrace->uridata,
384                                                (scan - libtrace->uridata));
385                        CONNINFO.rt.port = 
386                                atoi(++scan);
387                }
388        }
389
390        return 0; /* success */
391}
392
393static int rtclient_start_input(libtrace_t *libtrace)
394{
395        struct hostent *he;
396        struct sockaddr_in remote;
397        if ((he=gethostbyname(CONNINFO.rt.hostname)) == NULL) { 
398                trace_set_err(libtrace,TRACE_ERR_INIT_FAILED,"failed to resolve %s",
399                                CONNINFO.rt.hostname);
400                return -1;
401        } 
402        if ((INPUT.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
403                trace_set_err(libtrace,errno,"socket(AF_INET,SOCK_STREAM)");
404                return -1;
405        }
406
407        remote.sin_family = AF_INET;   
408        remote.sin_port = htons(CONNINFO.rt.port);
409        remote.sin_addr = *((struct in_addr *)he->h_addr);
410        memset(&(remote.sin_zero), 0, 8);
411
412        if (connect(INPUT.fd, (struct sockaddr *)&remote,
413                                sizeof(struct sockaddr)) == -1) {
414                trace_set_err(libtrace,errno,"connect(%s)",
415                                CONNINFO.rt.hostname);
416                return -1;
417        }
418        return 0; /* success */
419}
420
421static int rtclient_pause_input(libtrace_t *libtrace)
422{
423        close(INPUT.fd);
424        return 0; /* success */
425}
426
427static int erf_init_output(libtrace_out_t *libtrace) {
428        libtrace->format_data = malloc(sizeof(struct erf_format_data_out_t));
429
430        OPTIONS.erf.level = 0;
431        OPTIONS.erf.fileflag = O_CREAT | O_WRONLY;
432        OUTPUT.file = 0;
433
434        return 0;
435}
436
437static int erf_config_output(libtrace_out_t *libtrace, trace_option_output_t option,
438                void *value) {
439
440        switch (option) {
441                case TRACE_OPTION_OUTPUT_COMPRESS:
442                        OPTIONS.erf.level = *(int*)value;
443                        return 0;
444                case TRACE_OPTION_OUTPUT_FILEFLAGS:
445                        OPTIONS.erf.fileflag = *(int*)value;
446                        return 0;
447                default:
448                        /* Unknown option */
449                        trace_set_err_out(libtrace,TRACE_ERR_UNKNOWN_OPTION,
450                                        "Unknown option");
451                        return -1;
452        }
453}
454
455
456#ifdef HAVE_DAG
457static int dag_pause_input(libtrace_t *libtrace) {
458#if DAG_VERSION_2_4
459        dag_stop(INPUT.fd);
460#else
461        if (dag_stop_stream(INPUT.fd, DAG.dagstream) < 0) {
462                trace_set_err(libtrace, errno, "Could not stop DAG stream");
463                return -1;
464        }
465        if (dag_detach_stream(INPUT.fd, DAG.dagstream) < 0) {
466                trace_set_err(libtrace, errno, "Could not detach DAG stream");
467                return -1;
468        }
469#endif
470        return 0; /* success */
471}
472
473static int dag_fin_input(libtrace_t *libtrace) {
474        /* dag pause input implicitly called to cleanup before this */
475       
476        dag_close(INPUT.fd);
477        if (DUCK.dummy_duck)
478                trace_destroy_dead(DUCK.dummy_duck);
479        free(libtrace->format_data);
480        return 0; /* success */
481}
482#endif
483
484static int rtclient_fin_input(libtrace_t *libtrace) {
485        free(CONNINFO.rt.hostname);
486        close(INPUT.fd);
487        free(libtrace->format_data);
488        return 0;
489}
490
491static int erf_fin_input(libtrace_t *libtrace) {
492        libtrace_io_close(INPUT.file);
493        free(libtrace->format_data);
494        return 0;
495}
496
497static int erf_fin_output(libtrace_out_t *libtrace) {
498        libtrace_io_close(OUTPUT.file);
499        free(libtrace->format_data);
500        return 0;
501}
502 
503#if HAVE_DAG
504#if DAG_VERSION_2_4
505static int dag_get_duckinfo(libtrace_t *libtrace, 
506                                libtrace_packet_t *packet) {
507        dag_inf lt_dag_inf;
508       
509        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
510                        !packet->buffer) {
511                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
512                packet->buf_control = TRACE_CTRL_PACKET;
513                if (!packet->buffer) {
514                        trace_set_err(libtrace, errno,
515                                        "Cannot allocate packet buffer");
516                        return -1;
517                }
518        }
519       
520        packet->header = 0;
521        packet->payload = packet->buffer;
522       
523        if ((ioctl(INPUT.fd, DAG_IOINF, &lt_dag_inf) < 0)) {
524                trace_set_err(libtrace, errno,
525                                "Error using DAG_IOINF");
526                return -1;
527        }
528        if (!IsDUCK(&lt_dag_inf)) {
529                printf("WARNING: %s does not have modern clock support - No DUCK information will be gathered\n", libtrace->uridata);
530                return 0;
531        }
532
533        if ((ioctl(INPUT.fd, DAG_IOGETDUCK, (duck_inf *)packet->payload) 
534                                < 0)) {
535                trace_set_err(libtrace, errno, "Error using DAG_IOGETDUCK");
536                return -1;
537        }
538
539        packet->type = RT_DUCK_2_4;
540        packet->size = sizeof(duck_inf);
541        if (!DUCK.dummy_duck) 
542                DUCK.dummy_duck = trace_create_dead("duck:dummy");
543        packet->trace = DUCK.dummy_duck;
544        return packet->size;
545}       
546#else
547static int dag_get_duckinfo(libtrace_t *libtrace, 
548                                libtrace_packet_t *packet) {
549        daginf_t lt_dag_inf;
550       
551        if (packet->buf_control == TRACE_CTRL_EXTERNAL || 
552                        !packet->buffer) {
553                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
554                packet->buf_control = TRACE_CTRL_PACKET;
555                if (!packet->buffer) {
556                        trace_set_err(libtrace, errno,
557                                        "Cannot allocate packet buffer");
558                        return -1;
559                }
560        }
561       
562        packet->header = 0;
563        packet->payload = packet->buffer;
564       
565        /* No need to check if we can get DUCK or not - we're modern
566         * enough */
567        if ((ioctl(INPUT.fd, DAGIOCDUCK, (duckinf_t *)packet->payload) 
568                                < 0)) {
569                trace_set_err(libtrace, errno, "Error using DAGIOCDUCK");
570                return -1;
571        }
572
573        packet->type = RT_DUCK_2_5;
574        packet->size = sizeof(duckinf_t);
575        if (!DUCK.dummy_duck) 
576                DUCK.dummy_duck = trace_create_dead("rt:localhost:3434");
577        packet->trace = DUCK.dummy_duck;       
578        return packet->size;
579}       
580#endif
581
582static int dag_read(libtrace_t *libtrace, int block_flag) {
583
584        if (DAG.diff != 0) 
585                return DAG.diff;
586
587        DAG.bottom = DAG.top;
588
589        DAG.top = dag_offset(
590                        INPUT.fd,
591                        &(DAG.bottom),
592                        block_flag);
593
594        DAG.diff = DAG.top - DAG.bottom;
595
596        DAG.offset = 0;
597        return DAG.diff;
598}
599
600/* FIXME: dag_read_packet shouldn't update the pointers, dag_fin_packet
601 * should do that.
602 */
603static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
604        int numbytes;
605        int size;
606        struct timeval tv;
607        dag_record_t *erfptr;
608
609        if (DUCK.last_pkt - DUCK.last_duck > DUCK.duck_freq && 
610                        DUCK.duck_freq != 0) {
611                size = dag_get_duckinfo(libtrace, packet);
612                DUCK.last_duck = DUCK.last_pkt;
613                if (size != 0) {
614                        return size;
615                }
616                /* No DUCK support, so don't waste our time anymore */
617                DUCK.duck_freq = 0;
618        }
619       
620        if (packet->buf_control == TRACE_CTRL_PACKET) {
621                packet->buf_control = TRACE_CTRL_EXTERNAL;
622                free(packet->buffer);
623                packet->buffer = 0;
624        }
625       
626        packet->type = RT_DATA_ERF;
627       
628        if ((numbytes = dag_read(libtrace,0)) < 0) 
629                return numbytes;
630        assert(numbytes>0);
631
632        /*DAG always gives us whole packets */
633        erfptr = (dag_record_t *) ((char *)DAG.buf + 
634                        (DAG.bottom + DAG.offset));
635        size = ntohs(erfptr->rlen);
636
637        assert( size >= dag_record_size );
638        assert( size < LIBTRACE_PACKET_BUFSIZE);
639       
640        packet->buffer = erfptr;
641        packet->header = erfptr;
642        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
643                packet->payload = NULL;
644        } else {
645                packet->payload = (char*)packet->buffer
646                        + erf_get_framing_length(packet);
647        }
648
649        DAG.offset += size;
650        DAG.diff -= size;
651
652        packet->size = size;
653        tv = trace_get_timeval(packet);
654        DUCK.last_pkt = tv.tv_sec;
655       
656        return (size);
657}
658
659static int dag_start_input(libtrace_t *libtrace) {
660#if DAG_VERSION_2_4
661        if(dag_start(INPUT.fd) < 0) {
662                trace_set_err(libtrace,errno,"Cannot start DAG %s",
663                                libtrace->uridata);
664                return -1;
665        }
666#else
667        if (dag_attach_stream(INPUT.fd, DAG.dagstream, 0, 0) < 0) {
668                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
669                return -1;
670        }
671        if (dag_start_stream(INPUT.fd, DAG.dagstream) < 0) {
672                trace_set_err(libtrace, errno, "Cannot start DAG stream");
673                return -1;
674        }
675#endif
676        /* dags appear to have a bug where if you call dag_start after
677         * calling dag_stop, and at least one packet has arrived, bad things
678         * happen.  flush the memory hole
679         */
680        while(dag_read(libtrace,1)!=0)
681                DAG.diff=0;
682        return 0;
683}
684#endif
685
686static int erf_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
687        int numbytes;
688        int size;
689        void *buffer2 = packet->buffer;
690        int rlen;
691
692        if (!packet->buffer || packet->buf_control == TRACE_CTRL_EXTERNAL) {
693                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
694                packet->buf_control = TRACE_CTRL_PACKET;
695                if (!packet->buffer) {
696                        trace_set_err(libtrace, errno, 
697                                        "Cannot allocate memory");
698                        return -1;
699                }
700        }
701
702       
703       
704        packet->header = packet->buffer;
705        packet->type = RT_DATA_ERF;
706
707        if ((numbytes=libtrace_io_read(INPUT.file,
708                                        packet->buffer,
709                                        dag_record_size)) == -1) {
710                trace_set_err(libtrace,errno,"read(%s)",
711                                libtrace->uridata);
712                return -1;
713        }
714        if (numbytes == 0) {
715                return 0;
716        }
717
718        rlen = ntohs(((dag_record_t *)packet->buffer)->rlen);
719        buffer2 = (char*)packet->buffer + dag_record_size;
720        size = rlen - dag_record_size;
721
722        assert(size < LIBTRACE_PACKET_BUFSIZE && size >= dag_record_size);
723
724        /* Unknown/corrupt */
725        assert(((dag_record_t *)packet->buffer)->type < 10);
726       
727        /* read in the rest of the packet */
728        if ((numbytes=libtrace_io_read(INPUT.file,
729                                        buffer2,
730                                        size)) != size) {
731                if (numbytes==-1) {
732                        trace_set_err(libtrace,errno, "read(%s)", libtrace->uridata);
733                        return -1;
734                }
735                /* Failed to read the full packet?  must be EOF */
736                return 0;
737        }
738        if (((dag_record_t *)packet->buffer)->flags.rxerror == 1) {
739                packet->payload = NULL;
740        } else {
741                packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
742        }
743        return rlen;
744}
745
746static int rtclient_read(libtrace_t *libtrace, void *buffer, size_t len) {
747        int numbytes;
748
749        while(1) {
750#ifndef MSG_NOSIGNAL
751#  define MSG_NOSIGNAL 0
752#endif
753                if ((numbytes = recv(INPUT.fd,
754                                                buffer,
755                                                len,
756                                                MSG_NOSIGNAL)) == -1) {
757                        if (errno == EINTR) {
758                                /*ignore EINTR in case
759                                 *a caller is using signals
760                                 */
761                                continue;
762                        }
763                        trace_set_err(libtrace,errno,"recv(%s)",
764                                        libtrace->uridata);
765                        return -1;
766                }
767                break;
768
769        }
770        return numbytes;
771}
772
773static int rtclient_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
774        int numbytes = 0;
775        char buf[RP_BUFSIZE];
776        int read_required = 0;
777       
778        void *buffer = 0;
779
780        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
781                packet->buf_control = TRACE_CTRL_PACKET;
782                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
783        }
784
785        buffer = packet->buffer;
786        packet->header = packet->buffer;
787       
788        packet->type = RT_DATA_ERF;
789
790       
791        do {
792                libtrace_packet_status_t status;
793                int size;
794                if (tracefifo_out_available(libtrace->fifo) == 0 
795                                || read_required) {
796                        if ((numbytes = rtclient_read(
797                                        libtrace,buf,RP_BUFSIZE))<=0) {
798                                return numbytes;
799                        }
800                        tracefifo_write(libtrace->fifo,buf,numbytes);
801                        read_required = 0;
802                }
803                /* Read status byte */
804                if (tracefifo_out_read(libtrace->fifo,
805                                &status, sizeof(uint32_t)) == 0) {
806                        read_required = 1;
807                        continue;
808                }
809                tracefifo_out_update(libtrace->fifo,sizeof(uint32_t));
810                /* Read in packet size */
811                if (tracefifo_out_read(libtrace->fifo,
812                                &size, sizeof(uint32_t)) == 0) {
813                        tracefifo_out_reset(libtrace->fifo);
814                        read_required = 1;
815                        continue;
816                }
817                tracefifo_out_update(libtrace->fifo, sizeof(uint32_t));
818               
819                if (status.type == 2 /* RT_MSG */) {
820                        /* Need to skip this packet as it is a message packet */
821                        tracefifo_out_update(libtrace->fifo, size);
822                        tracefifo_ack_update(libtrace->fifo, size + 
823                                        sizeof(uint32_t) + 
824                                        sizeof(libtrace_packet_status_t));
825                        continue;
826                }
827               
828                /* read in the full packet */
829                if ((numbytes = tracefifo_out_read(libtrace->fifo, 
830                                                buffer, size)) == 0) {
831                        tracefifo_out_reset(libtrace->fifo);
832                        read_required = 1;
833                        continue;
834                }
835
836                /* got in our whole packet, so... */
837                tracefifo_out_update(libtrace->fifo,size);
838
839                tracefifo_ack_update(libtrace->fifo,size + 
840                                sizeof(uint32_t) + 
841                                sizeof(libtrace_packet_status_t));
842
843                if (((dag_record_t *)buffer)->flags.rxerror == 1) {
844                        packet->payload = NULL;
845                } else {
846                        packet->payload = (char*)packet->buffer + erf_get_framing_length(packet);
847                }
848                return numbytes;
849        } while(1);
850}
851
852static int erf_dump_packet(libtrace_out_t *libtrace,
853                dag_record_t *erfptr, int pad, void *buffer, size_t size) {
854        int numbytes = 0;
855        assert(size<=65536);
856
857        if ((numbytes = libtrace_io_write(OUTPUT.file, erfptr, dag_record_size + pad)) != dag_record_size+pad) {
858                trace_set_err_out(libtrace,errno,
859                                "write(%s)",libtrace->uridata);
860                return -1;
861        }
862
863        if ((numbytes=libtrace_io_write(OUTPUT.file, buffer, size)) != (int)size) {
864                trace_set_err_out(libtrace,errno,
865                                "write(%s)",libtrace->uridata);
866                return -1;
867        }
868
869        return numbytes + pad + dag_record_size;
870}
871
872static int erf_start_output(libtrace_out_t *libtrace)
873{
874        OUTPUT.file = trace_open_file_out(libtrace,
875                        OPTIONS.erf.level,
876                        OPTIONS.erf.fileflag);
877        if (!OUTPUT.file) {
878                return -1;
879        }
880        return 0;
881}
882               
883static int erf_write_packet(libtrace_out_t *libtrace, 
884                const libtrace_packet_t *packet) 
885{
886        int numbytes = 0;
887        int pad = 0;
888        dag_record_t *dag_hdr = (dag_record_t *)packet->header;
889        void *payload = packet->payload;
890
891        assert(OUTPUT.file);
892
893        if (!packet->header) {
894                //trace_set_err_output(libtrace, TRACE_ERR_BAD_PACKET,
895                //              "Packet has no header - probably an RT packet");
896                return -1;
897        }
898       
899        pad = erf_get_padding(packet);
900
901        /* If we've had an rxerror, we have no payload to write - fix rlen to
902         * be the correct length */
903        if (payload == NULL) {
904                dag_hdr->rlen = htons(dag_record_size + pad);
905        } 
906       
907        if (packet->trace->format == &erf 
908#if HAVE_DAG
909                        || packet->trace->format == &dag
910#endif
911                        ) {
912                numbytes = erf_dump_packet(libtrace,
913                                (dag_record_t *)packet->header,
914                                pad,
915                                payload,
916                                trace_get_capture_length(packet)
917                                );
918        } else {
919                dag_record_t erfhdr;
920                int type;
921                /* convert format - build up a new erf header */
922                /* Timestamp */
923                erfhdr.ts = trace_get_erf_timestamp(packet);
924                type=libtrace_to_erf_type(trace_get_link_type(packet));
925                if (type==(char)-1) {
926                        trace_set_err_out(libtrace,TRACE_ERR_NO_CONVERSION,
927                                        "No erf type for packet");
928                        return -1;
929                }
930                erfhdr.type = type;
931                /* Flags. Can't do this */
932                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
933                /* Packet length (rlen includes format overhead) */
934                erfhdr.rlen = trace_get_capture_length(packet) 
935                        + erf_get_framing_length(packet);
936                /* loss counter. Can't do this */
937                erfhdr.lctr = 0;
938                /* Wire length */
939                erfhdr.wlen = trace_get_wire_length(packet);
940
941                /* Write it out */
942                numbytes = erf_dump_packet(libtrace,
943                                &erfhdr,
944                                pad,
945                                payload,
946                                trace_get_capture_length(packet));
947        }
948        return numbytes;
949}
950
951static libtrace_linktype_t erf_get_link_type(const libtrace_packet_t *packet) {
952        dag_record_t *erfptr = 0;
953        erfptr = (dag_record_t *)packet->header;
954        return erf_type_to_libtrace(erfptr->type);
955}
956
957static libtrace_direction_t erf_get_direction(const libtrace_packet_t *packet) {
958        dag_record_t *erfptr = 0;
959        erfptr = (dag_record_t *)packet->header;
960        return erfptr->flags.iface;
961}
962
963static libtrace_direction_t erf_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
964        dag_record_t *erfptr = 0;
965        erfptr = (dag_record_t *)packet->header;
966        erfptr->flags.iface = direction;
967        return erfptr->flags.iface;
968}
969
970static uint64_t erf_get_erf_timestamp(const libtrace_packet_t *packet) {
971        dag_record_t *erfptr = 0;
972        erfptr = (dag_record_t *)packet->header;
973        return erfptr->ts;
974}
975
976static int erf_get_capture_length(const libtrace_packet_t *packet) {
977        dag_record_t *erfptr = 0;
978        erfptr = (dag_record_t *)packet->header;
979        return (ntohs(erfptr->rlen) - erf_get_framing_length(packet));
980}
981
982static int erf_get_wire_length(const libtrace_packet_t *packet) {
983        dag_record_t *erfptr = 0;
984        erfptr = (dag_record_t *)packet->header;
985        return ntohs(erfptr->wlen);
986}
987
988static size_t erf_set_capture_length(libtrace_packet_t *packet, size_t size) {
989        dag_record_t *erfptr = 0;
990        assert(packet);
991        if(size  > trace_get_capture_length(packet)) {
992                /* can't make a packet larger */
993                return trace_get_capture_length(packet);
994        }
995        erfptr = (dag_record_t *)packet->header;
996        erfptr->rlen = htons(size + erf_get_framing_length(packet));
997        return trace_get_capture_length(packet);
998}
999
1000static int rtclient_get_fd(const libtrace_t *libtrace) {
1001        return INPUT.fd;
1002}
1003
1004#ifdef HAVE_DAG
1005libtrace_eventobj_t trace_event_dag(libtrace_t *trace, libtrace_packet_t *packet) {
1006        libtrace_eventobj_t event = {0,0,0.0,0};
1007        int dag_fd;
1008        int data;
1009
1010        if (trace->format->get_fd) {
1011                dag_fd = trace->format->get_fd(trace);
1012        } else {
1013                dag_fd = 0;
1014        }
1015       
1016        data = dag_read(trace, DAGF_NONBLOCK);
1017
1018        if (data > 0) {
1019                event.size = trace_read_packet(trace,packet);
1020                event.type = TRACE_EVENT_PACKET;
1021                return event;
1022        }
1023        event.type = TRACE_EVENT_SLEEP;
1024        event.seconds = 0.0001;
1025        return event;
1026}
1027#endif
1028
1029#if HAVE_DAG
1030static void dag_help() {
1031        printf("dag format module: $Revision$\n");
1032        printf("Supported input URIs:\n");
1033        printf("\tdag:/dev/dagn\n");
1034        printf("\n");
1035        printf("\te.g.: dag:/dev/dag0\n");
1036        printf("\n");
1037        printf("Supported output URIs:\n");
1038        printf("\tnone\n");
1039        printf("\n");
1040}
1041#endif
1042
1043static void erf_help() {
1044        printf("erf format module: $Revision$\n");
1045        printf("Supported input URIs:\n");
1046        printf("\terf:/path/to/file\t(uncompressed)\n");
1047        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1048        printf("\terf:-\t(stdin, either compressed or not)\n");
1049        printf("\terf:/path/to/socket\n");
1050        printf("\n");
1051        printf("\te.g.: erf:/tmp/trace\n");
1052        printf("\n");
1053        printf("Supported output URIs:\n");
1054        printf("\terf:path/to/file\t(uncompressed)\n");
1055        printf("\terf:/path/to/file.gz\t(gzip-compressed)\n");
1056        printf("\terf:-\t(stdout, either compressed or not)\n");
1057        printf("\n");
1058        printf("\te.g.: erf:/tmp/trace\n");
1059        printf("\n");
1060        printf("Supported output options:\n");
1061        printf("\t-z\tSpecify the gzip compression, ranging from 0 (uncompressed) to 9 - defaults to 1\n");
1062        printf("\n");
1063
1064       
1065}
1066
1067static void rtclient_help() {
1068        printf("rtclient format module: $Revision$\n");
1069        printf("DEPRECATED - use rt module instead\n");
1070        printf("Supported input URIs:\n");
1071        printf("\trtclient:host:port\n");
1072        printf("\n");
1073        printf("\te.g.:rtclient:localhost:3435\n");
1074        printf("\n");
1075        printf("Supported output URIs:\n");
1076        printf("\tnone\n");
1077        printf("\n");
1078}       
1079
1080static struct libtrace_format_t erf = {
1081        "erf",
1082        "$Id$",
1083        TRACE_FORMAT_ERF,
1084        erf_init_input,                 /* init_input */       
1085        NULL,                           /* config_input */
1086        erf_start_input,                /* start_input */
1087        NULL,                           /* pause_input */
1088        erf_init_output,                /* init_output */
1089        erf_config_output,              /* config_output */
1090        erf_start_output,               /* start_output */
1091        erf_fin_input,                  /* fin_input */
1092        erf_fin_output,                 /* fin_output */
1093        erf_read_packet,                /* read_packet */
1094        NULL,                           /* fin_packet */
1095        erf_write_packet,               /* write_packet */
1096        erf_get_link_type,              /* get_link_type */
1097        erf_get_direction,              /* get_direction */
1098        erf_set_direction,              /* set_direction */
1099        erf_get_erf_timestamp,          /* get_erf_timestamp */
1100        NULL,                           /* get_timeval */
1101        NULL,                           /* get_seconds */
1102        erf_seek_erf,                   /* seek_erf */
1103        NULL,                           /* seek_timeval */
1104        NULL,                           /* seek_seconds */
1105        erf_get_capture_length,         /* get_capture_length */
1106        erf_get_wire_length,            /* get_wire_length */
1107        erf_get_framing_length,         /* get_framing_length */
1108        erf_set_capture_length,         /* set_capture_length */
1109        NULL,                           /* get_fd */
1110        trace_event_trace,              /* trace_event */
1111        erf_help,                       /* help */
1112        NULL                            /* next pointer */
1113};
1114
1115#ifdef HAVE_DAG
1116static struct libtrace_format_t dag = {
1117        "dag",
1118        "$Id$",
1119        TRACE_FORMAT_ERF,
1120        dag_init_input,                 /* init_input */       
1121        dag_config_input,               /* config_input */
1122        dag_start_input,                /* start_input */
1123        dag_pause_input,                /* pause_input */
1124        NULL,                           /* init_output */
1125        NULL,                           /* config_output */
1126        NULL,                           /* start_output */
1127        dag_fin_input,                  /* fin_input */
1128        NULL,                           /* fin_output */
1129        dag_read_packet,                /* read_packet */
1130        NULL,                           /* fin_packet */
1131        NULL,                           /* write_packet */
1132        erf_get_link_type,              /* get_link_type */
1133        erf_get_direction,              /* get_direction */
1134        erf_set_direction,              /* set_direction */
1135        erf_get_erf_timestamp,          /* get_erf_timestamp */
1136        NULL,                           /* get_timeval */
1137        NULL,                           /* get_seconds */
1138        NULL,                           /* seek_erf */
1139        NULL,                           /* seek_timeval */
1140        NULL,                           /* seek_seconds */
1141        erf_get_capture_length,         /* get_capture_length */
1142        erf_get_wire_length,            /* get_wire_length */
1143        erf_get_framing_length,         /* get_framing_length */
1144        erf_set_capture_length,         /* set_capture_length */
1145        NULL,                           /* get_fd */
1146        trace_event_dag,                /* trace_event */
1147        dag_help,                       /* help */
1148        NULL                            /* next pointer */
1149};
1150#endif
1151
1152static struct libtrace_format_t rtclient = {
1153        "rtclient",
1154        "$Id$",
1155        TRACE_FORMAT_ERF,
1156        rtclient_init_input,            /* init_input */       
1157        NULL,                           /* config_input */
1158        rtclient_start_input,           /* start_input */
1159        rtclient_pause_input,           /* pause_input */
1160        NULL,                           /* init_output */
1161        NULL,                           /* config_output */
1162        NULL,                           /* start_output */
1163        rtclient_fin_input,             /* fin_input */
1164        NULL,                           /* fin_output */
1165        rtclient_read_packet,           /* read_packet */
1166        NULL,                           /* fin_packet */
1167        NULL,                           /* write_packet */
1168        erf_get_link_type,              /* get_link_type */
1169        erf_get_direction,              /* get_direction */
1170        erf_set_direction,              /* set_direction */
1171        erf_get_erf_timestamp,          /* get_erf_timestamp */
1172        NULL,                           /* get_timeval */
1173        NULL,                           /* get_seconds */
1174        NULL,                           /* seek_erf */
1175        NULL,                           /* seek_timeval */
1176        NULL,                           /* seek_seconds */
1177        erf_get_capture_length,         /* get_capture_length */
1178        erf_get_wire_length,            /* get_wire_length */
1179        erf_get_framing_length,         /* get_framing_length */
1180        erf_set_capture_length,         /* set_capture_length */
1181        rtclient_get_fd,                /* get_fd */
1182        trace_event_device,             /* trace_event */
1183        rtclient_help,                  /* help */
1184        NULL                            /* next pointer */
1185};
1186
1187void CONSTRUCTOR erf_constructor() {
1188        register_format(&rtclient);
1189        register_format(&erf);
1190#ifdef HAVE_DAG
1191        register_format(&dag);
1192#endif
1193}
Note: See TracBrowser for help on using the repository browser.