source: lib/iow-thread.c @ 3b027a6

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 3b027a6 was 3b027a6, checked in by Perry Lorier <perry@…>, 10 years ago

Don't be so memory hungry with threading buffers

  • Property mode set to 100644
File size: 6.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#include "wandio.h"
35#include "config.h"
36#include <sys/types.h>
37#include <sys/stat.h>
38#include <fcntl.h>
39#include <stdlib.h>
40#include <pthread.h>
41#include <string.h>
42#include <stdbool.h>
43#ifdef HAVE_SYS_PRCTL_H
44#include <sys/prctl.h>
45#endif
46
47/* Libtrace IO module implementing a threaded writer.
48 *
49 * This module enables another IO writer, called the "child", to perform its
50 * writing using a separate thread. The main thread writes data into a series
51 * of 1MB buffers. Meanwhile, the writing thread writes out of these buffers
 * using the callback for the child writer. pthread conditions are used to
53 * communicate between the two threads, e.g. when there are buffers available
54 * for the main thread to copy data into or when there is data available for
55 * the write thread to write.
56 */
57
/* Every slice stores 1MB of data; BUFFERS is the number of slices */
#define BUFFERSIZE (1024 * 1024)
#define BUFFERS 5
61
/* Forward declaration of the source table defined at the bottom of this
 * file, so that thread_wopen can point new writers at it */
