Changeset cf30639


Ignore:
Timestamp:
12/17/08 15:50:25 (12 years ago)
Author:
Perry Lorier <perry@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
417a136
Parents:
d4a1691
Message:

Threading cleanups (and some memory leaks addressed)

Location:
lib
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • lib/ior-stdio.c

    rc66068d rcf30639  
    5252{
    5353        close(DATA(io)->fd);
     54        free(io->data);
    5455        free(io);
    5556}
  • lib/ior-thread.c

    rc66068d rcf30639  
    1818        int len;
    1919        enum { EMPTY = 0, FULL = 1 } state;
    20         pthread_cond_t dataready;
    21         pthread_cond_t spaceavail;
    22         pthread_mutex_t mutex;
    2320};
    2421
     
    2825        int offset;
    2926        pthread_t producer;
     27        pthread_cond_t space_avail;
     28        pthread_cond_t data_ready;
     29        pthread_mutex_t mutex;
    3030        bool closing;
     31        bool closed;
    3132        io_t *io;
    3233};
     
    4243        bool running = true;
    4344
     45        pthread_mutex_lock(&DATA(state)->mutex);
    4446        do {
    45                 pthread_mutex_lock(&DATA(state)->buffer[buffer].mutex);
    4647                while (DATA(state)->buffer[buffer].state == FULL) {
    4748                        if (DATA(state)->closing)
    4849                                break;
    49                         pthread_cond_wait(&DATA(state)->buffer[buffer].spaceavail,
    50                                         &DATA(state)->buffer[buffer].mutex);
     50                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
    5151                }
     52
     53                if (DATA(state)->closing) {
     54                        break;
     55                }
     56                pthread_mutex_unlock(&DATA(state)->mutex);
     57
    5258                /* Fill the buffer */
    5359                DATA(state)->buffer[buffer].len=wandio_read(
     
    5662                                sizeof(DATA(state)->buffer[buffer].buffer));
    5763
     64                pthread_mutex_lock(&DATA(state)->mutex);
     65
    5866                DATA(state)->buffer[buffer].state = FULL;
    5967
     
    6169                running = (DATA(state)->buffer[buffer].len > 0 );
    6270
    63                 pthread_cond_signal(&DATA(state)->buffer[buffer].dataready);
    64 
    65                 pthread_mutex_unlock(&DATA(state)->buffer[buffer].mutex);
     71                pthread_cond_signal(&DATA(state)->data_ready);
    6672
    6773                /* Flip buffers */
     
    7076        } while(running);
    7177
     78
    7279        wandio_destroy(DATA(state)->io);
     80
     81        DATA(state)->closed = true;
     82        pthread_cond_signal(&DATA(state)->data_ready);
     83        pthread_mutex_unlock(&DATA(state)->mutex);
    7384
    7485        return NULL;
     
    90101        DATA(state)->in_buffer = 0;
    91102        DATA(state)->offset = 0;
    92         pthread_mutex_init(&DATA(state)->buffer[0].mutex,NULL);
    93         pthread_cond_init(&DATA(state)->buffer[0].dataready,NULL);
    94         pthread_cond_init(&DATA(state)->buffer[0].spaceavail,NULL);
     103        pthread_mutex_init(&DATA(state)->mutex,NULL);
     104        pthread_cond_init(&DATA(state)->data_ready,NULL);
     105        pthread_cond_init(&DATA(state)->space_avail,NULL);
    95106
    96107        DATA(state)->io = parent;
    97108        DATA(state)->closing = false;
     109        DATA(state)->closed = false;
    98110
    99111        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
     
    109121
    110122        while(len>0) {
    111                 pthread_mutex_lock(&INBUFFER(state).mutex);
     123                pthread_mutex_lock(&DATA(state)->mutex);
    112124                while (INBUFFER(state).state == EMPTY) {
    113                         pthread_cond_wait(&INBUFFER(state).dataready,
    114                                         &INBUFFER(state).mutex);
     125                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
    115126
    116127                }
     
    121132                                copied = INBUFFER(state).len;
    122133
    123                         pthread_mutex_unlock(&INBUFFER(state).mutex);
     134                        pthread_mutex_unlock(&DATA(state)->mutex);
    124135                        return copied;
    125136                }
    126137
    127138                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
     139
     140                pthread_mutex_unlock(&DATA(state)->mutex);
    128141                               
    129142                memcpy(
     
    136149                len-=slice;
    137150                copied+=slice;
     151
     152                pthread_mutex_lock(&DATA(state)->mutex);
    138153                DATA(state)->offset+=slice;
    139154                newbuffer = DATA(state)->in_buffer;
     
    141156                if (DATA(state)->offset >= INBUFFER(state).len) {
    142157                        INBUFFER(state).state = EMPTY;
    143                         pthread_cond_signal(&INBUFFER(state).spaceavail);
     158                        pthread_cond_signal(&DATA(state)->space_avail);
    144159                        newbuffer = (newbuffer+1) % BUFFERS;
    145160                        DATA(state)->offset = 0;
    146161                }
    147162
    148                 pthread_mutex_unlock(&INBUFFER(state).mutex);
     163                pthread_mutex_unlock(&DATA(state)->mutex);
    149164
    150165                DATA(state)->in_buffer = newbuffer;
     
    155170static void thread_close(io_t *io)
    156171{
    157         pthread_mutex_lock(&INBUFFER(io).mutex);
     172        pthread_mutex_lock(&DATA(io)->mutex);
    158173        DATA(io)->closing = true;
    159         pthread_cond_signal(&INBUFFER(io).spaceavail);
    160         pthread_mutex_unlock(&INBUFFER(io).mutex);
     174        pthread_cond_signal(&DATA(io)->space_avail);
     175
     176        /* Wait until the producer thread dies */
     177        while (!DATA(io)->closed) {
     178                pthread_cond_wait(&DATA(io)->data_ready, &DATA(io)->mutex);
     179        }
     180        pthread_mutex_unlock(&DATA(io)->mutex);
     181        free(DATA(io));
    161182        free(io);
    162183}
  • lib/iow-thread.c

    rc66068d rcf30639  
    157157        pthread_mutex_unlock(&OUTBUFFER(iow).mutex);
    158158        pthread_join(DATA(iow)->consumer,NULL);
     159        free(iow->data);
    159160        free(iow);
    160161}
  • lib/wandio.c

    rc66068d rcf30639  
    1818        io_t *io = peek_open(stdio_open(filename));
    1919        char buffer[1024];
     20        int len;
    2021        if (!io)
    2122                return NULL;
    22         wandio_peek(io, buffer, sizeof(buffer));
     23        len = wandio_peek(io, buffer, sizeof(buffer));
    2324#if HAVE_LIBZ
    2425        /* auto detect gzip compressed data */
    25         if (buffer[0] == '\037' && buffer[1] == '\213') {
     26        if (len>=2 && buffer[0] == '\037' && buffer[1] == '\213') {
    2627                io = zlib_open(io);
    2728        }
    2829        /* auto detect compress(1) compressed data (gzip can read this) */
    29         if (buffer[0] == '\037' && buffer[1] == '\235') {
     30        if (len>=2 && buffer[0] == '\037' && buffer[1] == '\235') {
    3031                io = zlib_open(io);
    3132        }
     
    3334#if HAVE_LIBBZ2
    3435        /* auto detect bzip compressed data */
    35         else if (buffer[0] == 'B' && buffer[1] == 'Z' && buffer[2] == 'h') {
     36        if (len>=3 && buffer[0] == 'B' && buffer[1] == 'Z' && buffer[2] == 'h') {
    3637                io = bz_open(io);
    3738        }
Note: See TracChangeset for help on using the changeset viewer.