source: libwandio/ior-thread.c @ cd37cdc

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since cd37cdc was cd37cdc, checked in by Shane Alcock <salcock@…>, 6 years ago

Block all signals in libwandio threads.

It is probably better for the main processing thread(s) to do
signal handling, rather than libwandio threads. This fixes a potential
race condition reported by helgrind for tracestats (and presumably
other tools) where the libwandio thread could modify the 'done' flag
while the main thread was checking it.

  • Property mode set to 100644
File size: 7.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34
35#include "config.h"
36#include "wandio_internal.h"
37#include "wandio.h"
38#include <sys/types.h>
39#include <sys/stat.h>
40#include <fcntl.h>
41#include <stdlib.h>
42#include <pthread.h>
43#include <string.h>
44#include <stdbool.h>
45#include <errno.h>
46#include <signal.h>
47#ifdef HAVE_SYS_PRCTL_H
48#include <sys/prctl.h>
49#endif
50
51/* Libtrace IO module implementing a threaded reader.
52 *
53 * This module enables another IO reader, called the "parent", to perform its
54 * reading using a separate thread. The reading thread reads data into a
55 * series of 1MB buffers. Once all the buffers are full, it waits for the
56 * main thread to free up some of the buffers by consuming data from them. The
57 * reading thread also uses a pthread condition to indicate to the main thread
58 * that there is data available in the buffers.
59 */
60
/* Size in bytes of each read-ahead slice: 1 MiB */
#define BUFFERSIZE (1024 * 1024)
63
64extern io_source_t thread_source;
65
66/* This structure defines a single buffer or "slice" */
67struct buffer_t {
68        char buffer[BUFFERSIZE];        /* The buffer itself */
69        int len;                        /* The size of the buffer */
70        enum { EMPTY = 0, FULL = 1 } state;     /* Is the buffer in use? */
71};
72
73struct state_t {
74        /* The collection of buffers (or slices) */
75        struct buffer_t *buffer;
76        /* The index of the buffer to read into next */
77        int in_buffer;
78        /* The read offset into the current buffer */
79        off_t offset;
80        /* The reading thread */
81        pthread_t producer;
82        /* Indicates that there is a free buffer to read into */
83        pthread_cond_t space_avail;
84        /* Indicates that there is data in one of the buffers */
85        pthread_cond_t data_ready;
86        /* The mutex for the read buffers */
87        pthread_mutex_t mutex;
88        /* The parent reader */
89        io_t *io;
90        /* Indicates whether the main thread is concluding */
91        bool closing;
92};
93
/* Private state attached to the threaded reader 'x' (an io_t pointer) */
#define DATA(x) ((struct state_t *)((x)->data))
/* The slice that the main thread is currently consuming from */
#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
/* Smaller of two values. Both arguments are evaluated more than once, so
 * do not pass expressions with side effects. */
