Changeset 7283767
- Timestamp:
- 03/31/10 15:44:04 (11 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 14509f0
- Parents:
- 22cdd39
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/iow-lzo.c
r91c38f8 r7283767 173 173 outbuf->offset=0; 174 174 175 memset(scratch,0,sizeof(scratch)); 175 176 err=lzo1x_1_compress((void*)buffer, len, 176 177 (void*)b2, &dst_len, … … 223 224 static void *lzo_compress_thread(void *data) 224 225 { 225 struct lzothread_t * tdata= (struct lzothread_t *)data;226 struct lzothread_t *me = (struct lzothread_t *)data; 226 227 int err; 227 228 char namebuf[17]; … … 231 232 char label[16]; 232 233 namebuf[16] = '\0'; /* Make sure it's NUL terminated */ 233 sprintf(label,"[lzo%d]", tdata->num);234 sprintf(label,"[lzo%d]",me->num); 234 235 /* If the filename is too long, overwrite the last few bytes */ 235 236 if (strlen(namebuf)>=16-strlen(label)) { … … 244 245 #endif 245 246 246 pthread_mutex_lock(& tdata->mutex);247 while (! tdata->closing) {248 while ( tdata->state != WAITING) {249 if ( tdata->closing)247 pthread_mutex_lock(&me->mutex); 248 while (!me->closing) { 249 while (me->state != WAITING) { 250 if (me->closing) 250 251 break; 251 pthread_cond_wait(& tdata->in_ready, &tdata->mutex);252 pthread_cond_wait(&me->in_ready, &me->mutex); 252 253 } 253 if ( tdata->closing)254 if (me->closing) 254 255 break; 255 pthread_mutex_unlock(&tdata->mutex);256 256 257 257 err=lzo_wwrite_block( 258 tdata->inbuf.buffer, 259 tdata->inbuf.offset, 260 &tdata->outbuf); 261 262 pthread_mutex_lock(&tdata->mutex); 263 tdata->state = FULL; 264 pthread_cond_signal(&tdata->out_ready); 265 } 266 pthread_mutex_unlock(&tdata->mutex); 258 me->inbuf.buffer, 259 me->inbuf.offset, 260 &me->outbuf); 261 262 /* Make sure someone else hasn't clobbered us!*/ 263 assert(me->state == WAITING); 264 me->state = FULL; 265 pthread_cond_signal(&me->out_ready); 266 } 267 pthread_mutex_unlock(&me->mutex); 267 268 268 269 return NULL; … … 368 369 static off_t lzo_wwrite(iow_t *iow, const char *buffer, off_t len) 369 370 { 370 /* lzo can only deal with blocks up to 256k */371 371 off_t ret = 0; 372 372 while (len>0) { 373 unsigned int size = min(len, MAX_BLOCK_SIZE);373 unsigned int size = len; 374 374 off_t err; 375 375 struct buffer_t outbuf; 376 376 377 if (!use_threads) { 377 if (!DATA(iow)->threads) { 378 size = min(len, MAX_BLOCK_SIZE); 378 379 err=lzo_wwrite_block(buffer, size, &outbuf); 379 380 380 /* Flush the data out */ 381 381 wandio_wwrite(DATA(iow)->child, … … 398 398 } 399 399 else { 400 off_t space; 401 400 402 pthread_mutex_lock(&get_next_thread(iow)->mutex); 401 402 403 /* If this thread is still compressing, wait for it to finish */ 403 404 while (get_next_thread(iow)->state == WAITING) { … … 409 410 /* Flush any data out thats there */ 410 411 if (get_next_thread(iow)->state == FULL) { 412 assert(get_next_thread(iow)->outbuf.offset 413 < sizeof(get_next_thread(iow)->outbuf.buffer)); 411 414 wandio_wwrite(DATA(iow)->child, 412 415 get_next_thread(iow)->outbuf.buffer, … … 416 419 } 417 420 421 assert(get_next_thread(iow)->state == EMPTY); 422 418 423 /* Figure out how much space we can copy into this buffer */ 419 size = min( 420 sizeof(get_next_thread(iow)->inbuf.buffer)-get_next_thread(iow)->inbuf.offset, 421 size); 424 assert(MAX_BLOCK_SIZE <= sizeof(get_next_thread(iow)->inbuf.buffer)); 425 space = MAX_BLOCK_SIZE-get_next_thread(iow)->inbuf.offset; 426 size = min(space, size); 427 428 assert(size>0); 429 assert(size <= MAX_BLOCK_SIZE); 430 assert(get_next_thread(iow)->inbuf.offset + size <= MAX_BLOCK_SIZE); 422 431 423 432 /* Move our data in */ … … 432 441 if (get_next_thread(iow)->inbuf.offset >= sizeof(get_next_thread(iow)->inbuf.buffer) 433 442 ||get_next_thread(iow)->inbuf.offset >= MAX_BLOCK_SIZE) { 443 assert(get_next_thread(iow)->state == EMPTY); 434 444 get_next_thread(iow)->state = WAITING; 435 445 pthread_cond_signal(&get_next_thread(iow)->in_ready); 446 436 447 pthread_mutex_unlock(&get_next_thread(iow)->mutex); 437 448 … … 454 465 pthread_mutex_lock(&thread->mutex); 455 466 456 if (thread->state == EMPTY) { 457 /* Partially full buffer, force a flush */ 458 if (thread->inbuf.offset != 0) { 459 thread->state = WAITING; 460 pthread_cond_signal(&thread->in_ready); 461 } 462 } 467 /* If this buffer is empty it shouldn't have any data in it, we should have taken 468 * care of that before. 469 */ 470 /* thread->state == EMPTY implies thread->inbuf.offset == 0 */ 471 assert(!(thread->state == EMPTY) || thread->inbuf.offset == 0); 472 463 473 while (thread->state == WAITING) { 464 474 pthread_cond_wait( … … 474 484 } 475 485 /* Now the thread should be empty, so ask it to shut down */ 486 assert(thread->state == EMPTY && thread->inbuf.offset == 0); 476 487 thread->closing = true; 477 488 pthread_cond_signal(&thread->in_ready); … … 486 497 int i; 487 498 499 /* Flush the last buffer */ 500 pthread_mutex_lock(&get_next_thread(iow)->mutex); 501 if (get_next_thread(iow)->state == EMPTY && get_next_thread(iow)->inbuf.offset != 0) { 502 get_next_thread(iow)->state = WAITING; 503 pthread_cond_signal(&get_next_thread(iow)->in_ready); 504 } 505 pthread_mutex_unlock(&get_next_thread(iow)->mutex); 506 507 DATA(iow)->next_thread = 508 (DATA(iow)->next_thread+1) % DATA(iow)->threads; 509 488 510 /* Right, now we have to shutdown all our threads -- in order */ 489 511 for(i=DATA(iow)->next_thread; i<DATA(iow)->threads; ++i) {
Note: See TracChangeset
for help on using the changeset viewer.