Changeset 1fc2f6a


Ignore:
Timestamp:
08/01/05 15:40:37 (15 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
9a36e6d
Parents:
74c7660
Message:

Output functions to format_erf.c

Location:
lib
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • lib/Makefile.am

    r7050c10 r1fc2f6a  
    11lib_LTLIBRARIES = libtrace.la
    22include_HEADERS = libtrace.h dagformat.h wag.h
    3 libtrace_la_SOURCES = trace.c fifo.c fifo.h common.h format_template.c format_erf.c format_pcap.c
     3libtrace_la_SOURCES = trace.c fifo.c fifo.h common.h format_template.c format_erf.c format_pcap.c rtserver.c rtserver.h
    44libtrace_la_CFLAGS = @ADD_INCLS@
    55libtrace_la_LIBADD = @ADD_LIBS@ @LTLIBOBJS@
  • lib/format.h

    r7050c10 r1fc2f6a  
    4040#include "libtrace.h"
    4141#include "fifo.h"
     42#include "rtserver.h"
    4243
    4344#if HAVE_PCAP_BPF_H
     
    125126        double start_ts;
    126127};
     128
     129struct libtrace_out_t {
     130        struct format_t * format;
     131
     132        union {
     133                struct {
     134                        char *hostname;
     135                        short port;
     136                } rt;
     137                char *path;
     138                char *interface;
     139        } conn_info;
     140
     141        union {
     142                int fd;
     143                struct rtserver_t * rtserver;
     144#if HAVE_ZLIB
     145                gzFile *file;
     146#else
     147                FILE *file;
     148#endif
     149#if HAVE_PCAP
     150                pcap_t *pcap;
     151#endif
     152        } output;
     153
     154        struct fifo_t *fifo;
     155
     156
     157};
     158
    127159
    128160struct trace_sll_header_t {
  • lib/format_erf.c

    r7050c10 r1fc2f6a  
    3131#include "libtrace.h"
    3232#include "format.h"
     33#include "rtserver.h"
    3334
    3435#ifdef HAVE_INTTYPES_H
     
    5253#include <errno.h>
    5354#include <netdb.h>
     55#include <fcntl.h>
    5456
    5557/* Catch undefined O_LARGEFILE on *BSD etc */
     
    196198}
    197199
     200static int erf_init_output(struct libtrace_out_t *libtrace) {
     201        struct stat buf;
     202
     203        if (!strncmp(libtrace->conn_info.path,"-",1)) {
     204                // STDOUT
     205#if HAVE_ZLIB
     206                libtrace->output.file = gzdopen(dup(1), "w");
     207#else
     208                libtrace->output.file = stdout;
     209#endif
     210        }
     211        else {
     212                // TRACE
     213#if HAVE_ZLIB
     214                // using gzdopen means we can set O_LARGEFILE
     215                // ourselves. However, this way is messy and
     216                // we lose any error checking on "open"
     217                libtrace->output.file =  gzdopen(open(
     218                                        libtrace->conn_info.path,
     219                                        O_CREAT | O_LARGEFILE | O_WRONLY,
     220                                        S_IRUSR | S_IWUSR), "w");
     221#else
     222                libtrace->output.file =  fdopen(open(
     223                                        libtrace->conn_info.path,
     224                                        O_CREAT | O_LARGEFILE | O_WRONLY,
     225                                        S_IRUSR | S_IWUSR), "w");
     226#endif
     227        }
     228
     229}
     230
     231static int rtclient_init_output(struct libtrace_out_t *libtrace) {
     232        char * uridata = libtrace->conn_info.path;
     233        char * scan;
     234        // extract conn_info from uridata
     235        if (strlen(uridata) == 0) {
     236                libtrace->conn_info.rt.hostname = strdup("localhost");
     237                libtrace->conn_info.rt.port = COLLECTOR_PORT;
     238        }
     239        else {
     240                if ((scan = strchr(uridata,':')) == NULL) {
     241                        libtrace->conn_info.rt.hostname =
     242                                strdup(uridata);
     243                        libtrace->conn_info.rt.port =
     244                                COLLECTOR_PORT;
     245                } else {
     246                        libtrace->conn_info.rt.hostname =
     247                                (char *)strndup(uridata,
     248                                                (scan - uridata));
     249                        libtrace->conn_info.rt.port =
     250                                atoi(++scan);
     251                }
     252        }
     253       
     254       
     255        libtrace->output.rtserver = rtserver_create(libtrace->conn_info.rt.hostname,
     256                                libtrace->conn_info.rt.port);
     257        if (!libtrace->output.rtserver)
     258                return 0;
     259       
     260}
     261
    198262static int dag_fin_input(struct libtrace_t *libtrace) {
    199263#ifdef HAVE_DAG
     
    212276static int rtclient_fin_input(struct libtrace_t *libtrace) {
    213277        close(libtrace->input.fd);
     278}
     279
     280static int erf_fin_output(struct libtrace_out_t *libtrace) {
     281#if HAVE_ZLIB
     282        gzclose(libtrace->output.file);
     283#else
     284        fclose(libtrace->output.file);
     285#endif
     286}
     287 
     288
     289static int rtclient_fin_output(struct libtrace_out_t *libtrace) {
     290        rtserver_destroy(libtrace->output.rtserver);
    214291}
    215292
     
    399476}
    400477
     478static int erf_write_packet(struct libtrace_out_t *libtrace, struct libtrace_packet_t *packet) {
     479        int numbytes = 0;
     480
     481        if ((numbytes = gzwrite(libtrace->output.file, packet->buffer, packet->size)) == 0) {
     482                perror("gzwrite");
     483                return -1;
     484        }
     485        return numbytes;
     486}
     487
     488static int rtclient_write_packet(struct libtrace_out_t *libtrace, struct libtrace_packet_t *packet) {
     489        int numbytes = 0;
     490        int size;
     491        int intsize = sizeof(int);
     492        char buf[RP_BUFSIZE];
     493        void *buffer = &buf[intsize];
     494        int write_required = 0;
     495       
     496        do {
     497                if (rtserver_checklisten(libtrace->output.rtserver) < 0)
     498                        return -1;
     499
     500                assert(libtrace->fifo);
     501                if (fifo_out_available(libtrace->fifo) == 0 || write_required) {
     502                        // Packet added to fifo
     503                        if ((numbytes = fifo_write(libtrace->fifo, packet->buffer, packet->size)) == 0) {
     504                                // some error with the fifo
     505                                perror("fifo_write");
     506                                return -1;
     507                        }
     508                        write_required = 0;
     509                }
     510
     511                // Read from fifo and add protocol header
     512                if ((numbytes = fifo_out_read(libtrace->fifo, buffer, sizeof(dag_record_t))) == 0) {
     513                        // failure reading in from fifo
     514                        fifo_out_reset(libtrace->fifo);
     515                        write_required = 1;
     516                        continue;
     517                }
     518                size = ntohs(((dag_record_t *)buffer)->rlen);
     519                assert(size < LIBTRACE_PACKET_BUFSIZE);
     520                if ((numbytes = fifo_out_read(libtrace->fifo, buffer, size)) == 0) {
     521                       // failure reading in from fifo
     522                       fifo_out_reset(libtrace->fifo);
     523                       write_required = 1;
     524                       continue;
     525                }
     526                // Sort out the protocol header
     527                memcpy(buf, &packet->status, intsize);
     528
     529                if ((numbytes = rtserver_sendclients(libtrace->output.rtserver, buf, size + sizeof(int))) < 0) {
     530                        write_required = 0;
     531                        continue;
     532                }
     533
     534
     535                fifo_out_update(libtrace->fifo, size);
     536                // Need an ack to come back
     537                // TODO: Obviously this is a little unfinished
     538                if ("ACK_ARRIVES") {
     539                        fifo_ack_update(libtrace->fifo, size);
     540                        return numbytes;
     541                } else {
     542                        fifo_out_reset(libtrace->fifo);
     543                }
     544        } while(1);
     545}
     546
    401547static void *erf_get_link(const struct libtrace_packet_t *packet) {
    402         dag_record_t *erfptr = 0;
    403         erfptr = (dag_record_t *)packet->buffer;
    404         const void *ethptr = 0;
    405 
     548        const void *ethptr = 0;
     549        dag_record_t *erfptr = 0;
     550        erfptr = (dag_record_t *)packet->buffer;
     551       
    406552        if (erfptr->flags.rxerror == 1) {
    407553                return NULL;
     
    471617        "$Id$",
    472618        erf_init_input,                 /* init_input */       
    473         NULL,                           /* init_output */
     619        erf_init_output,                /* init_output */
    474620        erf_fin_input,                  /* fin_input */
    475         NULL,                           /* fin_output */
     621        erf_fin_output,                 /* fin_output */
    476622        NULL,                           /* read */
    477623        erf_read_packet,                /* read_packet */
    478         NULL,                           /* write_packet */
     624        erf_write_packet,               /* write_packet */
    479625        erf_get_link,                   /* get_link */
    480626        erf_get_link_type,              /* get_link_type */
     
    515661        "$Id$",
    516662        rtclient_init_input,            /* init_input */       
    517         NULL,                           /* init_output */
     663        rtclient_init_output,           /* init_output */
    518664        rtclient_fin_input,             /* fin_input */
    519         NULL,                           /* fin_output */
     665        rtclient_fin_output,            /* fin_output */
    520666        rtclient_read,                  /* read */
    521667        rtclient_read_packet,           /* read_packet */
    522         NULL,                           /* write_packet */
     668        rtclient_write_packet,          /* write_packet */
    523669        erf_get_link,                   /* get_link */
    524670        erf_get_link_type,              /* get_link_type */
  • lib/trace.c

    r74c7660 r1fc2f6a  
    4242#include "common.h"
    4343#include "config.h"
    44 #include "format.h"
    4544#include <assert.h>
    4645#include <errno.h>
     
    103102#include "libtrace.h"
    104103#include "fifo.h"
     104#include "format.h"
    105105
    106106#if HAVE_PCAP_BPF_H
     
    141141//typedef enum {ERF, PCAP, PCAPINT, DAG, RTCLIENT, WAG, WAGINT } format_e_t;
    142142
    143 typedef enum {RTSERVER, GZERF } output_t;
     143//typedef enum {RTSERVER, GZERF } output_t;
    144144#if HAVE_BPF
    145145/** A type encapsulating a bpf filter
     
    153153};
    154154#endif
    155 
    156 struct libtrace_out_t {
    157         output_t outputformat;
    158 
    159         union {
    160                 struct {
    161                         char *hostname;
    162                         short port;
    163                 } rt;
    164                 char *path;
    165                 char *interface;
    166         } conn_info;
    167 
    168         union {
    169                 int fd;
    170 #if HAVE_ZLIB
    171                 gzFile *file;
    172 #else
    173                 FILE *file;
    174 #endif
    175 #if HAVE_PCAP
    176                 pcap_t *pcap;
    177 #endif
    178         } output;
    179 
    180         struct fifo_t *fifo;
    181 };
    182 
    183 
    184155
    185156struct format_t **format_list = 0;
     
    280251        char *scan = calloc(sizeof(char),URI_PROTO_LINE);
    281252        char *uridata = 0;
     253        int i;
    282254
    283255        // parse the URI to determine what sort of event we are dealing with
     
    296268        strncpy(scan,uri, (uridata - uri));
    297269
    298         if (!strncasecmp(scan,"gzerf",5)) {
    299                 (*libtrace)->outputformat = GZERF;
    300         } else if (!strncasecmp(scan, "rt", 2)) {
    301                 (*libtrace)->outputformat = RTSERVER;
    302         } else {
    303                 return 0;
    304         }
    305 
    306         uridata ++;
    307         switch((*libtrace)->outputformat) {
    308                 case GZERF:
    309                         /*
    310                          * Acceptable uridata takes the form:
    311                          * /path/to/file.gz
    312                          */
    313                         (*libtrace)->conn_info.path = strdup(uridata);
    314                         break;
    315                 case RTSERVER:
    316                         /*
    317                          * Possible uridata formats:
    318                          * hostname
    319                          * hostname:port
    320                          */
    321                         if (strlen(uridata) == 0) {
    322                                 (*libtrace)->conn_info.rt.hostname =
    323                                         strdup("localhost");
    324                                 (*libtrace)->conn_info.rt.port =
    325                                         COLLECTOR_PORT;
     270        (*libtrace)->format = 0;
     271        for (i = 0; i < nformats; i++) {
     272                if (strlen(scan) == strlen(format_list[i]->name) &&
     273                                !strncasecmp(scan,
     274                                        format_list[i]->name,
     275                                        strlen(scan))) {
     276                                (*libtrace)->format=format_list[i];
    326277                                break;
    327                         }
    328                         if ((scan = strchr(uridata,':')) == NULL) {
    329                                 (*libtrace)->conn_info.rt.hostname =
    330                                         strdup(uridata);
    331                                 (*libtrace)->conn_info.rt.port =
    332                                         COLLECTOR_PORT;
    333                         } else {
    334                                 (*libtrace)->conn_info.rt.hostname =
    335                                         (char *)strndup(uridata,(scan - uridata));
    336 
    337                                 (*libtrace)->conn_info.rt.port =
    338                                         atoi(++scan);
    339                         }
    340                         break;
    341                 default:
    342                         fprintf(stderr, "How did you get here??\n");
    343         }
     278                                }
     279        }
     280        if ((*libtrace)->format == 0) {
     281                fprintf(stderr,
     282                        "libtrace has no support for this format (%s)\n",scan);
     283                return 0;
     284        }
     285
     286        // push uridata past the delimiter
     287        uridata++;
     288        (*libtrace)->conn_info.path = strdup(uridata);
     289
     290        // libtrace->format now contains the type of uri
     291        // libtrace->uridata contains the appropriate data for this
     292
     293        if ((*libtrace)->format->init_output) {
     294                (*libtrace)->format->init_output( (*libtrace));
     295        } else {
     296                fprintf(stderr,
     297                        "No init_output function for format %s\n",scan);
     298                return 0;
     299        }
     300
     301
    344302        (*libtrace)->fifo = create_fifo(1048576);
    345303        assert( (*libtrace)->fifo);
     
    399357struct libtrace_out_t *trace_output_create(char *uri) {
    400358        struct libtrace_out_t *libtrace = malloc(sizeof(struct libtrace_out_t));
    401         struct sockaddr_in remote, client;
    402         int client_fd, clilen;
    403         struct hostent *he;
    404359       
    405360        if (init_output(&libtrace, uri) == 0)
    406361                return 0;
    407362
    408         switch(libtrace->outputformat) {
     363        /*
     364         * switch(libtrace->outputformat) {
    409365                case RTSERVER:
    410366                        if ((he=gethostbyname(libtrace->conn_info.rt.hostname)) == NULL) {
     
    440396
    441397                case GZERF:
    442 /* Catch undefined O_LARGEFILE on *BSD etc */
    443398#ifndef O_LARGEFILE
    444399#  define O_LARGEFILE 0
     
    462417                        exit(0);
    463418        }
     419*/
    464420        return libtrace;
    465421}
     
    484440void trace_output_destroy(struct libtrace_out_t *libtrace) {
    485441        assert(libtrace);
    486 
    487         if (libtrace->outputformat == RTSERVER) {
    488                 close(libtrace->output.fd);
    489         }
    490         else {
    491 #if HAVE_ZLIB
    492                 gzclose(libtrace->output.file);
    493 #else
    494                 fclose(libtrace->output.file);
    495 #endif
    496         }
     442        libtrace->format->fin_output(libtrace);
    497443        destroy_fifo(libtrace->fifo);
    498444        free(libtrace);
     
    609555}
    610556
    611 static int trace_write(struct libtrace_out_t *libtrace, void *buffer, size_t len) {
    612         int numbytes = 0;
    613 
    614         assert(libtrace);
    615         assert(len >= 0);
    616 
    617         if (buffer == NULL) {
    618                 return 0;
    619         }
    620 
    621         while (1) {
    622                 switch(libtrace->outputformat) {
    623                         case RTSERVER:
    624 #ifndef MSG_NOSIGNAL
    625 #define MSG_NOSIGNAL 0
    626 #endif
    627                                 // Write to the network
    628                                 if ((numbytes = send(libtrace->output.fd,
    629                                                         buffer,
    630                                                         len,
    631                                                         MSG_NOSIGNAL)) == -1) {
    632                                         if (errno == EINTR) {
    633                                                 continue;
    634                                         }
    635                                         perror("send");
    636                                         return -1;
    637                                 }
    638                                 break;
    639                         case GZERF:                     
    640 #if HAVE_ZLIB
    641                                 if ((numbytes = gzwrite(libtrace->output.file,
    642                                                         buffer,
    643                                                         len)) == -1) {
    644                                         perror("gzwrite");
    645                                         return -1;
    646                                 }
    647                                 break;
    648 #else
    649                                 // Do binary write instead
    650                                 if ((numbytes = fwrite(buffer, len, 1, libtrace->output.file)) == 0) {
    651                                         perror("fwrite");
    652                                         return -1;
    653                                 }
    654                                 break;
    655 #endif
    656                         default:
    657                                 fprintf(stderr, "Bad output type\n");
    658                                 break;
    659                 }
    660                 break;
    661         }       
    662         return numbytes;
    663 }               
    664 
    665557/** Writes a packet to the specified output
    666558 *
     
    672564 * */
    673565int trace_write_packet(struct libtrace_out_t *libtrace, struct libtrace_packet_t *packet) {
    674         // initialise stuff
    675         int numbytes, size;
    676         char buf[RP_BUFSIZE];
    677         int intsize = sizeof(int);
    678         void *buffer = &buf[intsize];
    679         int write_required = 0;
    680 
    681566        assert(libtrace);
    682567        assert(packet);
    683568
    684         if (libtrace->outputformat == GZERF) {
    685                 // do gzwrite
    686                 if ((numbytes = gzwrite(libtrace->output.file, packet->buffer, packet->size)) == 0) {
    687                         perror("gzwrite");
    688                         return -1;
    689                 }
    690                 return numbytes;
    691         }
    692        
    693         // do fifo stuff for RT output instead
    694         if (libtrace->outputformat == RTSERVER) {
    695                 do {
    696                         assert(libtrace->fifo);
    697 
    698                         if (fifo_out_available(libtrace->fifo) == 0 || write_required) {
    699                                 // Packet added to fifo
    700                                 if ((numbytes = fifo_write(libtrace->fifo, packet->buffer, packet->size)) == 0) {
    701                                         // some error with the fifo
    702                                         perror("fifo_write");
    703                                         return -1;
    704                                 }
    705                                 write_required = 0;
    706                         }
    707                        
    708                         // Read from fifo and add protocol header
    709                         if ((numbytes = fifo_out_read(libtrace->fifo, buffer, sizeof(dag_record_t))) == 0) {
    710                                 // failure reading in from fifo
    711                                 fifo_out_reset(libtrace->fifo);
    712                                 write_required = 1;
    713                                 continue;
    714                         }
    715                         size = ntohs(((dag_record_t *)buffer)->rlen);           
    716                         assert(size < LIBTRACE_PACKET_BUFSIZE);
    717        
    718                         if ((numbytes = fifo_out_read(libtrace->fifo, buffer, size)) == 0) {
    719                                 // failure reading in from fifo
    720                                 fifo_out_reset(libtrace->fifo);
    721                                 write_required = 1;
    722                                 continue;
    723                         }
    724                         fifo_out_update(libtrace->fifo, size); 
    725                         // Sort out the protocol header
    726                         memcpy(buf, &packet->status, intsize);
    727                                                
    728 
    729                         // Send the buffer out on the wire
    730                         if ((numbytes = trace_write(libtrace, buf, size + sizeof(int))) <=0 ) {
    731                                 return numbytes;
    732                         }
    733 
    734                         // Need an ack to come back
    735                         // TODO: Obviously this is a little unfinished
    736                         if ("ACK_ARRIVES") {
    737                                 fifo_ack_update(libtrace->fifo, size);
    738                                 return numbytes;
    739                         } else {
    740                                 fifo_out_reset(libtrace->fifo);
    741                         }
    742                 } while(1);
    743         }
    744 
    745         // Unacceptable output format
    746         fprintf(stderr, "Unknown Output format \n");
    747         assert(0);
     569        if (libtrace->format->write_packet) {
     570                return libtrace->format->write_packet(libtrace, packet);
     571        }
     572
    748573}
    749574
Note: See TracChangeset for help on using the changeset viewer.