Changeset 417a136 for lib


Ignore:
Timestamp:
12/18/08 14:07:58 (13 years ago)
Author:
Perry Lorier <perry@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
398fa86
Parents:
cf30639
Message:

Use pthread_join() to tell when we've finished cleaning up

Location:
lib
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • lib/ior-thread.c

    rcf30639 r417a136  
    2929        pthread_mutex_t mutex;
    3030        bool closing;
    31         bool closed;
    3231        io_t *io;
    3332};
     
    7978        wandio_destroy(DATA(state)->io);
    8079
    81         DATA(state)->closed = true;
    8280        pthread_cond_signal(&DATA(state)->data_ready);
    8381        pthread_mutex_unlock(&DATA(state)->mutex);
     
    107105        DATA(state)->io = parent;
    108106        DATA(state)->closing = false;
    109         DATA(state)->closed = false;
    110107
    111108        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
     
    173170        DATA(io)->closing = true;
    174171        pthread_cond_signal(&DATA(io)->space_avail);
     172        pthread_mutex_unlock(&DATA(io)->mutex);
    175173
    176         /* Wait until the producer thread dies */
    177         while (!DATA(io)->closed) {
    178                 pthread_cond_wait(&DATA(io)->data_ready, &DATA(io)->mutex);
    179         }
    180         pthread_mutex_unlock(&DATA(io)->mutex);
     174        /* Wait for the thread to exit */
     175        pthread_join(&DATA(io)->producer, NULL);
    181176        free(DATA(io));
    182177        free(io);
  • lib/iow-thread.c

    rcf30639 r417a136  
    1818        int len;
    1919        enum { EMPTY = 0, FULL = 1 } state;
    20         pthread_cond_t dataready;
    21         pthread_cond_t spaceavail;
    22         pthread_mutex_t mutex;
    2320};
    2421
     
    3027        bool closing;
    3128        iow_t *iow;
     29        pthread_cond_t data_ready;
     30        pthread_cond_t space_avail;
     31        pthread_mutex_t mutex;
    3232};
    3333
     
    4242        iow_t *state = (iow_t *) userdata;
    4343
     44        pthread_mutex_lock(&DATA(state)->mutex);
    4445        do {
    45                 pthread_mutex_lock(&DATA(state)->buffer[buffer].mutex);
    4646                while (DATA(state)->buffer[buffer].state == EMPTY) {
    4747                        if (DATA(state)->closing)
    4848                                break;
    49                         pthread_cond_wait(&DATA(state)->buffer[buffer].dataready,
    50                                         &DATA(state)->buffer[buffer].mutex);
     49                        pthread_cond_wait(&DATA(state)->data_ready,
     50                                        &DATA(state)->mutex);
    5151                }
    5252                /* Empty the buffer */
     53
     54                if (DATA(state)->closing)
     55                        break;
     56
     57                pthread_mutex_unlock(&DATA(state)->mutex);
    5358                wandio_wwrite(
    5459                                DATA(state)->iow,
    5560                                DATA(state)->buffer[buffer].buffer,
    5661                                DATA(state)->buffer[buffer].len);
    57 
     62                pthread_mutex_lock(&DATA(state)->mutex);
    5863
    5964                /* if we've not reached the end of the file keep going */
     
    6267                DATA(state)->buffer[buffer].state = EMPTY;
    6368
    64                 pthread_cond_signal(&DATA(state)->buffer[buffer].spaceavail);
     69                pthread_cond_signal(&DATA(state)->space_avail);
    6570
    66                 pthread_mutex_unlock(&DATA(state)->buffer[buffer].mutex);
    6771
    6872                /* Flip buffers */
     
    7579        wandio_wdestroy(DATA(state)->iow);
    7680
     81        pthread_mutex_unlock(&DATA(state)->mutex);
    7782        return NULL;
    7883}
     
    9398        DATA(state)->out_buffer = 0;
    9499        DATA(state)->offset = 0;
    95         pthread_mutex_init(&DATA(state)->buffer[0].mutex,NULL);
    96         pthread_cond_init(&DATA(state)->buffer[0].dataready,NULL);
    97         pthread_cond_init(&DATA(state)->buffer[0].spaceavail,NULL);
     100        pthread_mutex_init(&DATA(state)->mutex,NULL);
     101        pthread_cond_init(&DATA(state)->data_ready,NULL);
     102        pthread_cond_init(&DATA(state)->space_avail,NULL);
    98103
    99104        DATA(state)->iow = child;
     
    111116        int newbuffer;
    112117
     118        pthread_mutex_lock(&DATA(state)->mutex);
    113119        while(len>0) {
    114                 pthread_mutex_lock(&OUTBUFFER(state).mutex);
    115120                while (OUTBUFFER(state).state == FULL) {
    116                         pthread_cond_wait(&OUTBUFFER(state).spaceavail,
    117                                         &OUTBUFFER(state).mutex);
     121                        pthread_cond_wait(&DATA(state)->space_avail,
     122                                        &DATA(state)->mutex);
    118123                }
    119124
     
    122127                        len);
    123128                               
     129                pthread_mutex_unlock(&DATA(state)->mutex);
    124130                memcpy(
    125131                        OUTBUFFER(state).buffer+DATA(state)->offset,
     
    127133                        slice
    128134                        );
     135                pthread_mutex_lock(&DATA(state)->mutex);
    129136
    130137                DATA(state)->offset += slice;
     
    138145                if (DATA(state)->offset >= (off_t)sizeof(OUTBUFFER(state).buffer)) {
    139146                        OUTBUFFER(state).state = FULL;
    140                         pthread_cond_signal(&OUTBUFFER(state).dataready);
     147                        pthread_cond_signal(&DATA(state)->data_ready);
    141148                        DATA(state)->offset = 0;
    142149                        newbuffer = (newbuffer+1) % BUFFERS;
    143150                }
    144151
    145                 pthread_mutex_unlock(&OUTBUFFER(state).mutex);
    146 
    147152                DATA(state)->out_buffer = newbuffer;
    148153        }
     154
     155        pthread_mutex_unlock(&DATA(state)->mutex);
    149156        return copied;
    150157}
     
    152159static void thread_wclose(iow_t *iow)
    153160{
    154         pthread_mutex_lock(&OUTBUFFER(iow).mutex);
     161        pthread_mutex_lock(&DATA(iow)->mutex);
    155162        DATA(iow)->closing = true;
    156         pthread_cond_signal(&OUTBUFFER(iow).dataready);
    157         pthread_mutex_unlock(&OUTBUFFER(iow).mutex);
     163        pthread_cond_signal(&DATA(iow)->data_ready);
     164        pthread_mutex_unlock(&DATA(iow)->mutex);
    158165        pthread_join(DATA(iow)->consumer,NULL);
    159166        free(iow->data);
Note: See TracChangeset for help on using the changeset viewer.