source: libwandio/iow-lzo.c @ 94725ea

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 94725ea was 94725ea, checked in by Shane Alcock <salcock@…>, 5 years ago

Merge remote-tracking branch 'origin/master' into develop

  • Property mode set to 100644
File size: 14.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Perry Lorier
8 *          Shane Alcock
9 *         
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: iow-lzo.c 1521 2010-02-08 22:21:16Z salcock $
30 *
31 */
32
33/* This writes out lzo files in the same format as lzop does.  It's not as
34 * flexible as lzop in an attempt to try and create a very fast method for
35 * writing data out.
36 *
37 * Data is written out in blocks, and the blocks are all compressed in seperate
38 * independant threads (if possible), thus letting you use multicore cpu's to
39 * get compression for the absolute least amount of walltime while capturing.
40 */
41
42#include "config.h"
43#include <lzo/lzo1x.h>
44#include "wandio_internal.h"
45#include "wandio.h"
46#include <sys/types.h>
47#include <sys/stat.h>
48#include <fcntl.h>
49#include <stdlib.h>
50#include <string.h>
51#include <time.h> /* for mtime */
52#include <errno.h>
53#include <assert.h>
54#include <inttypes.h>
55#include <arpa/inet.h>
56#include <pthread.h>
57#include <unistd.h> /* for sysconf */
58#include <stdbool.h>
59#ifdef HAVE_SYS_PRCTL_H
60#include <sys/prctl.h>
61#endif
62
63
64enum { 
65        M_LZO1X_1     =     1,
66        M_LZO1X_1_15  =     2,
67        M_LZO1X_999   =     3,
68        M_NRV1A       =  0x1a,
69        M_NRV1B       =  0x1b,
70        M_NRV2A       =  0x2a,
71        M_NRV2B       =  0x2b,
72        M_NRV2D       =  0x2d,
73        M_ZLIB        =   128,
74};
75
76static const int F_OS_UNIX   = 0x03000000L;
77static const int F_OS_MASK   = 0xff000000L;
78
79static const int F_CS_NATIVE = 0x00000000L;
80static const int F_CS_MASK   = 0x00f00000L;
81
82static const int F_H_CRC32   = 0x00001000L;
83static const int F_ADLER32_D = 0x00000001L;
84static const int F_ADLER32_C = 0x00000002L;
85
86/* popquiz! You throught "static const int" would be well constant didn't you?
87 * You'd be wrong, you can't use them in places where the compiler needs a
88 * constant, so you need to use an enum, since enums /are/ constant the compiler
89 * will let you use them as such.  Sigh.
90 */
91enum { MAX_BLOCK_SIZE = 128*1024 }; /* lzop can only decompress blocks
92                                        this large */
93
94/* According to lzop lzo can increase the data to this size, so save this
95 * much space in our buffers
96 */
97enum { MAX_BUFFER_SIZE = MAX_BLOCK_SIZE+MAX_BLOCK_SIZE/16+64+3 };
98
99static const unsigned char lzop_magic[9] =
100    { 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a };
101
102
103/* Libtrace IO module implementing a lzo writer */
104
105enum err_t {
106        ERR_OK  = 1,
107        ERR_EOF = 0,
108        ERR_ERROR = -1
109};
110
111const int ADLER32_INIT_VALUE = 1;
112const int CRC32_INIT_VALUE = 0;
113
114struct buffer_t {
115        unsigned int offset;
116        char buffer[MAX_BUFFER_SIZE];
117};
118
119struct lzothread_t {
120        pthread_t thread;
121        pthread_cond_t in_ready;
122        pthread_cond_t out_ready;
123        pthread_mutex_t mutex;
124        bool closing;
125        enum { EMPTY, WAITING, FULL } state;
126        int num;
127        struct buffer_t inbuf;
128        struct buffer_t outbuf;
129};
130
131struct lzow_t {
132        iow_t *child;
133        enum err_t err;
134        int threads;
135        int next_thread;
136        struct lzothread_t *thread;
137};
138
139extern iow_source_t lzo_wsource; 
140
141#define DATA(iow) ((struct lzow_t *)((iow)->data))
142#define min(a,b) ((a)<(b) ? (a) : (b))
143
144static void write_buf(struct buffer_t *buffer,const void *data, size_t len)
145{
146        assert(buffer->offset + len < sizeof(buffer->buffer) && "Exceeded output buffer size in lzo compressor");
147        memcpy(&buffer->buffer[buffer->offset], data, len);
148        buffer->offset += len;
149}
150
151static void write32(struct buffer_t *buffer, uint32_t value)
152{
153        value = htonl(value);
154        write_buf(buffer, &value, sizeof(value));
155}
156
157static void write16(struct buffer_t *buffer, uint16_t value)
158{
159        value = htons(value);
160        write_buf(buffer, &value, sizeof(value));
161}
162
163static void write8(struct buffer_t *buffer, uint8_t value)
164{
165        write_buf(buffer, &value, sizeof(value));
166}
167
168static int lzo_wwrite_block(const char *buffer, off_t len, struct buffer_t *outbuf)
169{
170        char b2[MAX_BUFFER_SIZE];
171        int err;
172        lzo_uint dst_len;
173        char scratch[LZO1X_1_MEM_COMPRESS];
174
175        outbuf->offset=0;
176
177        memset(scratch,0,sizeof(scratch));
178        err=lzo1x_1_compress((void*)buffer, len, 
179                        (void*)b2, &dst_len, 
180                        scratch);
181
182        switch(err) {
183                case LZO_E_OK:
184                        break;
185                case LZO_E_ERROR:
186                        return -EINVAL; /* WTF? */
187                case LZO_E_OUT_OF_MEMORY:
188                        return -ENOMEM; /* Uh oh */
189                case LZO_E_NOT_COMPRESSIBLE:
190                        return -EINVAL; /* Claimed not to be used, dunno what we'll do */
191                case LZO_E_INPUT_OVERRUN:
192                        return -EINVAL;  /* Can't happen on compress? */
193                case LZO_E_OUTPUT_OVERRUN:
194                        return -ENOMEM;
195                case LZO_E_LOOKBEHIND_OVERRUN:
196                        return -EINVAL;
197                case LZO_E_EOF_NOT_FOUND:
198                        return -ENOENT; /* Can't happen on compress? */
199                case LZO_E_INPUT_NOT_CONSUMED:
200                        return -EINVAL;
201                case LZO_E_NOT_YET_IMPLEMENTED:
202                        return -ENOSYS;
203                default:
204                        fprintf(stderr,"Unknown lzo error %d\n",err);
205                        return -EINVAL;
206        }
207
208        write32(outbuf, len); /* Original length */
209       
210        assert(len > 0);
211
212        /* If the compression actually makes the block bigger, we should write out the
213         * block uncompressed. */
214        if (dst_len < (uint32_t)len)
215                write32(outbuf, (uint32_t)dst_len);
216        else
217                write32(outbuf, len);
218        /* CRC32 of the uncompressed buffer */
219#if 0
220        write32(outbuf, lzo_crc32(CRC32_INIT_VALUE, (void*)buffer, len));
221#endif
222        write32(outbuf, 
223                lzo_adler32(ADLER32_INIT_VALUE, (const void*)buffer, len));
224       
225        if (dst_len < (uint32_t)len)
226                write_buf(outbuf, b2, dst_len);
227        else
228                write_buf(outbuf, buffer, len);
229
230        /* Return the number of bytes compressed */
231        return len;
232}
233
234/* There is one of these threads per core in a machine.  This compresses
235 * a block of data and returns it, the main thread tehn is responsible to
236 * write these back out in the right order.
237 */
238static void *lzo_compress_thread(void *data)
239{
240        struct lzothread_t *me = (struct lzothread_t *)data;
241        int err;
242        char namebuf[17];
243
244#ifdef PR_SET_NAME
245        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
246                char label[16];
247                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
248                sprintf(label,"[lzo%d]",me->num);
249                /* If the filename is too long, overwrite the last few bytes */
250                if (strlen(namebuf)>=16-strlen(label)) {
251                        strcpy(namebuf+15-strlen(label),label);
252                }
253                else {
254                        strncat(namebuf," ",16);
255                        strncat(namebuf,label,16);
256                }
257                prctl(PR_SET_NAME, namebuf, 0,0,0);
258        }
259#endif
260
261        pthread_mutex_lock(&me->mutex);
262        while (!me->closing) {
263                while (me->state != WAITING) {
264                        if (me->closing)
265                                break;
266                        pthread_cond_wait(&me->in_ready, &me->mutex);
267                }
268                if (me->closing)
269                        break;
270
271                err=lzo_wwrite_block(
272                        me->inbuf.buffer, 
273                        me->inbuf.offset,
274                        &me->outbuf);
275
276                if (err < 0)
277                        break; 
278                /* Make sure someone else hasn't clobbered us!*/
279                assert(me->state == WAITING);
280                me->state = FULL;
281                pthread_cond_signal(&me->out_ready);
282        }
283        pthread_mutex_unlock(&me->mutex);
284
285        return NULL;
286}
287
288iow_t *lzo_wopen(iow_t *child, int compress_level)
289{
290        const int opt_filter = 0;
291        int flags;
292        iow_t *iow;
293        struct buffer_t buffer;
294        buffer.offset=0;
295        int i;
296
297        if (!child)
298                return NULL;
299
300        if (lzo_init() != LZO_E_OK) {
301                /* Fail */
302                return NULL;
303        }
304
305        /* Compress level is useless for LZO, but getting UNUSED into here
306         * is more trouble than it is worth so this check will at least
307         * stop us from getting warnings about it.
308         */
309        if (compress_level < 0)
310                return NULL;
311
312        iow = malloc(sizeof(iow_t));
313        iow->source = &lzo_wsource;
314        iow->data = malloc(sizeof(struct lzow_t));
315
316        DATA(iow)->child = child;
317        DATA(iow)->err = ERR_OK;
318
319        flags = 0;
320        flags |= F_OS_UNIX & F_OS_MASK; /* Operating System */
321        flags |= F_CS_NATIVE & F_CS_MASK;       /* Character Set */
322        flags |= F_ADLER32_D; /* We adler32 the uncompressed data */
323        /* flags |= F_STDIN; */
324        /* flags |= F_STDOUT */
325        /* flags |= F_MULTIPART; */
326        /* flags |= F_H_CRC32; */
327
328        write_buf(&buffer, lzop_magic, sizeof(lzop_magic));
329        write16(&buffer, 0x1010 &0xFFFF); /* version: pretend to be LZOP version 0x1010 from lzop's version.h */
330        write16(&buffer, lzo_version() & 0xFFFF); /* libversion */
331        write16(&buffer, opt_filter ? 0x0950 : 0x0940); /* version needed to extract */
332        write8(&buffer, M_LZO1X_1);     /* method */
333        write8(&buffer, 5); /* level */
334        write32(&buffer, flags); /* flags */
335        /* if (flags & F_H_FILTER)
336                write32(iow, opt_filter);
337        */ 
338        write32(&buffer, 0x600); /* mode: We assume traces may be sensitive */
339        write32(&buffer, time(NULL)); /* mtime */
340        write32(&buffer, 0); /* GMTdiff */
341
342        /* Length, filename */
343        write8(&buffer, strlen("compresseddata"));
344        write_buf(&buffer, "compresseddata",strlen("compresseddata"));
345
346        if (flags & F_H_CRC32) {
347                write32(&buffer, 
348                        lzo_crc32(CRC32_INIT_VALUE, 
349                                (const void*)buffer.buffer+sizeof(lzop_magic), 
350                                buffer.offset-sizeof(lzop_magic)));
351        }
352        else {
353                uint32_t chksum=lzo_adler32(
354                        ADLER32_INIT_VALUE, 
355                        (const void *)buffer.buffer+sizeof(lzop_magic), 
356                        buffer.offset-sizeof(lzop_magic));
357                write32(&buffer, chksum);
358        }
359
360        wandio_wwrite(DATA(iow)->child,
361                buffer.buffer,
362                buffer.offset);
363
364        /* Set up the thread pool -- one thread per core */
365        DATA(iow)->threads = min((uint32_t)sysconf(_SC_NPROCESSORS_ONLN),
366                        use_threads);
367        DATA(iow)->thread = malloc(
368                        sizeof(struct lzothread_t) * DATA(iow)->threads);
369        DATA(iow)->next_thread = 0;
370        for(i=0; i<DATA(iow)->threads; ++i) {
371                pthread_cond_init(&DATA(iow)->thread[i].in_ready, NULL);
372                pthread_cond_init(&DATA(iow)->thread[i].out_ready, NULL);
373                pthread_mutex_init(&DATA(iow)->thread[i].mutex, NULL);
374                DATA(iow)->thread[i].closing = false;
375                DATA(iow)->thread[i].num = i;
376                DATA(iow)->thread[i].state = EMPTY;
377                DATA(iow)->thread[i].inbuf.offset = 0;
378
379                pthread_create(&DATA(iow)->thread[i].thread, 
380                                NULL,
381                                lzo_compress_thread,
382                                (void*)&DATA(iow)->thread[i]);
383        }
384
385        return iow;
386}
387
388static struct lzothread_t *get_next_thread(iow_t *iow)
389{
390        return &DATA(iow)->thread[DATA(iow)->next_thread];
391}
392
393static off_t lzo_wwrite(iow_t *iow, const char *buffer, off_t len)
394{
395        off_t ret = 0;
396        while (len>0) {
397                off_t size = len;
398                off_t err;
399                struct buffer_t outbuf;
400
401                if (!DATA(iow)->threads) {
402                        size = min(len, MAX_BLOCK_SIZE);
403                        err=lzo_wwrite_block(buffer, size, &outbuf);
404                        /* Flush the data out */
405                        wandio_wwrite(DATA(iow)->child,
406                                        outbuf.buffer,
407                                        outbuf.offset);
408
409                        if (err < 0) {/* Error */
410                                if (ret == 0)
411                                        return err;
412                                /* If we've written some data, return that fact now, let them call back
413                                 * and try and write more data, fail again then.
414                                 */
415                                return ret;
416                        }
417                        else {
418                                assert(err == size);
419                                buffer += size;
420                                len -= size;
421                        }
422                }
423                else {
424                        off_t space;
425
426                        pthread_mutex_lock(&get_next_thread(iow)->mutex);
427                        /* If this thread is still compressing, wait for it to finish */
428                        while (get_next_thread(iow)->state == WAITING) {
429                                pthread_cond_wait(
430                                        &get_next_thread(iow)->out_ready, 
431                                        &get_next_thread(iow)->mutex);
432                        }
433
434                        /* Flush any data out thats there */
435                        if (get_next_thread(iow)->state == FULL) {
436                                assert(get_next_thread(iow)->outbuf.offset
437                                                < sizeof(get_next_thread(iow)->outbuf.buffer));
438                               
439                                wandio_wwrite(DATA(iow)->child,
440                                                get_next_thread(iow)->outbuf.buffer,
441                                                get_next_thread(iow)->outbuf.offset);
442                                get_next_thread(iow)->state = EMPTY;
443                                get_next_thread(iow)->inbuf.offset = 0;
444                        }
445
446                        assert(get_next_thread(iow)->state == EMPTY);
447
448                        /* Figure out how much space we can copy into this buffer */
449                        assert(MAX_BLOCK_SIZE <= sizeof(get_next_thread(iow)->inbuf.buffer));
450                        space = MAX_BLOCK_SIZE-get_next_thread(iow)->inbuf.offset;
451                        size = min(space, size);
452                        assert(size>0);
453                        assert(size <= MAX_BLOCK_SIZE);
454                        assert(get_next_thread(iow)->inbuf.offset + size <= MAX_BLOCK_SIZE);
455
456                        /* Move our data in */
457                        memcpy(&get_next_thread(iow)->inbuf.buffer[get_next_thread(iow)->inbuf.offset], 
458                                buffer, 
459                                size);
460                        get_next_thread(iow)->inbuf.offset += size;
461
462                        /* If the buffer is now full Trigger the thread to start compressing this block,
463                         * and move onto the next block.
464                         */
465                        if (get_next_thread(iow)->inbuf.offset >= sizeof(get_next_thread(iow)->inbuf.buffer)
466                          ||get_next_thread(iow)->inbuf.offset >= MAX_BLOCK_SIZE) {
467                                assert(get_next_thread(iow)->state == EMPTY);
468                                get_next_thread(iow)->state = WAITING;
469                                pthread_cond_signal(&get_next_thread(iow)->in_ready);
470
471                                pthread_mutex_unlock(&get_next_thread(iow)->mutex);
472
473                                DATA(iow)->next_thread = 
474                                                (DATA(iow)->next_thread+1) % DATA(iow)->threads;
475                        }
476                        else 
477                                pthread_mutex_unlock(&get_next_thread(iow)->mutex);
478
479                        /* Update the lengths */
480                        buffer += size;
481                        len -= size;
482                }
483        }
484        return len;
485}
486
487static void shutdown_thread(iow_t *iow, struct lzothread_t *thread)
488{
489        pthread_mutex_lock(&thread->mutex);
490
491        /* If this buffer is empty it shouldn't have any data in it, we should have taken
492         * care of that before.
493         */
494        /* thread->state == EMPTY implies thread->inbuf.offset == 0 */
495        assert(!(thread->state == EMPTY) || thread->inbuf.offset == 0);
496
497        while (thread->state == WAITING) {
498                pthread_cond_wait(
499                        &thread->out_ready,
500                        &thread->mutex);
501        }
502        if (thread->state == FULL) {
503                wandio_wwrite(DATA(iow)->child,
504                                thread->outbuf.buffer,
505                                thread->outbuf.offset);
506                thread->state = EMPTY;
507                thread->inbuf.offset = 0;
508        }
509        /* Now the thread should be empty, so ask it to shut down */
510        assert(thread->state == EMPTY && thread->inbuf.offset == 0);
511        thread->closing = true;
512        pthread_cond_signal(&thread->in_ready);
513        pthread_mutex_unlock(&thread->mutex);
514        /* And wait for it to die */
515        pthread_join(thread->thread,NULL);
516}
517
518static void lzo_wclose(iow_t *iow)
519{
520        const uint32_t zero = 0;
521        int i;
522
523        /* Flush the last buffer */
524        pthread_mutex_lock(&get_next_thread(iow)->mutex);
525        if (get_next_thread(iow)->state == EMPTY && get_next_thread(iow)->inbuf.offset != 0) {
526                get_next_thread(iow)->state = WAITING;
527                pthread_cond_signal(&get_next_thread(iow)->in_ready);
528        }
529        pthread_mutex_unlock(&get_next_thread(iow)->mutex);
530
531        DATA(iow)->next_thread = 
532                        (DATA(iow)->next_thread+1) % DATA(iow)->threads;
533
534        /* Right, now we have to shutdown all our threads -- in order */
535        for(i=DATA(iow)->next_thread; i<DATA(iow)->threads; ++i) {
536                shutdown_thread(iow,&DATA(iow)->thread[i]);
537        }
538        for(i=0; i<DATA(iow)->next_thread; ++i) {
539                shutdown_thread(iow,&DATA(iow)->thread[i]);
540        }
541
542        /* Write out an end of file marker */
543        wandio_wwrite(DATA(iow)->child,
544                &zero,
545                sizeof(zero));
546
547        /* And clean everything up */
548        wandio_wdestroy(DATA(iow)->child);
549        free(DATA(iow)->thread);
550        free(iow->data);
551        free(iow);
552}
553
554iow_source_t lzo_wsource = {
555        "lzo",
556        lzo_wwrite,
557        lzo_wclose
558};
559
Note: See TracBrowser for help on using the repository browser.