source: lib/iow-thread.c @ 22a9ccc

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 22a9ccc was 22a9ccc, checked in by Shane Alcock <salcock@…>, 12 years ago
  • Updated licensing and documentation for all the IO writer modules
  • Got rid of annoying "Write thread leaving" message!
  • Added Id keyword support to all IO modules
  • Property mode set to 100644
File size: 6.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#include "wandio.h"
35#include <sys/types.h>
36#include <sys/stat.h>
37#include <fcntl.h>
38#include <stdlib.h>
39#include <pthread.h>
40#include <string.h>
41#include <stdbool.h>
42
/* Libtrace IO module implementing a threaded writer.
 *
 * This module enables another IO writer, called the "child", to perform its
 * writing using a separate thread. The main thread copies data into a series
 * of 1MB buffers. Meanwhile, the writing thread drains these buffers by
 * passing their contents to the child writer. pthread condition variables are
 * used to communicate between the two threads, e.g. when there is a free
 * buffer available for the main thread to copy data into or when there is
 * data available for the write thread to write.
 */
53
/* Size of a single buffer (slice): 1MB */
#define BUFFERSIZE (1024*1024)
/* Number of slices in the ring shared by the main and writing threads.
 * Each threaded writer therefore holds BUFFERS * BUFFERSIZE bytes of
 * buffer space. */
#define BUFFERS 100

/* Forward declaration; the definition is at the bottom of this file */
extern iow_source_t thread_wsource;
59
/* This structure defines a single buffer or "slice" */
struct buffer_t {
        char buffer[BUFFERSIZE];        /* The buffer itself */
        int len;                        /* Number of bytes of data currently
                                         * stored in the buffer */
        /* FULL once the main thread has completely filled the slice;
         * reset to EMPTY by the writing thread after it has written the
         * slice out. */
        enum { EMPTY = 0, FULL = 1 } state;
};
66
/* Shared state for one threaded writer. Stored in the 'data' member of the
 * iow_t returned by thread_wopen() and accessed through the DATA() macro. */
struct state_t {
        /* The collection of buffers (or slices) */
        struct buffer_t buffer[BUFFERS];
        /* The write offset into the current buffer, i.e. where the main
         * thread will copy the next byte */
        off_t offset;
        /* The writing thread */
        pthread_t consumer;
        /* The child writer */
        iow_t *iow;
        /* Signalled when a buffer has been filled, or when closing is set */
        pthread_cond_t data_ready;
        /* Signalled when the writing thread has emptied a buffer */
        pthread_cond_t space_avail;
        /* The mutex for the write buffers */
        pthread_mutex_t mutex;
        /* The index of the buffer the main thread is currently filling */
        int out_buffer;
        /* Set by thread_wclose() to tell the writing thread to flush
         * what remains and exit */
        bool closing;
};
87
/* Fetch the threaded-writer state attached to an iow_t */
#define DATA(x) ((struct state_t *)((x)->data))
/* The slice that the main thread is currently copying data into */
#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
/* Smaller of two values. NOTE: each argument may be evaluated twice, so do
 * not pass expressions with side effects. */
