Changeset 9d6452b


Ignore:
Timestamp:
03/15/10 14:38:45 (11 years ago)
Author:
Perry Lorier <perry@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
91c38f8
Parents:
6f8745b
Message:

Add support for compressing each block in a seperate thread.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/iow-lzo.c

    r5e26f1d r9d6452b  
    2727 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
    2828 *
    29  * $Id: iow-zlib.c 1521 2010-02-08 22:21:16Z salcock $
     29 * $Id: iow-lzo.c 1521 2010-02-08 22:21:16Z salcock $
    3030 *
    3131 */
    3232
     33/* This writes out lzo files in the same format as lzop does.  It's not as
     34 * flexible as lzop in an attempt to try and create a very fast method for
     35 * writing data out.
     36 *
     37 * Data is written out in blocks, and the blocks are all compressed in seperate
     38 * independant threads (if possible), thus letting you use multicore cpu's to
     39 * get compression for the absolute least amount of walltime while capturing.
     40 */
    3341
    3442#include <lzo/lzo1x.h>
    3543#include "wandio.h"
     44#include "config.h"
    3645#include <sys/types.h>
    3746#include <sys/stat.h>
     
    4453#include <inttypes.h>
    4554#include <arpa/inet.h>
     55#include <pthread.h>
     56#include <unistd.h> /* for sysconf */
     57#include <stdbool.h>
     58#ifdef HAVE_SYS_PRCTL_H
     59#include <sys/prctl.h>
     60#endif
    4661
    4762enum {
     
    5772};
    5873
    59 const int F_OS_UNIX   = 0x03000000L;
    60 const int F_OS_MASK   = 0xff000000L;
    61 
    62 const int F_CS_NATIVE = 0x00000000L;
    63 const int F_CS_MASK   = 0x00f00000L;
    64 
    65 const int F_H_CRC32   = 0x00001000L;
    66 const int F_ADLER32_D = 0x00000001L;
    67 const int F_ADLER32_C = 0x00000002L;
     74static const int F_OS_UNIX   = 0x03000000L;
     75static const int F_OS_MASK   = 0xff000000L;
     76
     77static const int F_CS_NATIVE = 0x00000000L;
     78static const int F_CS_MASK   = 0x00f00000L;
     79
     80static const int F_H_CRC32   = 0x00001000L;
     81static const int F_ADLER32_D = 0x00000001L;
     82static const int F_ADLER32_C = 0x00000002L;
     83
     84/* popquiz! You throught "static const int" would be well constant didn't you?
     85 * You'd be wrong, you can't use them in places where the compiler needs a
     86 * constant, so you need to use an enum, since enums /are/ constant the compiler
     87 * will let you use them as such.  Sigh.
     88 */
     89enum { MAX_BLOCK_SIZE = 128*1024 }; /* lzop can only decompress blocks
     90                                        this large */
     91
     92/* According to lzop lzo can increase the data to this size, so save this
     93 * much space in our buffers
     94 */
     95enum { MAX_BUFFER_SIZE = MAX_BLOCK_SIZE+MAX_BLOCK_SIZE/16+64+3 };
    6896
    6997static const unsigned char lzop_magic[9] =
     
    82110const int CRC32_INIT_VALUE = 0;
    83111
     112struct buffer_t {
     113        unsigned int offset;
     114        char buffer[MAX_BUFFER_SIZE];
     115};
     116
     117struct lzothread_t {
     118        pthread_t thread;
     119        pthread_cond_t in_ready;
     120        pthread_cond_t out_ready;
     121        pthread_mutex_t mutex;
     122        bool closing;
     123        enum { EMPTY, WAITING, FULL } state;
     124        int num;
     125        struct buffer_t inbuf;
     126        struct buffer_t outbuf;
     127};
     128
    84129struct lzow_t {
    85         uint8_t outbuff[1024*1024];
    86130        iow_t *child;
    87131        enum err_t err;
    88         int inoffset;
    89         int outoffset;
    90         void *buffer;
     132        int threads;
     133        int next_thread;
     134        struct lzothread_t *thread;
    91135};
    92136
     
    96140#define min(a,b) ((a)<(b) ? (a) : (b))
    97141
    98 static void write_buf(iow_t *iow,const void *data, size_t len)
    99 {
    100         assert(DATA(iow)->outoffset + len < sizeof(DATA(iow)->outbuff) && "Exceeded output buffer size in lzo compressor");
    101         memcpy(&DATA(iow)->outbuff[DATA(iow)->outoffset], data, len);
    102         DATA(iow)->outoffset += len;
    103 }
    104 
    105 static void write32(iow_t *iow, uint32_t value)
     142static void write_buf(struct buffer_t *buffer,const void *data, size_t len)
     143{
     144        assert(buffer->offset + len < sizeof(buffer->buffer) && "Exceeded output buffer size in lzo compressor");
     145        memcpy(&buffer->buffer[buffer->offset], data, len);
     146        buffer->offset += len;
     147}
     148
     149static void write32(struct buffer_t *buffer, uint32_t value)
    106150{
    107151        value = htonl(value);
    108         write_buf(iow, &value, sizeof(value));
    109 }
    110 
    111 static void write16(iow_t *iow, uint16_t value)
     152        write_buf(buffer, &value, sizeof(value));
     153}
     154
     155static void write16(struct buffer_t *buffer, uint16_t value)
    112156{
    113157        value = htons(value);
    114         write_buf(iow, &value, sizeof(value));
    115 }
    116 
    117 static void write8(iow_t *iow, uint8_t value)
    118 {
    119         write_buf(iow, &value, sizeof(value));
     158        write_buf(buffer, &value, sizeof(value));
     159}
     160
     161static void write8(struct buffer_t *buffer, uint8_t value)
     162{
     163        write_buf(buffer, &value, sizeof(value));
     164}
     165
     166static int lzo_wwrite_block(const char *buffer, off_t len, struct buffer_t *outbuf)
     167{
     168        char b2[MAX_BUFFER_SIZE];
     169        int err;
     170        lzo_uint dst_len;
     171        char scratch[LZO1X_1_MEM_COMPRESS];
     172
     173        outbuf->offset=0;
     174
     175        err=lzo1x_1_compress((void*)buffer, len,
     176                        (void*)b2, &dst_len,
     177                        scratch);
     178
     179        switch(err) {
     180                case LZO_E_OK:
     181                        break;
     182                case LZO_E_ERROR:
     183                        return -EINVAL; /* WTF? */
     184                case LZO_E_OUT_OF_MEMORY:
     185                        return -ENOMEM; /* Uh oh */
     186                case LZO_E_NOT_COMPRESSIBLE:
     187                        return -EINVAL; /* Claimed not to be used, dunno what we'll do */
     188                case LZO_E_INPUT_OVERRUN:
     189                        return -EINVAL;  /* Can't happen on compress? */
     190                case LZO_E_OUTPUT_OVERRUN:
     191                        return -ENOMEM;
     192                case LZO_E_LOOKBEHIND_OVERRUN:
     193                        return -EINVAL;
     194                case LZO_E_EOF_NOT_FOUND:
     195                        return -ENOENT; /* Can't happen on compress? */
     196                case LZO_E_INPUT_NOT_CONSUMED:
     197                        return -EINVAL;
     198                case LZO_E_NOT_YET_IMPLEMENTED:
     199                        return -ENOSYS;
     200                default:
     201                        fprintf(stderr,"Unknown lzo error %d\n",err);
     202                        return -EINVAL;
     203        }
     204
     205        write32(outbuf, len); /* Original length */
     206        write32(outbuf, min((uint32_t)len,(uint32_t)dst_len));
     207        /* CRC32 of the uncompressed buffer */
     208#if 0
     209        write32(outbuf, lzo_crc32(CRC32_INIT_VALUE, (void*)buffer, len));
     210#endif
     211        write32(outbuf,
     212                lzo_adler32(ADLER32_INIT_VALUE, (const void*)buffer, len));
     213        write_buf(outbuf, b2, dst_len);
     214
     215        /* Return the number of bytes compressed */
     216        return len;
     217}
     218
     219/* There is one of these threads per core in a machine.  This compresses
     220 * a block of data and returns it, the main thread tehn is responsible to
     221 * write these back out in the right order.
     222 */
     223static void *lzo_compress_thread(void *data)
     224{
     225        struct lzothread_t *tdata = (struct lzothread_t *)data;
     226        int err;
     227        char namebuf[17];
     228
     229#ifdef PR_SET_NAME
     230        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
     231                char label[16];
     232                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
     233                sprintf(label,"[lzo%d]",tdata->num);
     234                /* If the filename is too long, overwrite the last few bytes */
     235                if (strlen(namebuf)>=16-strlen(label)) {
     236                        strcpy(namebuf+15-strlen(label),label);
     237                }
     238                else {
     239                        strncat(namebuf," ",16);
     240                        strncat(namebuf,label,16);
     241                }
     242                prctl(PR_SET_NAME, namebuf, 0,0,0);
     243        }
     244#endif
     245
     246        pthread_mutex_lock(&tdata->mutex);
     247        while (!tdata->closing) {
     248                while (tdata->state != WAITING) {
     249                        if (tdata->closing)
     250                                break;
     251                        pthread_cond_wait(&tdata->in_ready, &tdata->mutex);
     252                }
     253                if (tdata->closing)
     254                        break;
     255                pthread_mutex_unlock(&tdata->mutex);
     256
     257                err=lzo_wwrite_block(
     258                        tdata->inbuf.buffer,
     259                        tdata->inbuf.offset,
     260                        &tdata->outbuf);
     261
     262                tdata->state = FULL;
     263                pthread_cond_signal(&tdata->out_ready);
     264                pthread_mutex_lock(&tdata->mutex);
     265        }
     266        pthread_mutex_unlock(&tdata->mutex);
     267
     268        return NULL;
    120269}
    121270
     
    125274        int flags;
    126275        iow_t *iow;
     276        struct buffer_t buffer;
     277        buffer.offset=0;
     278        int i;
    127279
    128280        if (!child)
     
    140292        DATA(iow)->child = child;
    141293        DATA(iow)->err = ERR_OK;
    142 
    143         DATA(iow)->outoffset = 0;
    144         DATA(iow)->buffer = malloc(LZO1X_1_MEM_COMPRESS);
    145 
    146294
    147295        flags = 0;
     
    154302        /* flags |= F_H_CRC32; */
    155303
    156         write_buf(iow, lzop_magic, sizeof(lzop_magic));
    157         write16(iow, 0x1010 &0xFFFF); /* version: pretend to be LZOP version 0x1010 from lzop's version.h */
    158         write16(iow, lzo_version() & 0xFFFF); /* libversion */
    159         write16(iow, opt_filter ? 0x0950 : 0x0940); /* version needed to extract */
    160         write8(iow, M_LZO1X_1); /* method */
    161         write8(iow, 5); /* level */
    162         write32(iow, flags); /* flags */
     304        write_buf(&buffer, lzop_magic, sizeof(lzop_magic));
     305        write16(&buffer, 0x1010 &0xFFFF); /* version: pretend to be LZOP version 0x1010 from lzop's version.h */
     306        write16(&buffer, lzo_version() & 0xFFFF); /* libversion */
     307        write16(&buffer, opt_filter ? 0x0950 : 0x0940); /* version needed to extract */
     308        write8(&buffer, M_LZO1X_1);     /* method */
     309        write8(&buffer, 5); /* level */
     310        write32(&buffer, flags); /* flags */
    163311        /* if (flags & F_H_FILTER)
    164312                write32(iow, opt_filter);
    165313        */
    166         write32(iow, 0x600); /* mode: We assume traces may be sensitive */
    167         write32(iow, time(NULL)); /* mtime */
    168         write32(iow, 0); /* GMTdiff */
     314        write32(&buffer, 0x600); /* mode: We assume traces may be sensitive */
     315        write32(&buffer, time(NULL)); /* mtime */
     316        write32(&buffer, 0); /* GMTdiff */
    169317
    170318        /* Length, filename */
    171         write8(iow, strlen("compresseddata"));
    172         write_buf(iow, "compresseddata",strlen("compresseddata"));
     319        write8(&buffer, strlen("compresseddata"));
     320        write_buf(&buffer, "compresseddata",strlen("compresseddata"));
    173321
    174322        if (flags & F_H_CRC32) {
    175                 write32(iow, lzo_crc32(CRC32_INIT_VALUE, DATA(iow)->outbuff, DATA(iow)->outoffset));
     323                write32(&buffer,
     324                        lzo_crc32(CRC32_INIT_VALUE,
     325                                (const void*)buffer.buffer+sizeof(lzop_magic),
     326                                buffer.offset-sizeof(lzop_magic)));
    176327        }
    177328        else {
    178329                uint32_t chksum=lzo_adler32(
    179330                        ADLER32_INIT_VALUE,
    180                         DATA(iow)->outbuff+sizeof(lzop_magic),
    181                         DATA(iow)->outoffset-sizeof(lzop_magic));
    182                 fprintf(stderr,"writing adler32 checksum (%08x)\n",chksum);
    183                 write32(iow, chksum);
     331                        (const void *)buffer.buffer+sizeof(lzop_magic),
     332                        buffer.offset-sizeof(lzop_magic));
     333                write32(&buffer, chksum);
    184334        }
    185335
    186336        wandio_wwrite(DATA(iow)->child,
    187                 (char *)DATA(iow)->outbuff,
    188                 DATA(iow)->outoffset);
    189         DATA(iow)->outoffset = 0;
     337                buffer.buffer,
     338                buffer.offset);
     339
     340        /* Set up the thread pool -- one thread per core */
     341        DATA(iow)->threads = min(sysconf(_SC_NPROCESSORS_ONLN),use_threads);
     342        DATA(iow)->thread = malloc(
     343                        sizeof(struct lzothread_t) * DATA(iow)->threads);
     344        DATA(iow)->next_thread = 0;
     345        for(i=0; i<DATA(iow)->threads; ++i) {
     346                pthread_cond_init(&DATA(iow)->thread[i].in_ready, NULL);
     347                pthread_cond_init(&DATA(iow)->thread[i].out_ready, NULL);
     348                pthread_mutex_init(&DATA(iow)->thread[i].mutex, NULL);
     349                DATA(iow)->thread[i].closing = false;
     350                DATA(iow)->thread[i].num = i;
     351                DATA(iow)->thread[i].state = EMPTY;
     352                DATA(iow)->thread[i].inbuf.offset = 0;
     353
     354                pthread_create(&DATA(iow)->thread[i].thread,
     355                                NULL,
     356                                lzo_compress_thread,
     357                                (void*)&DATA(iow)->thread[i]);
     358        }
    190359
    191360        return iow;
    192361}
    193362
    194 static off_t lzo_wwrite_block(iow_t *iow, const char *buffer, off_t len)
    195 {
    196         char b2[1024*1024];
    197         int err;
    198         lzo_uint dst_len;
    199        
    200         if (DATA(iow)->err == ERR_EOF) {
    201                 return 0; /* EOF */
    202         }
    203         if (DATA(iow)->err == ERR_ERROR) {
    204                 return -1; /* ERROR! */
    205         }
    206 
    207         err=lzo1x_1_compress((void*)buffer, len, (void*)b2, &dst_len, DATA(iow)->buffer);
    208 
    209         switch(err) {
    210                 case LZO_E_OK:
    211                         break;
    212                 case LZO_E_ERROR:
    213                         DATA(iow)->err = EINVAL; /* "WTF?" */
    214                         break;
    215                 case LZO_E_OUT_OF_MEMORY:
    216                         DATA(iow)->err = ENOMEM;
    217                         break;
    218                 case LZO_E_NOT_COMPRESSIBLE:
    219                         DATA(iow)->err = EINVAL; /* Claimed not to be used, dunno what we'll do */
    220                         break;
    221                 case LZO_E_INPUT_OVERRUN:
    222                         DATA(iow)->err = EINVAL;
    223                         break;
    224                 case LZO_E_OUTPUT_OVERRUN:
    225                         DATA(iow)->err = ENOMEM;
    226                         break;
    227                 case LZO_E_LOOKBEHIND_OVERRUN:
    228                         DATA(iow)->err = EINVAL;
    229                         break;
    230                 case LZO_E_EOF_NOT_FOUND:
    231                         DATA(iow)->err = ENOENT;
    232                         break;
    233                 case LZO_E_INPUT_NOT_CONSUMED:
    234                         DATA(iow)->err = EINVAL;
    235                         break;
    236                 case LZO_E_NOT_YET_IMPLEMENTED:
    237                         DATA(iow)->err = ENOSYS;
    238                         break;
    239                 default:
    240                         fprintf(stderr,"Unknown lzo error %d\n",err);
    241                         DATA(iow)->err = EINVAL;
    242                         break;
    243         }
    244 
    245         write32(iow, len); /* Original length */
    246         write32(iow, min((uint32_t)len,(uint32_t)dst_len));
    247         /* CRC32 of the uncompressed buffer */
    248 #if 0
    249         write32(iow, lzo_crc32(CRC32_INIT_VALUE, (void*)buffer, len));
    250 #endif
    251         write32(iow, lzo_adler32(ADLER32_INIT_VALUE, (const void*)buffer, len));
    252         write_buf(iow, b2, dst_len);
    253 
    254         /* Flush the data out */
    255         wandio_wwrite(DATA(iow)->child,
    256                 (char *)DATA(iow)->outbuff,
    257                 DATA(iow)->outoffset);
    258         DATA(iow)->outoffset = 0;
    259 
    260         /* Return the number of bytes compressed */
    261         return len;
     363static struct lzothread_t *get_next_thread(iow_t *iow)
     364{
     365        return &DATA(iow)->thread[DATA(iow)->next_thread];
    262366}
    263367
     
    267371        off_t ret = 0;
    268372        while (len>0) {
    269                 off_t size = (len >= 256*1024 ? 256*1024 : len);
     373                unsigned int size = min(len, MAX_BLOCK_SIZE);
    270374                off_t err;
    271 
    272                 err=lzo_wwrite_block(iow, buffer, size);
    273 
    274                 if (err < 0) {/* Error */
    275                         if (ret == 0)
    276                                 return err;
    277                         /* If we've written some data, return that fact now, let them call back
    278                          * and try and write more data, fail again then.
    279                          */
    280                         return ret;
     375                struct buffer_t outbuf;
     376
     377                if (!use_threads) {
     378                        err=lzo_wwrite_block(buffer, size, &outbuf);
     379
     380                        /* Flush the data out */
     381                        wandio_wwrite(DATA(iow)->child,
     382                                        outbuf.buffer,
     383                                        outbuf.offset);
     384
     385                        if (err < 0) {/* Error */
     386                                if (ret == 0)
     387                                        return err;
     388                                /* If we've written some data, return that fact now, let them call back
     389                                 * and try and write more data, fail again then.
     390                                 */
     391                                return ret;
     392                        }
     393                        else {
     394                                assert(err == size);
     395                                buffer += size;
     396                                len -= size;
     397                        }
    281398                }
    282399                else {
    283                         assert(err == size);
     400                        pthread_mutex_lock(&get_next_thread(iow)->mutex);
     401                               
     402                        /* If this thread is still compressing, wait for it to finish */
     403                        while (get_next_thread(iow)->state == WAITING) {
     404                                pthread_cond_wait(
     405                                        &get_next_thread(iow)->out_ready,
     406                                        &get_next_thread(iow)->mutex);
     407                        }
     408
     409                        /* Flush any data out thats there */
     410                        if (get_next_thread(iow)->state == FULL) {
     411                                wandio_wwrite(DATA(iow)->child,
     412                                                get_next_thread(iow)->outbuf.buffer,
     413                                                get_next_thread(iow)->outbuf.offset);
     414                                get_next_thread(iow)->state = EMPTY;
     415                                get_next_thread(iow)->inbuf.offset = 0;
     416                        }
     417
     418                        /* Figure out how much space we can copy into this buffer */
     419                        size = min(
     420                                sizeof(get_next_thread(iow)->inbuf.buffer)-get_next_thread(iow)->inbuf.offset,
     421                                size);
     422
     423                        /* Move our data in */
     424                        memcpy(&get_next_thread(iow)->inbuf.buffer[get_next_thread(iow)->inbuf.offset],
     425                                buffer,
     426                                size);
     427                        get_next_thread(iow)->inbuf.offset += size;
     428
     429                        /* If the buffer is now full Trigger the thread to start compressing this block,
     430                         * and move onto the next block.
     431                         */
     432                        if (get_next_thread(iow)->inbuf.offset >= sizeof(get_next_thread(iow)->inbuf.buffer)
     433                          ||get_next_thread(iow)->inbuf.offset >= MAX_BLOCK_SIZE) {
     434                                get_next_thread(iow)->state = WAITING;
     435                                pthread_mutex_unlock(&get_next_thread(iow)->mutex);
     436                                pthread_cond_signal(&get_next_thread(iow)->in_ready);
     437
     438                                DATA(iow)->next_thread =
     439                                                (DATA(iow)->next_thread+1) % DATA(iow)->threads;
     440                        }
     441                        else
     442                                pthread_mutex_unlock(&get_next_thread(iow)->mutex);
     443
     444                        /* Update the lengths */
    284445                        buffer += size;
    285446                        len -= size;
    286447                }
    287448        }
    288         return ret;
     449        return len;
     450}
     451
     452static void shutdown_thread(iow_t *iow, struct lzothread_t *thread)
     453{
     454        pthread_mutex_lock(&thread->mutex);
     455
     456        if (thread->state == EMPTY) {
     457                /* Partially full buffer, force a flush */
     458                if (thread->inbuf.offset != 0) {
     459                        thread->state = WAITING;
     460                        pthread_cond_signal(&thread->in_ready);
     461                }
     462        }
     463        while (thread->state == WAITING) {
     464                pthread_cond_wait(
     465                        &thread->out_ready,
     466                        &thread->mutex);
     467        }
     468        if (thread->state == FULL) {
     469                wandio_wwrite(DATA(iow)->child,
     470                                thread->outbuf.buffer,
     471                                thread->outbuf.offset);
     472                thread->state = EMPTY;
     473                thread->inbuf.offset = 0;
     474        }
     475        /* Now the thread should be empty, so ask it to shut down */
     476        thread->closing = true;
     477        pthread_mutex_unlock(&thread->mutex);
     478        pthread_cond_signal(&thread->in_ready);
     479        /* And wait for it to die */
     480        pthread_join(thread->thread,NULL);
    289481}
    290482
     
    292484{
    293485        const uint32_t zero = 0;
     486        int i;
     487
     488        /* Right, now we have to shutdown all our threads -- in order */
     489        for(i=DATA(iow)->next_thread; i<DATA(iow)->threads; ++i) {
     490                shutdown_thread(iow,&DATA(iow)->thread[i]);
     491        }
     492        for(i=0; i<DATA(iow)->next_thread; ++i) {
     493                shutdown_thread(iow,&DATA(iow)->thread[i]);
     494        }
     495
    294496        /* Write out an end of file marker */
    295497        wandio_wwrite(DATA(iow)->child,
    296498                &zero,
    297499                sizeof(zero));
     500
     501        /* And clean everything up */
    298502        wandio_wdestroy(DATA(iow)->child);
    299         free(DATA(iow)->buffer);
     503        free(DATA(iow)->thread);
    300504        free(iow->data);
    301505        free(iow);
Note: See TracChangeset for help on using the changeset viewer.