Changeset 1fc2f6a for lib/format_erf.c


Ignore:
Timestamp:
08/01/05 15:40:37 (15 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
9a36e6d
Parents:
74c7660
Message:

Output functions to format_erf.c

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_erf.c

    r7050c10 r1fc2f6a  
    3131#include "libtrace.h"
    3232#include "format.h"
     33#include "rtserver.h"
    3334
    3435#ifdef HAVE_INTTYPES_H
     
    5253#include <errno.h>
    5354#include <netdb.h>
     55#include <fcntl.h>
    5456
    5557/* Catch undefined O_LARGEFILE on *BSD etc */
     
    196198}
    197199
     200static int erf_init_output(struct libtrace_out_t *libtrace) {
     201        struct stat buf;
     202
     203        if (!strncmp(libtrace->conn_info.path,"-",1)) {
     204                // STDOUT
     205#if HAVE_ZLIB
     206                libtrace->output.file = gzdopen(dup(1), "w");
     207#else
     208                libtrace->output.file = stdout;
     209#endif
     210        }
     211        else {
     212                // TRACE
     213#if HAVE_ZLIB
     214                // using gzdopen means we can set O_LARGEFILE
     215                // ourselves. However, this way is messy and
     216                // we lose any error checking on "open"
     217                libtrace->output.file =  gzdopen(open(
     218                                        libtrace->conn_info.path,
     219                                        O_CREAT | O_LARGEFILE | O_WRONLY,
     220                                        S_IRUSR | S_IWUSR), "w");
     221#else
     222                libtrace->output.file =  fdopen(open(
     223                                        libtrace->conn_info.path,
     224                                        O_CREAT | O_LARGEFILE | O_WRONLY,
     225                                        S_IRUSR | S_IWUSR), "w");
     226#endif
     227        }
     228
     229}
     230
     231static int rtclient_init_output(struct libtrace_out_t *libtrace) {
     232        char * uridata = libtrace->conn_info.path;
     233        char * scan;
     234        // extract conn_info from uridata
     235        if (strlen(uridata) == 0) {
     236                libtrace->conn_info.rt.hostname = strdup("localhost");
     237                libtrace->conn_info.rt.port = COLLECTOR_PORT;
     238        }
     239        else {
     240                if ((scan = strchr(uridata,':')) == NULL) {
     241                        libtrace->conn_info.rt.hostname =
     242                                strdup(uridata);
     243                        libtrace->conn_info.rt.port =
     244                                COLLECTOR_PORT;
     245                } else {
     246                        libtrace->conn_info.rt.hostname =
     247                                (char *)strndup(uridata,
     248                                                (scan - uridata));
     249                        libtrace->conn_info.rt.port =
     250                                atoi(++scan);
     251                }
     252        }
     253       
     254       
     255        libtrace->output.rtserver = rtserver_create(libtrace->conn_info.rt.hostname,
     256                                libtrace->conn_info.rt.port);
     257        if (!libtrace->output.rtserver)
     258                return 0;
     259       
     260}
     261
    198262static int dag_fin_input(struct libtrace_t *libtrace) {
    199263#ifdef HAVE_DAG
     
    212276static int rtclient_fin_input(struct libtrace_t *libtrace) {
    213277        close(libtrace->input.fd);
     278}
     279
     280static int erf_fin_output(struct libtrace_out_t *libtrace) {
     281#if HAVE_ZLIB
     282        gzclose(libtrace->output.file);
     283#else
     284        fclose(libtrace->output.file);
     285#endif
     286}
     287 
     288
     289static int rtclient_fin_output(struct libtrace_out_t *libtrace) {
     290        rtserver_destroy(libtrace->output.rtserver);
    214291}
    215292
     
    399476}
    400477
     478static int erf_write_packet(struct libtrace_out_t *libtrace, struct libtrace_packet_t *packet) {
     479        int numbytes = 0;
     480
     481        if ((numbytes = gzwrite(libtrace->output.file, packet->buffer, packet->size)) == 0) {
     482                perror("gzwrite");
     483                return -1;
     484        }
     485        return numbytes;
     486}
     487
     488static int rtclient_write_packet(struct libtrace_out_t *libtrace, struct libtrace_packet_t *packet) {
     489        int numbytes = 0;
     490        int size;
     491        int intsize = sizeof(int);
     492        char buf[RP_BUFSIZE];
     493        void *buffer = &buf[intsize];
     494        int write_required = 0;
     495       
     496        do {
     497                if (rtserver_checklisten(libtrace->output.rtserver) < 0)
     498                        return -1;
     499
     500                assert(libtrace->fifo);
     501                if (fifo_out_available(libtrace->fifo) == 0 || write_required) {
     502                        // Packet added to fifo
     503                        if ((numbytes = fifo_write(libtrace->fifo, packet->buffer, packet->size)) == 0) {
     504                                // some error with the fifo
     505                                perror("fifo_write");
     506                                return -1;
     507                        }
     508                        write_required = 0;
     509                }
     510
     511                // Read from fifo and add protocol header
     512                if ((numbytes = fifo_out_read(libtrace->fifo, buffer, sizeof(dag_record_t))) == 0) {
     513                        // failure reading in from fifo
     514                        fifo_out_reset(libtrace->fifo);
     515                        write_required = 1;
     516                        continue;
     517                }
     518                size = ntohs(((dag_record_t *)buffer)->rlen);
     519                assert(size < LIBTRACE_PACKET_BUFSIZE);
     520                if ((numbytes = fifo_out_read(libtrace->fifo, buffer, size)) == 0) {
     521                       // failure reading in from fifo
     522                       fifo_out_reset(libtrace->fifo);
     523                       write_required = 1;
     524                       continue;
     525                }
     526                // Sort out the protocol header
     527                memcpy(buf, &packet->status, intsize);
     528
     529                if ((numbytes = rtserver_sendclients(libtrace->output.rtserver, buf, size + sizeof(int))) < 0) {
     530                        write_required = 0;
     531                        continue;
     532                }
     533
     534
     535                fifo_out_update(libtrace->fifo, size);
     536                // Need an ack to come back
     537                // TODO: Obviously this is a little unfinished
     538                if ("ACK_ARRIVES") {
     539                        fifo_ack_update(libtrace->fifo, size);
     540                        return numbytes;
     541                } else {
     542                        fifo_out_reset(libtrace->fifo);
     543                }
     544        } while(1);
     545}
     546
    401547static void *erf_get_link(const struct libtrace_packet_t *packet) {
    402         dag_record_t *erfptr = 0;
    403         erfptr = (dag_record_t *)packet->buffer;
    404         const void *ethptr = 0;
    405 
     548        const void *ethptr = 0;
     549        dag_record_t *erfptr = 0;
     550        erfptr = (dag_record_t *)packet->buffer;
     551       
    406552        if (erfptr->flags.rxerror == 1) {
    407553                return NULL;
     
    471617        "$Id$",
    472618        erf_init_input,                 /* init_input */       
    473         NULL,                           /* init_output */
     619        erf_init_output,                /* init_output */
    474620        erf_fin_input,                  /* fin_input */
    475         NULL,                           /* fin_output */
     621        erf_fin_output,                 /* fin_output */
    476622        NULL,                           /* read */
    477623        erf_read_packet,                /* read_packet */
    478         NULL,                           /* write_packet */
     624        erf_write_packet,               /* write_packet */
    479625        erf_get_link,                   /* get_link */
    480626        erf_get_link_type,              /* get_link_type */
     
    515661        "$Id$",
    516662        rtclient_init_input,            /* init_input */       
    517         NULL,                           /* init_output */
     663        rtclient_init_output,           /* init_output */
    518664        rtclient_fin_input,             /* fin_input */
    519         NULL,                           /* fin_output */
     665        rtclient_fin_output,            /* fin_output */
    520666        rtclient_read,                  /* read */
    521667        rtclient_read_packet,           /* read_packet */
    522         NULL,                           /* write_packet */
     668        rtclient_write_packet,          /* write_packet */
    523669        erf_get_link,                   /* get_link */
    524670        erf_get_link_type,              /* get_link_type */
Note: See TracChangeset for help on using the changeset viewer.