source: lib/format_erf.c @ 9a36e6d

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 9a36e6d was 9a36e6d, checked in by Daniel Lawson <dlawson@…>, 16 years ago

updated erf code, adding in wag support

  • Property mode set to 100644
File size: 18.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2004 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31#include "libtrace.h"
32#include "format.h"
33#include "rtserver.h"
34
35#ifdef HAVE_INTTYPES_H
36#  include <inttypes.h>
37#else
38#  error "Can't find inttypes.h - this needs to be fixed"
39#endif
40
41#ifdef HAVE_STDDEF_H
42#  include <stddef.h>
43#else
44# error "Can't find stddef.h - do you define ptrdiff_t elsewhere?"
45#endif
46#include <sys/types.h>
47#include <sys/socket.h>
48#include <sys/un.h>
49#include <sys/mman.h>
50#include <sys/stat.h>
51#include <unistd.h>
52#include <assert.h>
53#include <errno.h>
54#include <netdb.h>
55#include <fcntl.h>
56
57/* Catch undefined O_LARGEFILE on *BSD etc */
58#ifndef O_LARGEFILE
59#  define O_LARGEFILE 0
60#endif
61static int dag_init_input(struct libtrace_t *libtrace) {
62#ifdef HAVE_DAG
63        struct stat buf;
64        if (stat(libtrace->conn_info.path,&buf) == -1) {
65                perror("stat");
66                return 0;
67        } 
68        if (S_ISCHR(buf.st_mode)) {
69                // DEVICE
70                if((libtrace->input.fd = 
71                                dag_open(libtrace->conn_info.path)) < 0) {
72                        fprintf(stderr,"Cannot open DAG %s: %m\n", 
73                                        libtrace->conn_info.path,errno);
74                        exit(0);
75                }
76                if((libtrace->dag.buf = (void *)
77                                dag_mmap(libtrace->input.fd)) == MAP_FAILED) {
78                        fprintf(stderr,"Cannot mmap DAG %s: %m\n", 
79                                        libtrace->conn_info.path,errno);
80                        exit(0);
81                }
82                if(dag_start(libtrace->input.fd) < 0) {
83                        fprintf(stderr,"Cannot start DAG %s: %m\n", 
84                                        libtrace->conn_info.path,errno);
85                        exit(0);
86                }
87        } else {
88                fprintf(stderr,"%s isn't a valid char device, exiting\n",
89                                libtrace->conn_info.path);
90                return 0;
91        }
92#endif
93}
94
95static int erf_init_input(struct libtrace_t *libtrace) {
96        struct stat buf;
97        struct hostent *he;
98        struct sockaddr_in remote;
99        struct sockaddr_un unix_sock;
100        if (!strncmp(libtrace->conn_info.path,"-",1)) {
101                // STDIN
102#if HAVE_ZLIB
103                libtrace->input.file = gzdopen(STDIN, "r");
104#else   
105                libtrace->input.file = stdin;
106#endif
107
108        } else {
109                if (stat(libtrace->conn_info.path,&buf) == -1 ) {
110                        perror("stat");
111                        return 0;
112                }
113                if (S_ISSOCK(buf.st_mode)) {
114                        // SOCKET
115                        if ((libtrace->input.fd = socket(
116                                        AF_UNIX, SOCK_STREAM, 0)) == -1) {
117                                perror("socket");
118                                return 0;
119                        }
120                        unix_sock.sun_family = AF_UNIX;
121                        bzero(unix_sock.sun_path,108);
122                        snprintf(unix_sock.sun_path,
123                                        108,"%s"
124                                        ,libtrace->conn_info.path);
125
126                        if (connect(libtrace->input.fd, 
127                                        (struct sockaddr *)&unix_sock,
128                                        sizeof(struct sockaddr)) == -1) {
129                                perror("connect (unix)");
130                                return 0;
131                        }
132                } else { 
133                        // TRACE
134#if HAVE_ZLIB
135                        // using gzdopen means we can set O_LARGEFILE
136                        // ourselves. However, this way is messy and
137                        // we lose any error checking on "open"
138                        libtrace->input.file = 
139                                gzdopen(open(
140                                        libtrace->conn_info.path,
141                                        O_LARGEFILE), "r");
142#else
143                        libtrace->input.file = 
144                                fdopen(open(
145                                        libtrace->conn_info.path,
146                                        O_LARGEFILE), "r");
147#endif
148
149                }
150        }
151}
152
153static int rtclient_init_input(struct libtrace_t *libtrace) {
154        char *scan;
155        char *uridata = libtrace->conn_info.path;
156        struct hostent *he;
157        struct sockaddr_in remote;
158
159        if (strlen(uridata) == 0) {
160                libtrace->conn_info.rt.hostname = 
161                        strdup("localhost");
162                libtrace->conn_info.rt.port = 
163                        COLLECTOR_PORT;
164        } else {
165                if ((scan = strchr(uridata,':')) == NULL) {
166                        libtrace->conn_info.rt.hostname = 
167                                strdup(uridata);
168                        libtrace->conn_info.rt.port =
169                                COLLECTOR_PORT;
170                } else {
171                        libtrace->conn_info.rt.hostname = 
172                                (char *)strndup(uridata,
173                                                (scan - uridata));
174                        libtrace->conn_info.rt.port = 
175                                atoi(++scan);
176                }
177        }
178       
179        if ((he=gethostbyname(libtrace->conn_info.rt.hostname)) == NULL) { 
180                perror("gethostbyname");
181                return 0;
182        } 
183        if ((libtrace->input.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
184                perror("socket");
185                return 0;
186        }
187
188        remote.sin_family = AF_INET;   
189        remote.sin_port = htons(libtrace->conn_info.rt.port);
190        remote.sin_addr = *((struct in_addr *)he->h_addr);
191        bzero(&(remote.sin_zero), 8);
192
193        if (connect(libtrace->input.fd, (struct sockaddr *)&remote,
194                                sizeof(struct sockaddr)) == -1) {
195                perror("connect (inet)");
196                return 0;
197        }
198}
199
200static int erf_init_output(struct libtrace_out_t *libtrace) {
201        struct stat buf;
202
203        if (!strncmp(libtrace->conn_info.path,"-",1)) {
204                // STDOUT
205#if HAVE_ZLIB
206                libtrace->output.file = gzdopen(dup(1), "w");
207#else
208                libtrace->output.file = stdout;
209#endif
210        }
211        else {
212                // TRACE
213#if HAVE_ZLIB
214                // using gzdopen means we can set O_LARGEFILE
215                // ourselves. However, this way is messy and
216                // we lose any error checking on "open"
217                libtrace->output.file =  gzdopen(open(
218                                        libtrace->conn_info.path,
219                                        O_CREAT | O_LARGEFILE | O_WRONLY, 
220                                        S_IRUSR | S_IWUSR), "w");
221#else
222                libtrace->output.file =  fdopen(open(
223                                        libtrace->conn_info.path,
224                                        O_CREAT | O_LARGEFILE | O_WRONLY, 
225                                        S_IRUSR | S_IWUSR), "w");
226#endif
227        }
228
229}
230
231static int rtclient_init_output(struct libtrace_out_t *libtrace) {
232        char * uridata = libtrace->conn_info.path;
233        char * scan;
234        // extract conn_info from uridata
235        if (strlen(uridata) == 0) {
236                libtrace->conn_info.rt.hostname = strdup("localhost");
237                libtrace->conn_info.rt.port = COLLECTOR_PORT;
238        }
239        else {
240                if ((scan = strchr(uridata,':')) == NULL) {
241                        libtrace->conn_info.rt.hostname =
242                                strdup(uridata);
243                        libtrace->conn_info.rt.port =
244                                COLLECTOR_PORT;
245                } else {
246                        libtrace->conn_info.rt.hostname =
247                                (char *)strndup(uridata,
248                                                (scan - uridata));
249                        libtrace->conn_info.rt.port =
250                                atoi(++scan);
251                }
252        }
253       
254       
255        libtrace->output.rtserver = rtserver_create(libtrace->conn_info.rt.hostname,
256                                libtrace->conn_info.rt.port);
257        if (!libtrace->output.rtserver)
258                return 0;
259       
260}
261
262static int dag_fin_input(struct libtrace_t *libtrace) {
263#ifdef HAVE_DAG
264        dag_stop(libtrace->input.fd);
265#endif
266}
267
268static int erf_fin_input(struct libtrace_t *libtrace) {
269#if HAVE_ZLIB
270        gzclose(libtrace->input.file);
271#else   
272        fclose(libtrace->input.file);   
273#endif
274}
275
276static int rtclient_fin_input(struct libtrace_t *libtrace) {
277        close(libtrace->input.fd);
278}
279
280static int erf_fin_output(struct libtrace_out_t *libtrace) {
281#if HAVE_ZLIB
282        gzclose(libtrace->output.file);
283#else
284        fclose(libtrace->output.file);
285#endif
286}
287 
288
289static int rtclient_fin_output(struct libtrace_out_t *libtrace) {
290        rtserver_destroy(libtrace->output.rtserver);
291}
292
293static int dag_read(struct libtrace_t *libtrace, void *buffer, size_t len) {
294#if HAVE_DAG
295        int numbytes;
296        static short lctr = 0;
297        struct dag_record_t *erfptr = 0;
298        int rlen;
299
300        if (buffer == 0)
301                buffer = malloc(len);
302       
303        libtrace->dag.bottom = libtrace->dag.top;
304        libtrace->dag.top = dag_offset(
305                        libtrace->input.fd,
306                        &(libtrace->dag.bottom),
307                        0);
308        libtrace->dag.diff = libtrace->dag.top -
309                libtrace->dag.bottom;
310
311        numbytes=libtrace->dag.diff;
312        libtrace->dag.offset = 0;
313        return numbytes;
314#else
315        return -1;
316#endif
317}
318
319static int dag_read_packet(struct libtrace_t *libtrace, struct libtrace_packet_t *packet) {
320#if HAVE_DAG
321        int numbytes;
322        int size;
323        char buf[RP_BUFSIZE];
324        dag_record_t *erfptr;
325        void *buffer = packet->buffer;
326        void *buffer2 = buffer;
327        int rlen;
328       
329        if (libtrace->dag.diff == 0) {
330                if ((numbytes = dag_read(libtrace,buf,RP_BUFSIZE)) <= 0) 
331                        return numbytes;
332        }
333
334        //DAG always gives us whole packets
335        erfptr = (dag_record_t *) ((void *)libtrace->dag.buf + 
336                        (libtrace->dag.bottom + libtrace->dag.offset));
337        size = ntohs(erfptr->rlen);
338
339        if ( size  > LIBTRACE_PACKET_BUFSIZE) {
340                printf("%d\n",size);
341                assert( size < LIBTRACE_PACKET_BUFSIZE);
342        }
343
344        // have to copy it out of the memory hole at this stage:
345        memcpy(packet->buffer, erfptr, size);
346
347        packet->size = size;
348        libtrace->dag.offset += size;
349        libtrace->dag.diff -= size;
350
351        assert(libtrace->dag.diff >= 0);
352
353        return (size);
354#else
355        return -1;
356#endif
357}
358
359static int erf_read_packet(struct libtrace_t *libtrace, struct libtrace_packet_t *packet) {
360        int numbytes;
361        int size;
362        char buf[RP_BUFSIZE];
363        dag_record_t *erfptr;
364        void *buffer = packet->buffer;
365        void *buffer2 = buffer;
366        int rlen;
367       
368        if ((numbytes=gzread(libtrace->input.file,
369                                        buffer,
370                                        dag_record_size)) == -1) {
371                perror("gzread");
372                return -1;
373        }
374        if (numbytes == 0) {
375                return 0;
376        }
377        rlen = ntohs(((dag_record_t *)buffer)->rlen);
378        size = rlen - dag_record_size;
379        assert(size < LIBTRACE_PACKET_BUFSIZE);
380        buffer2 = buffer + dag_record_size;
381       
382        // read in the rest of the packet
383        if ((numbytes=gzread(libtrace->input.file,
384                                        buffer2,
385                                        size)) == -1) {
386                perror("gzread");
387                return -1;
388        }
389        packet->size = rlen;
390        return rlen;
391}
392
393static int rtclient_read(struct libtrace_t *libtrace, void *buffer, size_t len) {
394        int numbytes;
395        static short lctr = 0;
396        struct dag_record_t *erfptr = 0;
397        int rlen;
398
399        if (buffer == 0)
400                buffer = malloc(len);
401        while(1) {
402#ifndef MSG_NOSIGNAL
403#  define MSG_NOSIGNAL 0
404#endif
405                if ((numbytes = recv(libtrace->input.fd,
406                                                buffer,
407                                                len,
408                                                MSG_NOSIGNAL)) == -1) {
409                        if (errno == EINTR) {
410                                //ignore EINTR in case
411                                // a caller is using signals
412                                continue;
413                        }
414                        perror("recv");
415                        return -1;
416                }
417                break;
418
419        }
420        return numbytes;
421}
422
423static int rtclient_read_packet(struct libtrace_t *libtrace, struct libtrace_packet_t *packet) {
424        int numbytes;
425        int size;
426        char buf[RP_BUFSIZE];
427        dag_record_t *erfptr;
428        int read_required = 0;
429       
430        void *buffer = 0;
431        buffer = packet->buffer;
432
433        do {
434                if (fifo_out_available(libtrace->fifo) == 0 || read_required) {
435                        if ((numbytes = rtclient_read(
436                                                libtrace,buf,RP_BUFSIZE))<=0) {
437                                return numbytes;
438                        }
439                        assert(libtrace->fifo);
440                        fifo_write(libtrace->fifo,buf,numbytes);
441                        read_required = 0;
442                }
443                // Read status byte
444                if (fifo_out_read(libtrace->fifo,
445                                &packet->status, sizeof(int)) == 0) {
446                        read_required = 1;
447                        continue;
448                }
449                fifo_out_update(libtrace->fifo,sizeof(int));
450
451                // read in the ERF header
452                if ((numbytes = fifo_out_read(libtrace->fifo, buffer,
453                                                sizeof(dag_record_t))) == 0) {
454                        fifo_out_reset(libtrace->fifo);
455                        read_required = 1;
456                        continue;
457                }
458                size = ntohs(((dag_record_t *)buffer)->rlen);
459
460                // read in the full packet
461                if ((numbytes = fifo_out_read(libtrace->fifo, 
462                                                buffer, size)) == 0) {
463                        fifo_out_reset(libtrace->fifo);
464                        read_required = 1;
465                        continue;
466                }
467
468                // got in our whole packet, so...
469                fifo_out_update(libtrace->fifo,size);
470
471                fifo_ack_update(libtrace->fifo,size + sizeof(int));
472
473                packet->size = numbytes;
474                return numbytes;
475        } while(1);
476}
477
478static int erf_write_packet(struct libtrace_out_t *libtrace, struct libtrace_packet_t *packet) {
479        int numbytes = 0;
480
481        if ((numbytes = gzwrite(libtrace->output.file, packet->buffer, packet->size)) == 0) {
482                perror("gzwrite");
483                return -1;
484        }
485        return numbytes;
486}
487
488static int rtclient_write_packet(struct libtrace_out_t *libtrace, struct libtrace_packet_t *packet) {
489        int numbytes = 0;
490        int size;
491        int intsize = sizeof(int);
492        char buf[RP_BUFSIZE];
493        void *buffer = &buf[intsize];
494        int write_required = 0;
495       
496        do {
497                if (rtserver_checklisten(libtrace->output.rtserver) < 0)
498                        return -1;
499
500                assert(libtrace->fifo);
501                if (fifo_out_available(libtrace->fifo) == 0 || write_required) {
502                        // Packet added to fifo
503                        if ((numbytes = fifo_write(libtrace->fifo, packet->buffer, packet->size)) == 0) {
504                                // some error with the fifo
505                                perror("fifo_write");
506                                return -1;
507                        }
508                        write_required = 0;
509                }
510
511                // Read from fifo and add protocol header
512                if ((numbytes = fifo_out_read(libtrace->fifo, buffer, sizeof(dag_record_t))) == 0) {
513                        // failure reading in from fifo
514                        fifo_out_reset(libtrace->fifo);
515                        write_required = 1;
516                        continue;
517                }
518                size = ntohs(((dag_record_t *)buffer)->rlen);
519                assert(size < LIBTRACE_PACKET_BUFSIZE);
520                if ((numbytes = fifo_out_read(libtrace->fifo, buffer, size)) == 0) {
521                       // failure reading in from fifo
522                       fifo_out_reset(libtrace->fifo);
523                       write_required = 1;
524                       continue;
525                }
526                // Sort out the protocol header
527                memcpy(buf, &packet->status, intsize);
528
529                if ((numbytes = rtserver_sendclients(libtrace->output.rtserver, buf, size + sizeof(int))) < 0) {
530                        write_required = 0;
531                        continue;
532                }
533
534
535                fifo_out_update(libtrace->fifo, size);
536                // Need an ack to come back
537                // TODO: Obviously this is a little unfinished
538                if ("ACK_ARRIVES") {
539                        fifo_ack_update(libtrace->fifo, size);
540                        return numbytes;
541                } else {
542                        fifo_out_reset(libtrace->fifo);
543                }
544        } while(1);
545}
546
547static void *erf_get_link(const struct libtrace_packet_t *packet) {
548        const void *ethptr = 0;
549        dag_record_t *erfptr = 0;
550        erfptr = (dag_record_t *)packet->buffer;
551       
552        if (erfptr->flags.rxerror == 1) {
553                return NULL;
554        }
555        ethptr = ((uint8_t *)packet->buffer +
556                        dag_record_size + 2);
557        return (void *)ethptr;
558}
559
560static libtrace_linktype_t erf_get_link_type(const struct libtrace_packet_t *packet) {
561        dag_record_t *erfptr = 0;
562        erfptr = (dag_record_t *)packet->buffer;
563        switch (erfptr->type) {
564                case TYPE_ETH: return TRACE_TYPE_ETH;
565                case TYPE_ATM: return TRACE_TYPE_ATM;
566                default: assert(0);
567        }
568        return erfptr->type;
569}
570
571static int8_t erf_get_direction(const struct libtrace_packet_t *packet) {
572        dag_record_t *erfptr = 0;
573        erfptr = (dag_record_t *)packet->buffer;
574        return erfptr->flags.iface;
575}
576
577static int8_t erf_set_direction(const struct libtrace_packet_t *packet, int8_t direction) {
578        dag_record_t *erfptr = 0;
579        erfptr = (dag_record_t *)packet->buffer;
580        erfptr->flags.iface = direction;
581        return erfptr->flags.iface;
582}
583
584static uint64_t erf_get_erf_timestamp(const struct libtrace_packet_t *packet) {
585        dag_record_t *erfptr = 0;
586        erfptr = (dag_record_t *)packet->buffer;
587        return erfptr->ts;
588}
589
590static int erf_get_capture_length(const struct libtrace_packet_t *packet) {
591        dag_record_t *erfptr = 0;
592        erfptr = (dag_record_t *)packet->buffer;
593        return ntohs(erfptr->rlen);
594}
595
596static int erf_get_wire_length(const struct libtrace_packet_t *packet) {
597        dag_record_t *erfptr = 0;
598        erfptr = (dag_record_t *)packet->buffer;
599        return ntohs(erfptr->wlen);
600}
601
602static size_t erf_set_capture_length(struct libtrace_packet_t *packet, const size_t size) {
603        dag_record_t *erfptr = 0;
604        assert(packet);
605        if(size > packet->size) {
606                // can't make a packet larger
607                return packet->size;
608        }
609        erfptr = (dag_record_t *)packet->buffer;
610        erfptr->rlen = ntohs(size + sizeof(dag_record_t));
611        packet->size = size + sizeof(dag_record_t);
612        return packet->size;
613}
614
615static struct format_t erf = {
616        "erf",
617        "$Id$",
618        erf_init_input,                 /* init_input */       
619        erf_init_output,                /* init_output */
620        erf_fin_input,                  /* fin_input */
621        erf_fin_output,                 /* fin_output */
622        NULL,                           /* read */
623        erf_read_packet,                /* read_packet */
624        erf_write_packet,               /* write_packet */
625        erf_get_link,                   /* get_link */
626        erf_get_link_type,              /* get_link_type */
627        erf_get_direction,              /* get_direction */
628        erf_set_direction,              /* set_direction */
629        erf_get_erf_timestamp,          /* get_erf_timestamp */
630        NULL,                           /* get_timeval */
631        NULL,                           /* get_seconds */
632        erf_get_capture_length,         /* get_capture_length */
633        erf_get_wire_length,            /* get_wire_length */
634        erf_set_capture_length          /* set_capture_length */
635};
636
637static struct format_t dag = {
638        "dag",
639        "$Id$",
640        dag_init_input,                 /* init_input */       
641        NULL,                           /* init_output */
642        dag_fin_input,                  /* fin_input */
643        NULL,                           /* fin_output */
644        dag_read,                       /* read */
645        dag_read_packet,                /* read_packet */
646        NULL,                           /* write_packet */
647        erf_get_link,                   /* get_link */
648        erf_get_link_type,              /* get_link_type */
649        erf_get_direction,              /* get_direction */
650        erf_set_direction,              /* set_direction */
651        erf_get_erf_timestamp,          /* get_erf_timestamp */
652        NULL,                           /* get_timeval */
653        NULL,                           /* get_seconds */
654        erf_get_capture_length,         /* get_capture_length */
655        erf_get_wire_length,            /* get_wire_length */
656        erf_set_capture_length          /* set_capture_length */
657};
658
659static struct format_t rtclient = {
660        "rtclient",
661        "$Id$",
662        rtclient_init_input,            /* init_input */       
663        rtclient_init_output,           /* init_output */
664        rtclient_fin_input,             /* fin_input */
665        rtclient_fin_output,            /* fin_output */
666        rtclient_read,                  /* read */
667        rtclient_read_packet,           /* read_packet */
668        rtclient_write_packet,          /* write_packet */
669        erf_get_link,                   /* get_link */
670        erf_get_link_type,              /* get_link_type */
671        erf_get_direction,              /* get_direction */
672        erf_set_direction,              /* set_direction */
673        erf_get_erf_timestamp,          /* get_erf_timestamp */
674        NULL,                           /* get_timeval */
675        NULL,                           /* get_seconds */
676        erf_get_capture_length,         /* get_capture_length */
677        erf_get_wire_length,            /* get_wire_length */
678        erf_set_capture_length          /* set_capture_length */
679};
680
681void __attribute__((constructor)) erf_constructor() {
682        register_format(&erf);
683        register_format(&dag);
684        register_format(&rtclient);
685}
Note: See TracBrowser for help on using the repository browser.