extern iow_source_t thread_wsource;
63
64/* This structure defines a single buffer or "slice" */
65struct buffer_t {
66        char buffer[BUFFERSIZE];        /* The buffer itself */
67        int len;                        /* The size of the buffer */
68        enum { EMPTY = 0, FULL = 1 } state;     /* Is the buffer in use? */
69};
70
71struct state_t {
72        /* The collection of buffers (or slices) */
73        struct buffer_t buffer[BUFFERS];
74        /* The write offset into the current buffer */
75        off_t offset;
76        /* The writing thread */
77        pthread_t consumer;
78        /* The child writer */
79        iow_t *iow;
80        /* Indicates that there is data in one of the buffers */
81        pthread_cond_t data_ready;
82        /* Indicates that there is a free buffer to write into */
83        pthread_cond_t space_avail;
84        /* The mutex for the write buffers */
85        pthread_mutex_t mutex;
86        /* The index of the buffer to write into next */
87        int out_buffer;
88        /* Indicates whether the main thread is concluding */
89        bool closing;
90};
91
/* Fetch the threaded-writer state stored in an iow_t */
#define DATA(x) ((struct state_t *)((x)->data))
/* The slice that the main thread is currently copying into */
#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
/* Smaller of two values; note that each argument may be evaluated twice */
#define min(a, b) ((a) < (b) ? (a) : (b))
95
96/* The writing thread */
97static void *thread_consumer(void *userdata)
98{
99        int buffer=0;
100        bool running = true;
101        char namebuf[17];
102        iow_t *state = (iow_t *) userdata;
103
104#ifdef PR_SET_NAME
105        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
106                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
107                /* If the filename is too long, overwrite the last few bytes */
108                if (strlen(namebuf)>9) {
109                        strcpy(namebuf+10,"[iow]");
110                }
111                else {
112                        strncat(namebuf," [iow]",16);
113                }
114                prctl(PR_SET_NAME, namebuf, 0,0,0);
115        }
116#endif
117
118        pthread_mutex_lock(&DATA(state)->mutex);
119        do {
120                /* Wait for data that we can write */
121                while (DATA(state)->buffer[buffer].state == EMPTY) {
122                        /* Unless, of course, the program is over! */
123                        if (DATA(state)->closing)
124                                break;
125                        pthread_cond_wait(&DATA(state)->data_ready,
126                                        &DATA(state)->mutex);
127                }
128               
129                /* Empty the buffer using the child writer */
130                pthread_mutex_unlock(&DATA(state)->mutex);
131                wandio_wwrite(
132                                DATA(state)->iow,
133                                DATA(state)->buffer[buffer].buffer,
134                                DATA(state)->buffer[buffer].len);
135                pthread_mutex_lock(&DATA(state)->mutex);
136
137                /* If we've not reached the end of the file keep going */
138                running = ( DATA(state)->buffer[buffer].len > 0 );
139                DATA(state)->buffer[buffer].len = 0;
140                DATA(state)->buffer[buffer].state = EMPTY;
141
142                /* Signal that we've freed up another buffer for the main
143                 * thread to copy data into */
144                pthread_cond_signal(&DATA(state)->space_avail);
145
146
147                /* Move on to the next buffer */
148                buffer=(buffer+1) % BUFFERS;
149
150        } while(running);
151
152        /* If we reach here, it's all over so start tidying up */
153        wandio_wdestroy(DATA(state)->iow);
154
155        pthread_mutex_unlock(&DATA(state)->mutex);
156        return NULL;
157}
158
159iow_t *thread_wopen(iow_t *child)
160{
161        iow_t *state;
162
163        if (!child) {
164                return NULL;
165        }
166       
167
168        state = malloc(sizeof(iow_t));
169        state->data = calloc(1,sizeof(struct state_t));
170        state->source = &thread_wsource;
171
172        DATA(state)->out_buffer = 0;
173        DATA(state)->offset = 0;
174        pthread_mutex_init(&DATA(state)->mutex,NULL);
175        pthread_cond_init(&DATA(state)->data_ready,NULL);
176        pthread_cond_init(&DATA(state)->space_avail,NULL);
177
178        DATA(state)->iow = child;
179        DATA(state)->closing = false;
180
181        /* Start the writer thread */
182        pthread_create(&DATA(state)->consumer,NULL,thread_consumer,state);
183
184        return state;
185}
186
187static off_t thread_wwrite(iow_t *state, const char *buffer, off_t len)
188{
189        int slice;
190        int copied=0;
191        int newbuffer;
192
193        pthread_mutex_lock(&DATA(state)->mutex);
194        while(len>0) {
195
196                /* Wait for there to be space available for us to write into */
197                while (OUTBUFFER(state).state == FULL) {
198                        write_waits++;
199                        pthread_cond_wait(&DATA(state)->space_avail,
200                                        &DATA(state)->mutex);
201                }
202
203                /* Copy out of our main buffer into the next available slice */
204                slice=min( 
205                        (off_t)sizeof(OUTBUFFER(state).buffer)-DATA(state)->offset,
206                        len);
207                               
208                pthread_mutex_unlock(&DATA(state)->mutex);
209                memcpy(
210                        OUTBUFFER(state).buffer+DATA(state)->offset,
211                        buffer,
212                        slice
213                        );
214                pthread_mutex_lock(&DATA(state)->mutex);
215
216                DATA(state)->offset += slice;
217                OUTBUFFER(state).len += slice;
218
219                buffer += slice;
220                len -= slice;
221                copied += slice;
222                newbuffer = DATA(state)->out_buffer;
223
224                /* If we've filled a buffer, move on to the next one and
225                 * signal to the write thread that there is something for it
226                 * to do */
227                if (DATA(state)->offset >= (off_t)sizeof(OUTBUFFER(state).buffer)) {
228                        OUTBUFFER(state).state = FULL;
229                        pthread_cond_signal(&DATA(state)->data_ready);
230                        DATA(state)->offset = 0;
231                        newbuffer = (newbuffer+1) % BUFFERS;
232                }
233
234                DATA(state)->out_buffer = newbuffer;
235        }
236
237        pthread_mutex_unlock(&DATA(state)->mutex);
238        return copied;
239}
240
241static void thread_wclose(iow_t *iow)
242{
243        pthread_mutex_lock(&DATA(iow)->mutex);
244        DATA(iow)->closing = true;
245        pthread_cond_signal(&DATA(iow)->data_ready);
246        pthread_mutex_unlock(&DATA(iow)->mutex);
247        pthread_join(DATA(iow)->consumer,NULL);
248        free(iow->data);
249        free(iow);
250}
251
/* Source table for the threaded writer; thread_wopen stores a pointer to it
 * in every writer it creates. The initialiser is positional: an identifying
 * string followed by the write and close handlers defined above.
 * NOTE(review): the field order of iow_source_t is declared elsewhere --
 * confirm against its definition before reordering. */
iow_source_t thread_wsource = {
        "threadw",
        thread_wwrite,
        thread_wclose
};
Note: See TracBrowser for help on using the repository browser.