#define min(a,b) ((a) < (b) ? (a) : (b))
97
98/* The reading thread */
99static void *thread_producer(void* userdata)
100{
101        io_t *state = (io_t*) userdata;
102        int buffer=0;
103        bool running = true;
104
105#ifdef PR_SET_NAME
106        char namebuf[17];
107        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
108                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
109                /* If the filename is too long, overwrite the last few bytes */
110                if (strlen(namebuf)>9) {
111                        strcpy(namebuf+10,"[ior]");
112                }
113                else {
114                        strncat(namebuf," [ior]",16);
115                }
116                prctl(PR_SET_NAME, namebuf, 0,0,0);
117        }
118#endif
119
120        pthread_mutex_lock(&DATA(state)->mutex);
121        do {
122                /* If all the buffers are full, we need to wait for one to
123                 * become free otherwise we have nowhere to write to! */
124                while (DATA(state)->buffer[buffer].state == FULL) {
125                        if (DATA(state)->closing)
126                                break;
127                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
128                }
129
130                /* Don't bother reading any more data if we are shutting up
131                 * shop */
132                if (DATA(state)->closing) {
133                        break;
134                }
135                pthread_mutex_unlock(&DATA(state)->mutex);
136
137                /* Get the parent reader to fill the buffer */
138                DATA(state)->buffer[buffer].len=wandio_read(
139                                DATA(state)->io,
140                                DATA(state)->buffer[buffer].buffer,
141                                sizeof(DATA(state)->buffer[buffer].buffer));
142
143                pthread_mutex_lock(&DATA(state)->mutex);
144
145                DATA(state)->buffer[buffer].state = FULL;
146
147                /* If we've not reached the end of the file keep going */
148                running = (DATA(state)->buffer[buffer].len > 0 );
149
150                /* Signal that there is data available for the main thread */
151                pthread_cond_signal(&DATA(state)->data_ready);
152
153                /* Move on to the next buffer */
154                buffer=(buffer+1) % max_buffers;
155
156        } while(running);
157
158        /* If we reach here, it's all over so start tidying up */
159        wandio_destroy(DATA(state)->io);
160
161        pthread_cond_signal(&DATA(state)->data_ready);
162        pthread_mutex_unlock(&DATA(state)->mutex);
163
164        return NULL;
165}
166
167io_t *thread_open(io_t *parent)
168{
169        io_t *state;
170        sigset_t set;
171        int s;
172
173        if (!parent) {
174                return NULL;
175        }
176       
177        sigfillset(&set);
178        state = malloc(sizeof(io_t));
179        state->data = calloc(1,sizeof(struct state_t));
180        state->source = &thread_source;
181
182        DATA(state)->buffer = (struct buffer_t *)malloc(sizeof(struct buffer_t) * max_buffers);
183        memset(DATA(state)->buffer, 0, sizeof(struct buffer_t) * max_buffers);
184        DATA(state)->in_buffer = 0;
185        DATA(state)->offset = 0;
186        pthread_mutex_init(&DATA(state)->mutex,NULL);
187        pthread_cond_init(&DATA(state)->data_ready,NULL);
188        pthread_cond_init(&DATA(state)->space_avail,NULL);
189
190        DATA(state)->io = parent;
191        DATA(state)->closing = false;
192
193        /* Create the reading thread */
194        s = pthread_sigmask(SIG_SETMASK, &set, NULL);
195        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
196        sigemptyset(&set);
197        s = pthread_sigmask(SIG_SETMASK, &set, NULL);
198
199        return state;
200}
201
202static off_t thread_read(io_t *state, void *buffer, off_t len)
203{
204        int slice;
205        int copied=0;
206        int newbuffer;
207
208        while(len>0) {
209                pthread_mutex_lock(&DATA(state)->mutex);
210               
211                /* Wait for the reader thread to provide us with some data */
212                while (INBUFFER(state).state == EMPTY) {
213                        ++read_waits;
214                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
215
216                }
217               
218                /* Check for errors and EOF */
219                if (INBUFFER(state).len <1) {
220
221                        if (copied<1) {
222                                errno=EIO; /* FIXME: Preserve the errno from the other thread */
223                                copied = INBUFFER(state).len;
224                        }
225
226                        pthread_mutex_unlock(&DATA(state)->mutex);
227                        return copied;
228                }
229
230                /* Copy the next available slice into the main buffer */
231                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
232
233                pthread_mutex_unlock(&DATA(state)->mutex);
234                               
235                memcpy(
236                        buffer,
237                        INBUFFER(state).buffer+DATA(state)->offset,
238                        slice
239                        );
240
241                buffer+=slice;
242                len-=slice;
243                copied+=slice;
244
245                pthread_mutex_lock(&DATA(state)->mutex);
246                DATA(state)->offset+=slice;
247                newbuffer = DATA(state)->in_buffer;
248               
249                /* If we've read everything from the current slice, let the
250                 * read thread know that there is now more space available
251                 * and start reading from the next slice */
252                if (DATA(state)->offset >= INBUFFER(state).len) {
253                        INBUFFER(state).state = EMPTY;
254                        pthread_cond_signal(&DATA(state)->space_avail);
255                        newbuffer = (newbuffer+1) % max_buffers;
256                        DATA(state)->offset = 0;
257                }
258
259                pthread_mutex_unlock(&DATA(state)->mutex);
260
261                DATA(state)->in_buffer = newbuffer;
262        }
263        return copied;
264}
265
266static void thread_close(io_t *io)
267{
268        pthread_mutex_lock(&DATA(io)->mutex);
269        DATA(io)->closing = true;
270        pthread_cond_signal(&DATA(io)->space_avail);
271        pthread_mutex_unlock(&DATA(io)->mutex);
272
273        /* Wait for the thread to exit */
274        pthread_join(DATA(io)->producer, NULL);
275       
276        pthread_mutex_destroy(&DATA(io)->mutex);
277        pthread_cond_destroy(&DATA(io)->space_avail);
278        pthread_cond_destroy(&DATA(io)->data_ready);
279       
280        free(DATA(io)->buffer);
281        free(DATA(io));
282        free(io);
283}
284
285io_source_t thread_source = {
286        "thread",
287        thread_read,
288        NULL,   /* peek */
289        NULL,   /* tell */
290        NULL,   /* seek */
291        thread_close
292};
Note: See TracBrowser for help on using the repository browser.