Changeset 7283767


Ignore:
Timestamp:
03/31/10 15:44:04 (11 years ago)
Author:
Perry Lorier <perry@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
14509f0
Parents:
22cdd39
Message:

Fix scheduling of flushing buffers on shutdown

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/iow-lzo.c

    r91c38f8 r7283767  
    173173        outbuf->offset=0;
    174174
     175        memset(scratch,0,sizeof(scratch));
    175176        err=lzo1x_1_compress((void*)buffer, len,
    176177                        (void*)b2, &dst_len,
     
    223224static void *lzo_compress_thread(void *data)
    224225{
    225         struct lzothread_t *tdata = (struct lzothread_t *)data;
     226        struct lzothread_t *me = (struct lzothread_t *)data;
    226227        int err;
    227228        char namebuf[17];
     
    231232                char label[16];
    232233                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
    233                 sprintf(label,"[lzo%d]",tdata->num);
     234                sprintf(label,"[lzo%d]",me->num);
    234235                /* If the filename is too long, overwrite the last few bytes */
    235236                if (strlen(namebuf)>=16-strlen(label)) {
     
    244245#endif
    245246
    246         pthread_mutex_lock(&tdata->mutex);
    247         while (!tdata->closing) {
    248                 while (tdata->state != WAITING) {
    249                         if (tdata->closing)
     247        pthread_mutex_lock(&me->mutex);
     248        while (!me->closing) {
     249                while (me->state != WAITING) {
     250                        if (me->closing)
    250251                                break;
    251                         pthread_cond_wait(&tdata->in_ready, &tdata->mutex);
     252                        pthread_cond_wait(&me->in_ready, &me->mutex);
    252253                }
    253                 if (tdata->closing)
     254                if (me->closing)
    254255                        break;
    255                 pthread_mutex_unlock(&tdata->mutex);
    256256
    257257                err=lzo_wwrite_block(
    258                         tdata->inbuf.buffer,
    259                         tdata->inbuf.offset,
    260                         &tdata->outbuf);
    261 
    262                 pthread_mutex_lock(&tdata->mutex);
    263                 tdata->state = FULL;
    264                 pthread_cond_signal(&tdata->out_ready);
    265         }
    266         pthread_mutex_unlock(&tdata->mutex);
     258                        me->inbuf.buffer,
     259                        me->inbuf.offset,
     260                        &me->outbuf);
     261
     262                /* Make sure someone else hasn't clobbered us!*/
     263                assert(me->state == WAITING);
     264                me->state = FULL;
     265                pthread_cond_signal(&me->out_ready);
     266        }
     267        pthread_mutex_unlock(&me->mutex);
    267268
    268269        return NULL;
     
    368369static off_t lzo_wwrite(iow_t *iow, const char *buffer, off_t len)
    369370{
    370         /* lzo can only deal with blocks up to 256k */
    371371        off_t ret = 0;
    372372        while (len>0) {
    373                 unsigned int size = min(len, MAX_BLOCK_SIZE);
     373                unsigned int size = len;
    374374                off_t err;
    375375                struct buffer_t outbuf;
    376376
    377                 if (!use_threads) {
     377                if (!DATA(iow)->threads) {
     378                        size = min(len, MAX_BLOCK_SIZE);
    378379                        err=lzo_wwrite_block(buffer, size, &outbuf);
    379 
    380380                        /* Flush the data out */
    381381                        wandio_wwrite(DATA(iow)->child,
     
    398398                }
    399399                else {
     400                        off_t space;
     401
    400402                        pthread_mutex_lock(&get_next_thread(iow)->mutex);
    401                                
    402403                        /* If this thread is still compressing, wait for it to finish */
    403404                        while (get_next_thread(iow)->state == WAITING) {
     
    409410                        /* Flush any data out thats there */
    410411                        if (get_next_thread(iow)->state == FULL) {
     412                                assert(get_next_thread(iow)->outbuf.offset
     413                                                < sizeof(get_next_thread(iow)->outbuf.buffer));
    411414                                wandio_wwrite(DATA(iow)->child,
    412415                                                get_next_thread(iow)->outbuf.buffer,
     
    416419                        }
    417420
     421                        assert(get_next_thread(iow)->state == EMPTY);
     422
    418423                        /* Figure out how much space we can copy into this buffer */
    419                         size = min(
    420                                 sizeof(get_next_thread(iow)->inbuf.buffer)-get_next_thread(iow)->inbuf.offset,
    421                                 size);
     424                        assert(MAX_BLOCK_SIZE <= sizeof(get_next_thread(iow)->inbuf.buffer));
     425                        space = MAX_BLOCK_SIZE-get_next_thread(iow)->inbuf.offset;
     426                        size = min(space, size);
     427
     428                        assert(size>0);
     429                        assert(size <= MAX_BLOCK_SIZE);
     430                        assert(get_next_thread(iow)->inbuf.offset + size <= MAX_BLOCK_SIZE);
    422431
    423432                        /* Move our data in */
     
    432441                        if (get_next_thread(iow)->inbuf.offset >= sizeof(get_next_thread(iow)->inbuf.buffer)
    433442                          ||get_next_thread(iow)->inbuf.offset >= MAX_BLOCK_SIZE) {
     443                                assert(get_next_thread(iow)->state == EMPTY);
    434444                                get_next_thread(iow)->state = WAITING;
    435445                                pthread_cond_signal(&get_next_thread(iow)->in_ready);
     446
    436447                                pthread_mutex_unlock(&get_next_thread(iow)->mutex);
    437448
     
    454465        pthread_mutex_lock(&thread->mutex);
    455466
    456         if (thread->state == EMPTY) {
    457                 /* Partially full buffer, force a flush */
    458                 if (thread->inbuf.offset != 0) {
    459                         thread->state = WAITING;
    460                         pthread_cond_signal(&thread->in_ready);
    461                 }
    462         }
     467        /* If this buffer is empty it shouldn't have any data in it, we should have taken
     468         * care of that before.
     469         */
     470        /* thread->state == EMPTY implies thread->inbuf.offset == 0 */
     471        assert(!(thread->state == EMPTY) || thread->inbuf.offset == 0);
     472
    463473        while (thread->state == WAITING) {
    464474                pthread_cond_wait(
     
    474484        }
    475485        /* Now the thread should be empty, so ask it to shut down */
     486        assert(thread->state == EMPTY && thread->inbuf.offset == 0);
    476487        thread->closing = true;
    477488        pthread_cond_signal(&thread->in_ready);
     
    486497        int i;
    487498
     499        /* Flush the last buffer */
     500        pthread_mutex_lock(&get_next_thread(iow)->mutex);
     501        if (get_next_thread(iow)->state == EMPTY && get_next_thread(iow)->inbuf.offset != 0) {
     502                get_next_thread(iow)->state = WAITING;
     503                pthread_cond_signal(&get_next_thread(iow)->in_ready);
     504        }
     505        pthread_mutex_unlock(&get_next_thread(iow)->mutex);
     506
     507        DATA(iow)->next_thread =
     508                        (DATA(iow)->next_thread+1) % DATA(iow)->threads;
     509
    488510        /* Right, now we have to shutdown all our threads -- in order */
    489511        for(i=DATA(iow)->next_thread; i<DATA(iow)->threads; ++i) {
Note: See TracChangeset for help on using the changeset viewer.