Changeset dd6e168 for lib/trace.c


Ignore:
Timestamp:
07/28/05 13:45:54 (15 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
4dedc28
Parents:
5eae97a
Message:

gzerf and rtserver output added, no fifo ack though

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace.c

    r5eae97a rdd6e168  
    176176
    177177        struct fifo_t *fifo;
     178
     179        // rtserver stuff
     180       
     181        /** fd that watches for incoming connections */
     182        int connect_fd;
     183        /** fds that represent connected clients */
     184        fd_set rt_fds;
     185        /** fds that are listening for connections (this should only contain connect_fd) */
     186        fd_set listen;
     187        int max_rtfds;
     188        /** Used for creating new fds as required */
     189        struct sockaddr_in * remote;
    178190};
    179191
     
    677689 *
    678690 * @param uri   the uri string describing the output format and the destination
    679  * @returns the newly created libtrace_out_t structure
     691 * @returns the newly created libtrace_out_t structure, or NULL if an error occurs
    680692 *
    681693 * @author Shane Alcock
     
    683695struct libtrace_out_t *trace_output_create(char *uri) {
    684696        struct libtrace_out_t *libtrace = malloc(sizeof(struct libtrace_out_t));
    685         struct sockaddr_in remote, client;
    686         int client_fd, clilen;
     697        struct sockaddr_in listener;
    687698        struct hostent *he;
     699        int yes = 1;
    688700       
    689701        if (init_output(&libtrace, uri) == 0)
     
    692704        switch(libtrace->outputformat) {
    693705                case RTSERVER:
     706                        FD_ZERO(&libtrace->listen);
     707                        FD_ZERO(&libtrace->rt_fds);
     708                       
    694709                        if ((he=gethostbyname(libtrace->conn_info.rt.hostname)) == NULL) {
    695710                                perror("gethostbyname");
    696711                                return 0;
    697712                        }
    698                         if ((libtrace->output.fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
     713                        if ((libtrace->connect_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
    699714                                perror("socket");
    700715                                return 0;
    701716                        }
     717                        if (setsockopt(libtrace->connect_fd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
     718                                perror("setsockopt");
     719                                return 0;
     720                        }
     721                        libtrace->remote = calloc(1,sizeof(struct sockaddr_in));
    702722                        // Need to set up a listening server here
    703                         bzero((char *) &remote, sizeof(remote));
    704                         remote.sin_family = AF_INET;
    705                         remote.sin_addr.s_addr = INADDR_ANY;
    706                         remote.sin_port = htons(libtrace->conn_info.rt.port);
    707 
    708                         if (bind(libtrace->output.fd, (struct sockaddr *) &remote, sizeof(remote)) < 0) {
     723                        bzero((char *) libtrace->remote, sizeof(libtrace->remote));
     724                        libtrace->remote->sin_family = AF_INET;
     725                        libtrace->remote->sin_addr.s_addr = INADDR_ANY;
     726                        libtrace->remote->sin_port = htons(libtrace->conn_info.rt.port);
     727                       
     728                        if (bind(libtrace->connect_fd, (struct sockaddr *) libtrace->remote, sizeof(struct sockaddr_in)) < 0) {
    709729                                perror("bind");
    710730                                return 0;
    711731                        }
    712                         fprintf(stderr, "Waiting for client to connect\n");
    713 
    714                         listen(libtrace->output.fd, 5);
     732                        // fprintf(stderr, "Waiting for client to connect\n");
     733
     734                        if (listen(libtrace->connect_fd, 10) == -1) {
     735                                perror("listen");
     736                                return 0;
     737                        }
     738                               
     739                        FD_SET(libtrace->connect_fd, &libtrace->listen);
     740                        libtrace->max_rtfds = libtrace->connect_fd;
     741                        /*
    715742                        clilen = sizeof(client);
    716743                        if ((client_fd = accept(libtrace->output.fd, (struct sockaddr *) &client, &clilen)) < 0) {
     
    720747                        libtrace->output.fd = client_fd;
    721748                        fprintf(stderr, "Client connected\n");                       
     749                        */
    722750                        break;
    723751
     
    798826        }
    799827        destroy_fifo(libtrace->fifo);
     828        free(libtrace->remote);
    800829        free(libtrace);
    801830}
     
    11731202        void *buffer = &buf[intsize];
    11741203        int write_required = 0;
    1175 
     1204        struct timeval tv;
     1205        int i;
     1206        int rt_fd;
     1207        int sin_size = sizeof(struct sockaddr_in);
     1208       
     1209        fd_set current;
     1210       
    11761211        assert(libtrace);
    11771212        assert(packet);
     
    11891224        if (libtrace->outputformat == RTSERVER) {
    11901225                do {
     1226                        // check for incoming connections  NOTE: Should this be inside the do { }
     1227                        tv.tv_sec = 0;
     1228                        tv.tv_usec = 10;
     1229                        current = libtrace->listen;
     1230                        do {
     1231                                if (select(libtrace->max_rtfds + 1, &current, NULL, NULL,&tv) >=0 ) {
     1232                                        break;
     1233                                }
     1234                        }
     1235                        while (errno == EINTR);
     1236                        for (i = 0; i <= libtrace->max_rtfds; i++) {
     1237                                if (FD_ISSET(i, &current)) {
     1238                                        // Got something on the listening socket
     1239                                        if (i == libtrace->connect_fd) {
     1240                                                if ((rt_fd = accept(i, (struct sockaddr *) libtrace->remote,
     1241                                                                                &sin_size)) == -1) {
     1242                                                        perror("accept");
     1243                                                } else {
     1244                                                        printf("Client connected\n");
     1245                                                        FD_SET(rt_fd, &libtrace->rt_fds);
     1246                                                        if (rt_fd > libtrace->max_rtfds)
     1247                                                                libtrace->max_rtfds = rt_fd;
     1248                                                }
     1249                                        }
     1250                                }
     1251                        }
     1252                       
    11911253                        assert(libtrace->fifo);
    11921254
     
    12171279                                continue;
    12181280                        }
    1219                         fifo_out_update(libtrace->fifo, size); 
     1281                        current = libtrace->rt_fds;
    12201282                        // Sort out the protocol header
    12211283                        memcpy(buf, &packet->status, intsize);
    1222                                                
    1223 
    1224                         // Send the buffer out on the wire
    1225                         if ((numbytes = trace_write(libtrace, buf, size + sizeof(int))) <=0 ) {
    1226                                 return numbytes;
    1227                         }
    1228 
     1284                        if (select(libtrace->max_rtfds + 1, NULL, &current, NULL, &tv) == -1 ) {
     1285                                perror("select");
     1286                                write_required = 0;
     1287                                continue;
     1288                        }
     1289                       
     1290                        // Send the data to each ready client
     1291                        for (i = 0; i <= libtrace->max_rtfds; i++) {
     1292                                if (FD_ISSET(i, &current)) {
     1293                                        libtrace->output.fd = i;
     1294                                        if ((numbytes = trace_write(libtrace, buf, size + sizeof(int))) <=0 ) {
     1295                                                // close rt_client
     1296                                                FD_CLR(i, &libtrace->rt_fds);
     1297                                                close(i);
     1298                                                numbytes = 0;
     1299                                                continue;
     1300                                        }
     1301                                }
     1302                        }
     1303
     1304                        fifo_out_update(libtrace->fifo, size);
    12291305                        // Need an ack to come back
    12301306                        // TODO: Obviously this is a little unfinished
Note: See TracChangeset for help on using the changeset viewer.