source: lib/ior-thread.c @ b9dd49a

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since b9dd49a was b9dd49a, checked in by Perry Lorier <perry@…>, 12 years ago

Name threads on linux so we can tell them apart.

  • Property mode set to 100644
File size: 7.2 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#include "wandio.h"
36#include "config.h"
37#include <sys/types.h>
38#include <sys/stat.h>
39#include <fcntl.h>
40#include <stdlib.h>
41#include <pthread.h>
42#include <string.h>
43#include <stdbool.h>
44#include <errno.h>
45#ifdef HAVE_SYS_PRCTL_H
46#include <sys/prctl.h>
47#endif
48
49/* Libtrace IO module implementing a threaded reader.
50 *
51 * This module enables another IO reader, called the "parent", to perform its
52 * reading using a separate thread. The reading thread reads data into a
53 * series of 1MB buffers. Once all the buffers are full, it waits for the
54 * main thread to free up some of the buffers by consuming data from them. The
55 * reading thread also uses a pthread condition to indicate to the main thread
56 * that there is data available in the buffers.
57 */
58
59/* 1MB Buffer */
60#define BUFFERSIZE (1024*1024)
61#define BUFFERS 100
62
63extern io_source_t thread_source;
64
65/* This structure defines a single buffer or "slice" */
66struct buffer_t {
67        char buffer[BUFFERSIZE];        /* The buffer itself */
68        int len;                        /* The size of the buffer */
69        enum { EMPTY = 0, FULL = 1 } state;     /* Is the buffer in use? */
70};
71
72struct state_t {
73        /* The collection of buffers (or slices) */
74        struct buffer_t buffer[BUFFERS];
75        /* The index of the buffer to read into next */
76        int in_buffer;
77        /* The read offset into the current buffer */
78        int offset;
79        /* The reading thread */
80        pthread_t producer;
81        /* Indicates that there is a free buffer to read into */
82        pthread_cond_t space_avail;
83        /* Indicates that there is data in one of the buffers */
84        pthread_cond_t data_ready;
85        /* The mutex for the read buffers */
86        pthread_mutex_t mutex;
87        /* The parent reader */
88        io_t *io;
89        /* Indicates whether the main thread is concluding */
90        bool closing;
91};
92
93#define DATA(x) ((struct state_t *)((x)->data))
94#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
95#define min(a,b) ((a)<(b) ? (a) : (b))
96
97/* The reading thread */
98static void *thread_producer(void* userdata)
99{
100        io_t *state = (io_t*) userdata;
101        int buffer=0;
102        bool running = true;
103        char namebuf[17];
104
105#ifdef PR_SET_NAME
106        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
107                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
108                /* If the filename is too long, overwrite the last few bytes */
109                if (strlen(namebuf)>9) {
110                        strcpy(namebuf+10,"[ior]");
111                }
112                else {
113                        strncat(namebuf," [ior]",16);
114                }
115                prctl(PR_SET_NAME, namebuf, 0,0,0);
116        }
117#endif
118
119        pthread_mutex_lock(&DATA(state)->mutex);
120        do {
121                /* If all the buffers are full, we need to wait for one to
122                 * become free otherwise we have nowhere to write to! */
123                while (DATA(state)->buffer[buffer].state == FULL) {
124                        if (DATA(state)->closing)
125                                break;
126                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
127                }
128
129                /* Don't bother reading any more data if we are shutting up
130                 * shop */
131                if (DATA(state)->closing) {
132                        break;
133                }
134                pthread_mutex_unlock(&DATA(state)->mutex);
135
136                /* Get the parent reader to fill the buffer */
137                DATA(state)->buffer[buffer].len=wandio_read(
138                                DATA(state)->io,
139                                DATA(state)->buffer[buffer].buffer,
140                                sizeof(DATA(state)->buffer[buffer].buffer));
141
142                pthread_mutex_lock(&DATA(state)->mutex);
143
144                DATA(state)->buffer[buffer].state = FULL;
145
146                /* If we've not reached the end of the file keep going */
147                running = (DATA(state)->buffer[buffer].len > 0 );
148
149                /* Signal that there is data available for the main thread */
150                pthread_cond_signal(&DATA(state)->data_ready);
151
152                /* Move on to the next buffer */
153                buffer=(buffer+1) % BUFFERS;
154
155        } while(running);
156
157        /* If we reach here, it's all over so start tidying up */
158        wandio_destroy(DATA(state)->io);
159
160        pthread_cond_signal(&DATA(state)->data_ready);
161        pthread_mutex_unlock(&DATA(state)->mutex);
162
163        return NULL;
164}
165
166io_t *thread_open(io_t *parent)
167{
168        io_t *state;
169
170        if (!parent) {
171                return NULL;
172        }
173       
174
175        state = malloc(sizeof(io_t));
176        state->data = calloc(1,sizeof(struct state_t));
177        state->source = &thread_source;
178
179        DATA(state)->in_buffer = 0;
180        DATA(state)->offset = 0;
181        pthread_mutex_init(&DATA(state)->mutex,NULL);
182        pthread_cond_init(&DATA(state)->data_ready,NULL);
183        pthread_cond_init(&DATA(state)->space_avail,NULL);
184
185        DATA(state)->io = parent;
186        DATA(state)->closing = false;
187
188        /* Create the reading thread */
189        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
190
191        return state;
192}
193
194static off_t thread_read(io_t *state, void *buffer, off_t len)
195{
196        int slice;
197        int copied=0;
198        int newbuffer;
199
200        while(len>0) {
201                pthread_mutex_lock(&DATA(state)->mutex);
202               
203                /* Wait for the reader thread to provide us with some data */
204                while (INBUFFER(state).state == EMPTY) {
205                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
206
207                }
208               
209                /* Check for errors and EOF */
210                if (INBUFFER(state).len <1) {
211
212                        if (copied<1) {
213                                errno=EIO; /* FIXME: Preserve the errno from the other thread */
214                                copied = INBUFFER(state).len;
215                        }
216
217                        pthread_mutex_unlock(&DATA(state)->mutex);
218                        return copied;
219                }
220
221                /* Copy the next available slice into the main buffer */
222                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
223
224                pthread_mutex_unlock(&DATA(state)->mutex);
225                               
226                memcpy(
227                        buffer,
228                        INBUFFER(state).buffer+DATA(state)->offset,
229                        slice
230                        );
231
232                buffer+=slice;
233                len-=slice;
234                copied+=slice;
235
236                pthread_mutex_lock(&DATA(state)->mutex);
237                DATA(state)->offset+=slice;
238                newbuffer = DATA(state)->in_buffer;
239               
240                /* If we've read everything from the current slice, let the
241                 * read thread know that there is now more space available
242                 * and start reading from the next slice */
243                if (DATA(state)->offset >= INBUFFER(state).len) {
244                        INBUFFER(state).state = EMPTY;
245                        pthread_cond_signal(&DATA(state)->space_avail);
246                        newbuffer = (newbuffer+1) % BUFFERS;
247                        DATA(state)->offset = 0;
248                }
249
250                pthread_mutex_unlock(&DATA(state)->mutex);
251
252                DATA(state)->in_buffer = newbuffer;
253        }
254        return copied;
255}
256
257static void thread_close(io_t *io)
258{
259        pthread_mutex_lock(&DATA(io)->mutex);
260        DATA(io)->closing = true;
261        pthread_cond_signal(&DATA(io)->space_avail);
262        pthread_mutex_unlock(&DATA(io)->mutex);
263
264        /* Wait for the thread to exit */
265        pthread_join(DATA(io)->producer, NULL);
266        free(DATA(io));
267        free(io);
268}
269
270io_source_t thread_source = {
271        "thread",
272        thread_read,
273        NULL,   /* peek */
274        NULL,   /* tell */
275        NULL,   /* seek */
276        thread_close
277};
Note: See TracBrowser for help on using the repository browser.