source: libwandio/iow-lzo.c @ ba19885

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since ba19885 was ba19885, checked in by Shane Alcock <salcock@…>, 7 years ago

Fix bug where some LZO traces would have bad checksums

If the 'compressed' block ends up larger than the uncompressed
block, lzop expects you to write the uncompressed block instead.
We were always writing the compressed block, which meant lzop couldn't
process the resulting file properly.

This was common with full payload captures, which don't compress very
well. Header captures were generally unaffected which is why we haven't
noticed this until now.

Thanks to Madhur Srivastava for reporting this problem.

  • Property mode set to 100644
File size: 14.9 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Perry Lorier
8 *          Shane Alcock
9 *         
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: iow-lzo.c 1521 2010-02-08 22:21:16Z salcock $
30 *
31 */
32
33/* This writes out lzo files in the same format as lzop does.  It's not as
34 * flexible as lzop in an attempt to try and create a very fast method for
35 * writing data out.
36 *
37 * Data is written out in blocks, and the blocks are all compressed in seperate
38 * independant threads (if possible), thus letting you use multicore cpu's to
39 * get compression for the absolute least amount of walltime while capturing.
40 */
41
42#include <lzo/lzo1x.h>
43#include "wandio_internal.h"
44#include "wandio.h"
45#include "config.h"
46#include <sys/types.h>
47#include <sys/stat.h>
48#include <fcntl.h>
49#include <stdlib.h>
50#include <string.h>
51#include <time.h> /* for mtime */
52#include <errno.h>
53#include <assert.h>
54#include <inttypes.h>
55#include <arpa/inet.h>
56#include <pthread.h>
57#include <unistd.h> /* for sysconf */
58#include <stdbool.h>
59#ifdef HAVE_SYS_PRCTL_H
60#include <sys/prctl.h>
61#endif
62
63
64enum { 
65        M_LZO1X_1     =     1,
66        M_LZO1X_1_15  =     2,
67        M_LZO1X_999   =     3,
68        M_NRV1A       =  0x1a,
69        M_NRV1B       =  0x1b,
70        M_NRV2A       =  0x2a,
71        M_NRV2B       =  0x2b,
72        M_NRV2D       =  0x2d,
73        M_ZLIB        =   128,
74};
75
76static const int F_OS_UNIX   = 0x03000000L;
77static const int F_OS_MASK   = 0xff000000L;
78
79static const int F_CS_NATIVE = 0x00000000L;
80static const int F_CS_MASK   = 0x00f00000L;
81
82static const int F_H_CRC32   = 0x00001000L;
83static const int F_ADLER32_D = 0x00000001L;
84static const int F_ADLER32_C = 0x00000002L;
85
86/* popquiz! You throught "static const int" would be well constant didn't you?
87 * You'd be wrong, you can't use them in places where the compiler needs a
88 * constant, so you need to use an enum, since enums /are/ constant the compiler
89 * will let you use them as such.  Sigh.
90 */
91enum { MAX_BLOCK_SIZE = 128*1024 }; /* lzop can only decompress blocks
92                                        this large */
93
94/* According to lzop lzo can increase the data to this size, so save this
95 * much space in our buffers
96 */
97enum { MAX_BUFFER_SIZE = MAX_BLOCK_SIZE+MAX_BLOCK_SIZE/16+64+3 };
98
99static const unsigned char lzop_magic[9] =
100    { 0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a };
101
102
103/* Libtrace IO module implementing a lzo writer */
104
105enum err_t {
106        ERR_OK  = 1,
107        ERR_EOF = 0,
108        ERR_ERROR = -1
109};
110
111const int ADLER32_INIT_VALUE = 1;
112const int CRC32_INIT_VALUE = 0;
113
114struct buffer_t {
115        unsigned int offset;
116        char buffer[MAX_BUFFER_SIZE];
117};
118
119struct lzothread_t {
120        pthread_t thread;
121        pthread_cond_t in_ready;
122        pthread_cond_t out_ready;
123        pthread_mutex_t mutex;
124        bool closing;
125        enum { EMPTY, WAITING, FULL } state;
126        int num;
127        struct buffer_t inbuf;
128        struct buffer_t outbuf;
129};
130
131struct lzow_t {
132        iow_t *child;
133        enum err_t err;
134        int threads;
135        int next_thread;
136        struct lzothread_t *thread;
137};
138
139extern iow_source_t lzo_wsource; 
140
141#define DATA(iow) ((struct lzow_t *)((iow)->data))
142#define min(a,b) ((a)<(b) ? (a) : (b))
143
144static void write_buf(struct buffer_t *buffer,const void *data, size_t len)
145{
146        assert(buffer->offset + len < sizeof(buffer->buffer) && "Exceeded output buffer size in lzo compressor");
147        memcpy(&buffer->buffer[buffer->offset], data, len);
148        buffer->offset += len;
149}
150
151static void write32(struct buffer_t *buffer, uint32_t value)
152{
153        value = htonl(value);
154        write_buf(buffer, &value, sizeof(value));
155}
156
157static void write16(struct buffer_t *buffer, uint16_t value)
158{
159        value = htons(value);
160        write_buf(buffer, &value, sizeof(value));
161}
162
163static void write8(struct buffer_t *buffer, uint8_t value)
164{
165        write_buf(buffer, &value, sizeof(value));
166}
167
168static int lzo_wwrite_block(const char *buffer, off_t len, struct buffer_t *outbuf)
169{
170        char b2[MAX_BUFFER_SIZE];
171        int err;
172        lzo_uint dst_len;
173        char scratch[LZO1X_1_MEM_COMPRESS];
174
175        outbuf->offset=0;
176
177        memset(scratch,0,sizeof(scratch));
178        err=lzo1x_1_compress((void*)buffer, len, 
179                        (void*)b2, &dst_len, 
180                        scratch);
181
182        switch(err) {
183                case LZO_E_OK:
184                        break;
185                case LZO_E_ERROR:
186                        return -EINVAL; /* WTF? */
187                case LZO_E_OUT_OF_MEMORY:
188                        return -ENOMEM; /* Uh oh */
189                case LZO_E_NOT_COMPRESSIBLE:
190                        return -EINVAL; /* Claimed not to be used, dunno what we'll do */
191                case LZO_E_INPUT_OVERRUN:
192                        return -EINVAL;  /* Can't happen on compress? */
193                case LZO_E_OUTPUT_OVERRUN:
194                        return -ENOMEM;
195                case LZO_E_LOOKBEHIND_OVERRUN:
196                        return -EINVAL;
197                case LZO_E_EOF_NOT_FOUND:
198                        return -ENOENT; /* Can't happen on compress? */
199                case LZO_E_INPUT_NOT_CONSUMED:
200                        return -EINVAL;
201                case LZO_E_NOT_YET_IMPLEMENTED:
202                        return -ENOSYS;
203                default:
204                        fprintf(stderr,"Unknown lzo error %d\n",err);
205                        return -EINVAL;
206        }
207
208        write32(outbuf, len); /* Original length */
209       
210        assert(len > 0);
211
212        /* If the compression actually makes the block bigger, we should write out the
213         * block uncompressed. */
214        if (dst_len < (uint32_t)len)
215                write32(outbuf, (uint32_t)dst_len);
216        else
217                write32(outbuf, len);
218        /* CRC32 of the uncompressed buffer */
219#if 0
220        write32(outbuf, lzo_crc32(CRC32_INIT_VALUE, (void*)buffer, len));
221#endif
222        write32(outbuf, 
223                lzo_adler32(ADLER32_INIT_VALUE, (const void*)buffer, len));
224       
225        if (dst_len < (uint32_t)len)
226                write_buf(outbuf, b2, dst_len);
227        else
228                write_buf(outbuf, buffer, len);
229
230        /* Return the number of bytes compressed */
231        return len;
232}
233
234/* There is one of these threads per core in a machine.  This compresses
235 * a block of data and returns it, the main thread tehn is responsible to
236 * write these back out in the right order.
237 */
238static void *lzo_compress_thread(void *data)
239{
240        struct lzothread_t *me = (struct lzothread_t *)data;
241        int err;
242        char namebuf[17];
243
244#ifdef PR_SET_NAME
245        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
246                char label[16];
247                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
248                sprintf(label,"[lzo%d]",me->num);
249                /* If the filename is too long, overwrite the last few bytes */
250                if (strlen(namebuf)>=16-strlen(label)) {
251                        strcpy(namebuf+15-strlen(label),label);
252                }
253                else {
254                        strncat(namebuf," ",16);
255                        strncat(namebuf,label,16);
256                }
257                prctl(PR_SET_NAME, namebuf, 0,0,0);
258        }
259#endif
260
261        pthread_mutex_lock(&me->mutex);
262        while (!me->closing) {
263                while (me->state != WAITING) {
264                        if (me->closing)
265                                break;
266                        pthread_cond_wait(&me->in_ready, &me->mutex);
267                }
268                if (me->closing)
269                        break;
270
271                err=lzo_wwrite_block(
272                        me->inbuf.buffer, 
273                        me->inbuf.offset,
274                        &me->outbuf);
275
276                if (err < 0)
277                        break; 
278                /* Make sure someone else hasn't clobbered us!*/
279                assert(me->state == WAITING);
280                me->state = FULL;
281                pthread_cond_signal(&me->out_ready);
282        }
283        pthread_mutex_unlock(&me->mutex);
284
285        return NULL;
286}
287
288iow_t *lzo_wopen(iow_t *child, int compress_level)
289{
290        const int opt_filter = 0;
291        int flags;
292        iow_t *iow;
293        struct buffer_t buffer;
294        buffer.offset=0;
295        int i;
296
297        if (!child)
298                return NULL;
299
300        if (lzo_init() != LZO_E_OK) {
301                /* Fail */
302                return NULL;
303        }
304
305        /* Compress level is useless for LZO, but getting UNUSED into here
306         * is more trouble than it is worth so this check will at least
307         * stop us from getting warnings about it.
308         */
309        if (compress_level < 0)
310                return NULL;
311
312        iow = malloc(sizeof(iow_t));
313        iow->source = &lzo_wsource;
314        iow->data = malloc(sizeof(struct lzow_t));
315
316        DATA(iow)->child = child;
317        DATA(iow)->err = ERR_OK;
318
319        flags = 0;
320        flags |= F_OS_UNIX & F_OS_MASK; /* Operating System */
321        flags |= F_CS_NATIVE & F_CS_MASK;       /* Character Set */
322        flags |= F_ADLER32_D; /* We adler32 the uncompressed data */
323        /* flags |= F_STDIN; */
324        /* flags |= F_STDOUT */
325        /* flags |= F_MULTIPART; */
326        /* flags |= F_H_CRC32; */
327
328        write_buf(&buffer, lzop_magic, sizeof(lzop_magic));
329        write16(&buffer, 0x1010 &0xFFFF); /* version: pretend to be LZOP version 0x1010 from lzop's version.h */
330        write16(&buffer, lzo_version() & 0xFFFF); /* libversion */
331        write16(&buffer, opt_filter ? 0x0950 : 0x0940); /* version needed to extract */
332        write8(&buffer, M_LZO1X_1);     /* method */
333        write8(&buffer, 5); /* level */
334        write32(&buffer, flags); /* flags */
335        /* if (flags & F_H_FILTER)
336                write32(iow, opt_filter);
337        */ 
338        write32(&buffer, 0x600); /* mode: We assume traces may be sensitive */
339        write32(&buffer, time(NULL)); /* mtime */
340        write32(&buffer, 0); /* GMTdiff */
341
342        /* Length, filename */
343        write8(&buffer, strlen("compresseddata"));
344        write_buf(&buffer, "compresseddata",strlen("compresseddata"));
345
346        if (flags & F_H_CRC32) {
347                write32(&buffer, 
348                        lzo_crc32(CRC32_INIT_VALUE, 
349                                (const void*)buffer.buffer+sizeof(lzop_magic), 
350                                buffer.offset-sizeof(lzop_magic)));
351        }
352        else {
353                uint32_t chksum=lzo_adler32(
354                        ADLER32_INIT_VALUE, 
355                        (const void *)buffer.buffer+sizeof(lzop_magic), 
356                        buffer.offset-sizeof(lzop_magic));
357                write32(&buffer, chksum);
358        }
359
360        wandio_wwrite(DATA(iow)->child,
361                buffer.buffer,
362                buffer.offset);
363
364        /* Set up the thread pool -- one thread per core */
365        DATA(iow)->threads = min((uint32_t)sysconf(_SC_NPROCESSORS_ONLN),
366                        use_threads);
367        DATA(iow)->thread = malloc(
368                        sizeof(struct lzothread_t) * DATA(iow)->threads);
369        DATA(iow)->next_thread = 0;
370        for(i=0; i<DATA(iow)->threads; ++i) {
371                pthread_cond_init(&DATA(iow)->thread[i].in_ready, NULL);
372                pthread_cond_init(&DATA(iow)->thread[i].out_ready, NULL);
373                pthread_mutex_init(&DATA(iow)->thread[i].mutex, NULL);
374                DATA(iow)->thread[i].closing = false;
375                DATA(iow)->thread[i].num = i;
376                DATA(iow)->thread[i].state = EMPTY;
377                DATA(iow)->thread[i].inbuf.offset = 0;
378
379                pthread_create(&DATA(iow)->thread[i].thread, 
380                                NULL,
381                                lzo_compress_thread,
382                                (void*)&DATA(iow)->thread[i]);
383        }
384
385        return iow;
386}
387
388static struct lzothread_t *get_next_thread(iow_t *iow)
389{
390        return &DATA(iow)->thread[DATA(iow)->next_thread];
391}
392
393static off_t lzo_wwrite(iow_t *iow, const char *buffer, off_t len)
394{
395        off_t ret = 0;
396        while (len>0) {
397                off_t size = len;
398                off_t err;
399                struct buffer_t outbuf;
400
401                if (!DATA(iow)->threads) {
402                        size = min(len, MAX_BLOCK_SIZE);
403                        err=lzo_wwrite_block(buffer, size, &outbuf);
404                        /* Flush the data out */
405                        wandio_wwrite(DATA(iow)->child,
406                                        outbuf.buffer,
407                                        outbuf.offset);
408
409                        if (err < 0) {/* Error */
410                                if (ret == 0)
411                                        return err;
412                                /* If we've written some data, return that fact now, let them call back
413                                 * and try and write more data, fail again then.
414                                 */
415                                return ret;
416                        }
417                        else {
418                                assert(err == size);
419                                buffer += size;
420                                len -= size;
421                        }
422                }
423                else {
424                        off_t space;
425
426                        pthread_mutex_lock(&get_next_thread(iow)->mutex);
427                        /* If this thread is still compressing, wait for it to finish */
428                        while (get_next_thread(iow)->state == WAITING) {
429                                pthread_cond_wait(
430                                        &get_next_thread(iow)->out_ready, 
431                                        &get_next_thread(iow)->mutex);
432                        }
433
434                        /* Flush any data out thats there */
435                        if (get_next_thread(iow)->state == FULL) {
436                                assert(get_next_thread(iow)->outbuf.offset
437                                                < sizeof(get_next_thread(iow)->outbuf.buffer));
438                               
439                                wandio_wwrite(DATA(iow)->child,
440                                                get_next_thread(iow)->outbuf.buffer,
441                                                get_next_thread(iow)->outbuf.offset);
442                                get_next_thread(iow)->state = EMPTY;
443                                get_next_thread(iow)->inbuf.offset = 0;
444                        }
445
446                        assert(get_next_thread(iow)->state == EMPTY);
447
448                        /* Figure out how much space we can copy into this buffer */
449                        assert(MAX_BLOCK_SIZE <= sizeof(get_next_thread(iow)->inbuf.buffer));
450                        space = MAX_BLOCK_SIZE-get_next_thread(iow)->inbuf.offset;
451                        size = min(space, size);
452                        assert(size>0);
453                        assert(size <= MAX_BLOCK_SIZE);
454                        assert(get_next_thread(iow)->inbuf.offset + size <= MAX_BLOCK_SIZE);
455
456                        /* Move our data in */
457                        memcpy(&get_next_thread(iow)->inbuf.buffer[get_next_thread(iow)->inbuf.offset], 
458                                buffer, 
459                                size);
460                        get_next_thread(iow)->inbuf.offset += size;
461
462                        /* If the buffer is now full Trigger the thread to start compressing this block,
463                         * and move onto the next block.
464                         */
465                        if (get_next_thread(iow)->inbuf.offset >= sizeof(get_next_thread(iow)->inbuf.buffer)
466                          ||get_next_thread(iow)->inbuf.offset >= MAX_BLOCK_SIZE) {
467                                assert(get_next_thread(iow)->state == EMPTY);
468                                get_next_thread(iow)->state = WAITING;
469                                pthread_cond_signal(&get_next_thread(iow)->in_ready);
470
471                                pthread_mutex_unlock(&get_next_thread(iow)->mutex);
472
473                                DATA(iow)->next_thread = 
474                                                (DATA(iow)->next_thread+1) % DATA(iow)->threads;
475                        }
476                        else 
477                                pthread_mutex_unlock(&get_next_thread(iow)->mutex);
478
479                        /* Update the lengths */
480                        buffer += size;
481                        len -= size;
482                }
483        }
484        return len;
485}
486
487static void shutdown_thread(iow_t *iow, struct lzothread_t *thread)
488{
489        pthread_mutex_lock(&thread->mutex);
490
491        /* If this buffer is empty it shouldn't have any data in it, we should have taken
492         * care of that before.
493         */
494        /* thread->state == EMPTY implies thread->inbuf.offset == 0 */
495        assert(!(thread->state == EMPTY) || thread->inbuf.offset == 0);
496
497        while (thread->state == WAITING) {
498                pthread_cond_wait(
499                        &thread->out_ready,
500                        &thread->mutex);
501        }
502        if (thread->state == FULL) {
503                wandio_wwrite(DATA(iow)->child,
504                                thread->outbuf.buffer,
505                                thread->outbuf.offset);
506                thread->state = EMPTY;
507                thread->inbuf.offset = 0;
508        }
509        /* Now the thread should be empty, so ask it to shut down */
510        assert(thread->state == EMPTY && thread->inbuf.offset == 0);
511        thread->closing = true;
512        pthread_cond_signal(&thread->in_ready);
513        pthread_mutex_unlock(&thread->mutex);
514        /* And wait for it to die */
515        pthread_join(thread->thread,NULL);
516}
517
518static void lzo_wclose(iow_t *iow)
519{
520        const uint32_t zero = 0;
521        int i;
522
523        /* Flush the last buffer */
524        pthread_mutex_lock(&get_next_thread(iow)->mutex);
525        if (get_next_thread(iow)->state == EMPTY && get_next_thread(iow)->inbuf.offset != 0) {
526                get_next_thread(iow)->state = WAITING;
527                pthread_cond_signal(&get_next_thread(iow)->in_ready);
528        }
529        pthread_mutex_unlock(&get_next_thread(iow)->mutex);
530
531        DATA(iow)->next_thread = 
532                        (DATA(iow)->next_thread+1) % DATA(iow)->threads;
533
534        /* Right, now we have to shutdown all our threads -- in order */
535        for(i=DATA(iow)->next_thread; i<DATA(iow)->threads; ++i) {
536                shutdown_thread(iow,&DATA(iow)->thread[i]);
537        }
538        for(i=0; i<DATA(iow)->next_thread; ++i) {
539                shutdown_thread(iow,&DATA(iow)->thread[i]);
540        }
541
542        /* Write out an end of file marker */
543        wandio_wwrite(DATA(iow)->child,
544                &zero,
545                sizeof(zero));
546
547        /* And clean everything up */
548        wandio_wdestroy(DATA(iow)->child);
549        free(DATA(iow)->thread);
550        free(iow->data);
551        free(iow);
552}
553
554iow_source_t lzo_wsource = {
555        "lzo",
556        lzo_wwrite,
557        lzo_wclose
558};
559
Note: See TracBrowser for help on using the repository browser.