#define min(a,b) ((a)<(b) ? (a) : (b))
91
92/* The writing thread */
93static void *thread_consumer(void *userdata)
94{
95        int buffer=0;
96        bool running = true;
97        iow_t *state = (iow_t *) userdata;
98
99        pthread_mutex_lock(&DATA(state)->mutex);
100        do {
101                /* Wait for data that we can write */
102                while (DATA(state)->buffer[buffer].state == EMPTY) {
103                        /* Unless, of course, the program is over! */
104                        if (DATA(state)->closing)
105                                break;
106                        pthread_cond_wait(&DATA(state)->data_ready,
107                                        &DATA(state)->mutex);
108                }
109               
110                /* Empty the buffer using the child writer */
111                pthread_mutex_unlock(&DATA(state)->mutex);
112                wandio_wwrite(
113                                DATA(state)->iow,
114                                DATA(state)->buffer[buffer].buffer,
115                                DATA(state)->buffer[buffer].len);
116                pthread_mutex_lock(&DATA(state)->mutex);
117
118                /* If we've not reached the end of the file keep going */
119                running = ( DATA(state)->buffer[buffer].len > 0 );
120                DATA(state)->buffer[buffer].len = 0;
121                DATA(state)->buffer[buffer].state = EMPTY;
122
123                /* Signal that we've freed up another buffer for the main
124                 * thread to copy data into */
125                pthread_cond_signal(&DATA(state)->space_avail);
126
127
128                /* Move on to the next buffer */
129                buffer=(buffer+1) % BUFFERS;
130
131        } while(running);
132
133        /* If we reach here, it's all over so start tidying up */
134        wandio_wdestroy(DATA(state)->iow);
135
136        pthread_mutex_unlock(&DATA(state)->mutex);
137        return NULL;
138}
139
140iow_t *thread_wopen(iow_t *child)
141{
142        iow_t *state;
143
144        if (!child) {
145                return NULL;
146        }
147       
148
149        state = malloc(sizeof(iow_t));
150        state->data = calloc(1,sizeof(struct state_t));
151        state->source = &thread_wsource;
152
153        DATA(state)->out_buffer = 0;
154        DATA(state)->offset = 0;
155        pthread_mutex_init(&DATA(state)->mutex,NULL);
156        pthread_cond_init(&DATA(state)->data_ready,NULL);
157        pthread_cond_init(&DATA(state)->space_avail,NULL);
158
159        DATA(state)->iow = child;
160        DATA(state)->closing = false;
161
162        /* Start the writer thread */
163        pthread_create(&DATA(state)->consumer,NULL,thread_consumer,state);
164
165        return state;
166}
167
168static off_t thread_wwrite(iow_t *state, const char *buffer, off_t len)
169{
170        int slice;
171        int copied=0;
172        int newbuffer;
173
174        pthread_mutex_lock(&DATA(state)->mutex);
175        while(len>0) {
176
177                /* Wait for there to be space available for us to write into */
178                while (OUTBUFFER(state).state == FULL) {
179                        pthread_cond_wait(&DATA(state)->space_avail,
180                                        &DATA(state)->mutex);
181                }
182
183                /* Copy out of our main buffer into the next available slice */
184                slice=min( 
185                        (off_t)sizeof(OUTBUFFER(state).buffer)-DATA(state)->offset,
186                        len);
187                               
188                pthread_mutex_unlock(&DATA(state)->mutex);
189                memcpy(
190                        OUTBUFFER(state).buffer+DATA(state)->offset,
191                        buffer,
192                        slice
193                        );
194                pthread_mutex_lock(&DATA(state)->mutex);
195
196                DATA(state)->offset += slice;
197                OUTBUFFER(state).len += slice;
198
199                buffer += slice;
200                len -= slice;
201                copied += slice;
202                newbuffer = DATA(state)->out_buffer;
203
204                /* If we've filled a buffer, move on to the next one and
205                 * signal to the write thread that there is something for it
206                 * to do */
207                if (DATA(state)->offset >= (off_t)sizeof(OUTBUFFER(state).buffer)) {
208                        OUTBUFFER(state).state = FULL;
209                        pthread_cond_signal(&DATA(state)->data_ready);
210                        DATA(state)->offset = 0;
211                        newbuffer = (newbuffer+1) % BUFFERS;
212                }
213
214                DATA(state)->out_buffer = newbuffer;
215        }
216
217        pthread_mutex_unlock(&DATA(state)->mutex);
218        return copied;
219}
220
221static void thread_wclose(iow_t *iow)
222{
223        pthread_mutex_lock(&DATA(iow)->mutex);
224        DATA(iow)->closing = true;
225        pthread_cond_signal(&DATA(iow)->data_ready);
226        pthread_mutex_unlock(&DATA(iow)->mutex);
227        pthread_join(DATA(iow)->consumer,NULL);
228        free(iow->data);
229        free(iow);
230}
231
/* The IO writer source for the threaded writer: its name followed by its
 * write and close callbacks. thread_wopen() attaches this to every writer
 * it creates. */
iow_source_t thread_wsource = {
        "threadw",
        thread_wwrite,
        thread_wclose
};
Note: See TracBrowser for help on using the repository browser.