source: lib/ior-thread.c @ 8414770

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 8414770 was 8414770, checked in by Shane Alcock <salcock@…>, 12 years ago
  • Updated licensing and documentation for all the IO reader modules
  • Property mode set to 100644
File size: 6.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id: format_erf.c 1517 2010-02-08 01:11:04Z salcock $
31 *
32 */
33
34
35#include "wandio.h"
36#include <sys/types.h>
37#include <sys/stat.h>
38#include <fcntl.h>
39#include <stdlib.h>
40#include <pthread.h>
41#include <string.h>
42#include <stdbool.h>
43#include <errno.h>
44
45/* Libtrace IO module implementing a threaded reader.
46 *
47 * This module enables another IO reader, called the "parent", to perform its
48 * reading using a separate thread. The reading thread reads data into a
49 * series of 1MB buffers. Once all the buffers are full, it waits for the
50 * main thread to free up some of the buffers by consuming data from them. The
51 * reading thread also uses a pthread condition to indicate to the main thread
52 * that there is data available in the buffers.
53 */
54
55/* 1MB Buffer */
56#define BUFFERSIZE (1024*1024)
57#define BUFFERS 100
58
59extern io_source_t thread_source;
60
61/* This structure defines a single buffer or "slice" */
62struct buffer_t {
63        char buffer[BUFFERSIZE];        /* The buffer itself */
64        int len;                        /* The size of the buffer */
65        enum { EMPTY = 0, FULL = 1 } state;     /* Is the buffer in use? */
66};
67
68struct state_t {
69        /* The collection of buffers (or slices) */
70        struct buffer_t buffer[BUFFERS];
71        /* The index of the buffer to read into next */
72        int in_buffer;
73        /* The read offset into the current buffer */
74        int offset;
75        /* The reading thread */
76        pthread_t producer;
77        /* Indicates that there is a free buffer to read into */
78        pthread_cond_t space_avail;
79        /* Indicates that there is data in one of the buffers */
80        pthread_cond_t data_ready;
81        /* The mutex for the read buffers */
82        pthread_mutex_t mutex;
83        /* The parent reader */
84        io_t *io;
85        /* Indicates whether the main thread is concluding */
86        bool closing;
87};
88
/* Module-private state attached to an io_t */
#define DATA(x) ((struct state_t *)((x)->data))
/* The slice the main thread is currently consuming from */
#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
/* Smaller of two values; note that each argument may be evaluated twice */
#define min(a,b) ((a)<(b) ? (a) : (b))
92
93/* The reading thread */
94static void *thread_producer(void* userdata)
95{
96        io_t *state = (io_t*) userdata;
97        int buffer=0;
98        bool running = true;
99
100        pthread_mutex_lock(&DATA(state)->mutex);
101        do {
102                /* If all the buffers are full, we need to wait for one to
103                 * become free otherwise we have nowhere to write to! */
104                while (DATA(state)->buffer[buffer].state == FULL) {
105                        if (DATA(state)->closing)
106                                break;
107                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
108                }
109
110                /* Don't bother reading any more data if we are shutting up
111                 * shop */
112                if (DATA(state)->closing) {
113                        break;
114                }
115                pthread_mutex_unlock(&DATA(state)->mutex);
116
117                /* Get the parent reader to fill the buffer */
118                DATA(state)->buffer[buffer].len=wandio_read(
119                                DATA(state)->io,
120                                DATA(state)->buffer[buffer].buffer,
121                                sizeof(DATA(state)->buffer[buffer].buffer));
122
123                pthread_mutex_lock(&DATA(state)->mutex);
124
125                DATA(state)->buffer[buffer].state = FULL;
126
127                /* If we've not reached the end of the file keep going */
128                running = (DATA(state)->buffer[buffer].len > 0 );
129
130                /* Signal that there is data available for the main thread */
131                pthread_cond_signal(&DATA(state)->data_ready);
132
133                /* Move on to the next buffer */
134                buffer=(buffer+1) % BUFFERS;
135
136        } while(running);
137
138        /* If we reach here, it's all over so start tidying up */
139        wandio_destroy(DATA(state)->io);
140
141        pthread_cond_signal(&DATA(state)->data_ready);
142        pthread_mutex_unlock(&DATA(state)->mutex);
143
144        return NULL;
145}
146
147io_t *thread_open(io_t *parent)
148{
149        io_t *state;
150
151        if (!parent) {
152                return NULL;
153        }
154       
155
156        state = malloc(sizeof(io_t));
157        state->data = calloc(1,sizeof(struct state_t));
158        state->source = &thread_source;
159
160        DATA(state)->in_buffer = 0;
161        DATA(state)->offset = 0;
162        pthread_mutex_init(&DATA(state)->mutex,NULL);
163        pthread_cond_init(&DATA(state)->data_ready,NULL);
164        pthread_cond_init(&DATA(state)->space_avail,NULL);
165
166        DATA(state)->io = parent;
167        DATA(state)->closing = false;
168
169        /* Create the reading thread */
170        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
171
172        return state;
173}
174
175static off_t thread_read(io_t *state, void *buffer, off_t len)
176{
177        int slice;
178        int copied=0;
179        int newbuffer;
180
181        while(len>0) {
182                pthread_mutex_lock(&DATA(state)->mutex);
183               
184                /* Wait for the reader thread to provide us with some data */
185                while (INBUFFER(state).state == EMPTY) {
186                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
187
188                }
189               
190                /* Check for errors and EOF */
191                if (INBUFFER(state).len <1) {
192
193                        if (copied<1) {
194                                errno=EIO; /* FIXME: Preserve the errno from the other thread */
195                                copied = INBUFFER(state).len;
196                        }
197
198                        pthread_mutex_unlock(&DATA(state)->mutex);
199                        return copied;
200                }
201
202                /* Copy the next available slice into the main buffer */
203                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
204
205                pthread_mutex_unlock(&DATA(state)->mutex);
206                               
207                memcpy(
208                        buffer,
209                        INBUFFER(state).buffer+DATA(state)->offset,
210                        slice
211                        );
212
213                buffer+=slice;
214                len-=slice;
215                copied+=slice;
216
217                pthread_mutex_lock(&DATA(state)->mutex);
218                DATA(state)->offset+=slice;
219                newbuffer = DATA(state)->in_buffer;
220               
221                /* If we've read everything from the current slice, let the
222                 * read thread know that there is now more space available
223                 * and start reading from the next slice */
224                if (DATA(state)->offset >= INBUFFER(state).len) {
225                        INBUFFER(state).state = EMPTY;
226                        pthread_cond_signal(&DATA(state)->space_avail);
227                        newbuffer = (newbuffer+1) % BUFFERS;
228                        DATA(state)->offset = 0;
229                }
230
231                pthread_mutex_unlock(&DATA(state)->mutex);
232
233                DATA(state)->in_buffer = newbuffer;
234        }
235        return copied;
236}
237
238static void thread_close(io_t *io)
239{
240        pthread_mutex_lock(&DATA(io)->mutex);
241        DATA(io)->closing = true;
242        pthread_cond_signal(&DATA(io)->space_avail);
243        pthread_mutex_unlock(&DATA(io)->mutex);
244
245        /* Wait for the thread to exit */
246        pthread_join(DATA(io)->producer, NULL);
247        free(DATA(io));
248        free(io);
249}
250
251io_source_t thread_source = {
252        "thread",
253        thread_read,
254        NULL,   /* peek */
255        NULL,   /* tell */
256        NULL,   /* seek */
257        thread_close
258};
Note: See TracBrowser for help on using the repository browser.