Changeset 8414770 for lib/ior-thread.c


Ignore:
Timestamp:
02/09/10 11:01:16 (12 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
22a9ccc
Parents:
d026488
Message:
  • Updated licensing and documentation for all the IO reader modules
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/ior-thread.c

    r7f2612c r8414770  
     1/*
     2 * This file is part of libtrace
     3 *
     4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     5 * New Zealand.
     6 *
     7 * Authors: Daniel Lawson
     8 *          Perry Lorier
     9 *          Shane Alcock
     10 *         
     11 * All rights reserved.
     12 *
     13 * This code has been developed by the University of Waikato WAND
     14 * research group. For further information please see http://www.wand.net.nz/
     15 *
     16 * libtrace is free software; you can redistribute it and/or modify
     17 * it under the terms of the GNU General Public License as published by
     18 * the Free Software Foundation; either version 2 of the License, or
     19 * (at your option) any later version.
     20 *
     21 * libtrace is distributed in the hope that it will be useful,
     22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
     23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     24 * GNU General Public License for more details.
     25 *
     26 * You should have received a copy of the GNU General Public License
     27 * along with libtrace; if not, write to the Free Software
     28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
     29 *
     30 * $Id: format_erf.c 1517 2010-02-08 01:11:04Z salcock $
     31 *
     32 */
     33
     34
    135#include "wandio.h"
    236#include <sys/types.h>
     
    943#include <errno.h>
    1044
     45/* Libtrace IO module implementing a threaded reader.
     46 *
     47 * This module enables another IO reader, called the "parent", to perform its
     48 * reading using a separate thread. The reading thread reads data into a
     49 * series of 1MB buffers. Once all the buffers are full, it waits for the
     50 * main thread to free up some of the buffers by consuming data from them. The
     51 * reading thread also uses a pthread condition to indicate to the main thread
     52 * that there is data available in the buffers.
     53 */
     54
    1155/* 1MB Buffer */
    1256#define BUFFERSIZE (1024*1024)
     
    1559extern io_source_t thread_source;
    1660
     61/* This structure defines a single buffer or "slice" */
    1762struct buffer_t {
    18         char buffer[BUFFERSIZE];
    19         int len;
    20         enum { EMPTY = 0, FULL = 1 } state;
     63        char buffer[BUFFERSIZE];        /* The buffer itself */
     64        int len;                        /* The size of the buffer */
     65        enum { EMPTY = 0, FULL = 1 } state;     /* Is the buffer in use? */
    2166};
    2267
    2368struct state_t {
     69        /* The collection of buffers (or slices) */
    2470        struct buffer_t buffer[BUFFERS];
     71        /* The index of the buffer to read into next */
    2572        int in_buffer;
     73        /* The read offset into the current buffer */
    2674        int offset;
     75        /* The reading thread */
    2776        pthread_t producer;
     77        /* Indicates that there is a free buffer to read into */
    2878        pthread_cond_t space_avail;
     79        /* Indicates that there is data in one of the buffers */
    2980        pthread_cond_t data_ready;
     81        /* The mutex for the read buffers */
    3082        pthread_mutex_t mutex;
     83        /* The parent reader */
    3184        io_t *io;
     85        /* Indicates whether the main thread is concluding */
    3286        bool closing;
    3387};
     
    3791#define min(a,b) ((a)<(b) ? (a) : (b))
    3892
     93/* The reading thread */
    3994static void *thread_producer(void* userdata)
    4095{
     
    45100        pthread_mutex_lock(&DATA(state)->mutex);
    46101        do {
     102                /* If all the buffers are full, we need to wait for one to
     103                 * become free otherwise we have nowhere to write to! */
    47104                while (DATA(state)->buffer[buffer].state == FULL) {
    48105                        if (DATA(state)->closing)
     
    51108                }
    52109
     110                /* Don't bother reading any more data if we are shutting up
     111                 * shop */
    53112                if (DATA(state)->closing) {
    54113                        break;
     
    56115                pthread_mutex_unlock(&DATA(state)->mutex);
    57116
    58                 /* Fill the buffer */
     117                /* Get the parent reader to fill the buffer */
    59118                DATA(state)->buffer[buffer].len=wandio_read(
    60119                                DATA(state)->io,
     
    66125                DATA(state)->buffer[buffer].state = FULL;
    67126
    68                 /* if we've not reached the end of the file keep going */
     127                /* If we've not reached the end of the file keep going */
    69128                running = (DATA(state)->buffer[buffer].len > 0 );
    70129
     130                /* Signal that there is data available for the main thread */
    71131                pthread_cond_signal(&DATA(state)->data_ready);
    72132
    73                 /* Flip buffers */
     133                /* Move on to the next buffer */
    74134                buffer=(buffer+1) % BUFFERS;
    75135
    76136        } while(running);
    77137
    78 
     138        /* If we reach here, it's all over so start tidying up */
    79139        wandio_destroy(DATA(state)->io);
    80140
     
    107167        DATA(state)->closing = false;
    108168
     169        /* Create the reading thread */
    109170        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
    110171
     
    120181        while(len>0) {
    121182                pthread_mutex_lock(&DATA(state)->mutex);
     183               
     184                /* Wait for the reader thread to provide us with some data */
    122185                while (INBUFFER(state).state == EMPTY) {
    123186                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
    124187
    125188                }
    126 
     189               
     190                /* Check for errors and EOF */
    127191                if (INBUFFER(state).len <1) {
    128192
     
    136200                }
    137201
     202                /* Copy the next available slice into the main buffer */
    138203                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
    139204
     
    153218                DATA(state)->offset+=slice;
    154219                newbuffer = DATA(state)->in_buffer;
    155 
     220               
     221                /* If we've read everything from the current slice, let the
     222                 * read thread know that there is now more space available
     223                 * and start reading from the next slice */
    156224                if (DATA(state)->offset >= INBUFFER(state).len) {
    157225                        INBUFFER(state).state = EMPTY;
Note: See TracChangeset for help on using the changeset viewer.