source: libwandio/ior-thread.c @ 954577b9

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 954577b9 was 954577b9, checked in by Shane Alcock <salcock@…>, 8 years ago
  • Tidy up some of the threading data structures in libwandio whenever we exit
  • Property mode set to 100644
File size: 7.5 KB
RevLine 
[8414770]1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
[22a9ccc]30 * $Id$
[8414770]31 *
32 */
33
34
[b9dd49a]35#include "config.h"
[ef07202]36#include "wandio.h"
[c66068d]37#include <sys/types.h>
38#include <sys/stat.h>
39#include <fcntl.h>
40#include <stdlib.h>
41#include <pthread.h>
42#include <string.h>
43#include <stdbool.h>
[7f2612c]44#include <errno.h>
[b9dd49a]45#ifdef HAVE_SYS_PRCTL_H
46#include <sys/prctl.h>
47#endif
[c66068d]48
[8414770]49/* Libtrace IO module implementing a threaded reader.
50 *
51 * This module enables another IO reader, called the "parent", to perform its
52 * reading using a separate thread. The reading thread reads data into a
53 * series of 1MB buffers. Once all the buffers are full, it waits for the
54 * main thread to free up some of the buffers by consuming data from them. The
55 * reading thread also uses a pthread condition to indicate to the main thread
56 * that there is data available in the buffers.
57 */
58
[c66068d]59/* 1MB Buffer */
60#define BUFFERSIZE (1024*1024)
61
62extern io_source_t thread_source;
63
[8414770]64/* This structure defines a single buffer or "slice" */
[c66068d]65struct buffer_t {
[8414770]66        char buffer[BUFFERSIZE];        /* The buffer itself */
67        int len;                        /* The size of the buffer */
68        enum { EMPTY = 0, FULL = 1 } state;     /* Is the buffer in use? */
[c66068d]69};
70
71struct state_t {
[8414770]72        /* The collection of buffers (or slices) */
[f258932]73        struct buffer_t *buffer;
[8414770]74        /* The index of the buffer to read into next */
[c66068d]75        int in_buffer;
[8414770]76        /* The read offset into the current buffer */
[948ed9a]77        off_t offset;
[8414770]78        /* The reading thread */
[c66068d]79        pthread_t producer;
[8414770]80        /* Indicates that there is a free buffer to read into */
[cf30639]81        pthread_cond_t space_avail;
[8414770]82        /* Indicates that there is data in one of the buffers */
[cf30639]83        pthread_cond_t data_ready;
[8414770]84        /* The mutex for the read buffers */
[cf30639]85        pthread_mutex_t mutex;
[8414770]86        /* The parent reader */
[c66068d]87        io_t *io;
[8414770]88        /* Indicates whether the main thread is concluding */
[15e9390]89        bool closing;
[c66068d]90};
91
92#define DATA(x) ((struct state_t *)((x)->data))
93#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
94#define min(a,b) ((a)<(b) ? (a) : (b))
95
[8414770]96/* The reading thread */
[c66068d]97static void *thread_producer(void* userdata)
98{
99        io_t *state = (io_t*) userdata;
100        int buffer=0;
101        bool running = true;
[b9dd49a]102
103#ifdef PR_SET_NAME
[06586ba]104        char namebuf[17];
[b9dd49a]105        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
106                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
107                /* If the filename is too long, overwrite the last few bytes */
108                if (strlen(namebuf)>9) {
109                        strcpy(namebuf+10,"[ior]");
110                }
111                else {
112                        strncat(namebuf," [ior]",16);
113                }
114                prctl(PR_SET_NAME, namebuf, 0,0,0);
115        }
116#endif
[c66068d]117
[cf30639]118        pthread_mutex_lock(&DATA(state)->mutex);
[c66068d]119        do {
[8414770]120                /* If all the buffers are full, we need to wait for one to
121                 * become free otherwise we have nowhere to write to! */
[c66068d]122                while (DATA(state)->buffer[buffer].state == FULL) {
123                        if (DATA(state)->closing)
124                                break;
[cf30639]125                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
126                }
127
[8414770]128                /* Don't bother reading any more data if we are shutting up
129                 * shop */
[cf30639]130                if (DATA(state)->closing) {
131                        break;
[c66068d]132                }
[cf30639]133                pthread_mutex_unlock(&DATA(state)->mutex);
134
[8414770]135                /* Get the parent reader to fill the buffer */
[c66068d]136                DATA(state)->buffer[buffer].len=wandio_read(
137                                DATA(state)->io,
138                                DATA(state)->buffer[buffer].buffer,
139                                sizeof(DATA(state)->buffer[buffer].buffer));
140
[cf30639]141                pthread_mutex_lock(&DATA(state)->mutex);
142
[c66068d]143                DATA(state)->buffer[buffer].state = FULL;
144
[8414770]145                /* If we've not reached the end of the file keep going */
[c66068d]146                running = (DATA(state)->buffer[buffer].len > 0 );
147
[8414770]148                /* Signal that there is data available for the main thread */
[cf30639]149                pthread_cond_signal(&DATA(state)->data_ready);
[c66068d]150
[8414770]151                /* Move on to the next buffer */
[f258932]152                buffer=(buffer+1) % max_buffers;
[c66068d]153
154        } while(running);
155
[8414770]156        /* If we reach here, it's all over so start tidying up */
[c66068d]157        wandio_destroy(DATA(state)->io);
158
[cf30639]159        pthread_cond_signal(&DATA(state)->data_ready);
160        pthread_mutex_unlock(&DATA(state)->mutex);
161
[c66068d]162        return NULL;
163}
164
165io_t *thread_open(io_t *parent)
166{
167        io_t *state;
168
169        if (!parent) {
170                return NULL;
171        }
172       
173
174        state = malloc(sizeof(io_t));
175        state->data = calloc(1,sizeof(struct state_t));
176        state->source = &thread_source;
177
[f258932]178        DATA(state)->buffer = (struct buffer_t *)malloc(sizeof(struct buffer_t) * max_buffers);
[2cf2c2d]179        memset(DATA(state)->buffer, 0, sizeof(struct buffer_t) * max_buffers);
[c66068d]180        DATA(state)->in_buffer = 0;
181        DATA(state)->offset = 0;
[cf30639]182        pthread_mutex_init(&DATA(state)->mutex,NULL);
183        pthread_cond_init(&DATA(state)->data_ready,NULL);
184        pthread_cond_init(&DATA(state)->space_avail,NULL);
[c66068d]185
186        DATA(state)->io = parent;
187        DATA(state)->closing = false;
188
[8414770]189        /* Create the reading thread */
[c66068d]190        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
191
192        return state;
193}
194
[948ed9a]195static off_t thread_read(io_t *state, void *buffer, off_t len)
[c66068d]196{
197        int slice;
198        int copied=0;
199        int newbuffer;
200
201        while(len>0) {
[cf30639]202                pthread_mutex_lock(&DATA(state)->mutex);
[8414770]203               
204                /* Wait for the reader thread to provide us with some data */
[c66068d]205                while (INBUFFER(state).state == EMPTY) {
[29d4438]206                        ++read_waits;
[cf30639]207                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
[c66068d]208
209                }
[8414770]210               
211                /* Check for errors and EOF */
[c66068d]212                if (INBUFFER(state).len <1) {
213
[4b6e1b2]214                        if (copied<1) {
215                                errno=EIO; /* FIXME: Preserve the errno from the other thread */
[c66068d]216                                copied = INBUFFER(state).len;
[4b6e1b2]217                        }
[c66068d]218
[cf30639]219                        pthread_mutex_unlock(&DATA(state)->mutex);
[c66068d]220                        return copied;
221                }
222
[8414770]223                /* Copy the next available slice into the main buffer */
[c66068d]224                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
[cf30639]225
226                pthread_mutex_unlock(&DATA(state)->mutex);
[c66068d]227                               
228                memcpy(
229                        buffer,
230                        INBUFFER(state).buffer+DATA(state)->offset,
231                        slice
232                        );
233
234                buffer+=slice;
235                len-=slice;
236                copied+=slice;
[cf30639]237
238                pthread_mutex_lock(&DATA(state)->mutex);
[c66068d]239                DATA(state)->offset+=slice;
240                newbuffer = DATA(state)->in_buffer;
[8414770]241               
242                /* If we've read everything from the current slice, let the
243                 * read thread know that there is now more space available
244                 * and start reading from the next slice */
[c66068d]245                if (DATA(state)->offset >= INBUFFER(state).len) {
246                        INBUFFER(state).state = EMPTY;
[cf30639]247                        pthread_cond_signal(&DATA(state)->space_avail);
[f258932]248                        newbuffer = (newbuffer+1) % max_buffers;
[c66068d]249                        DATA(state)->offset = 0;
250                }
251
[cf30639]252                pthread_mutex_unlock(&DATA(state)->mutex);
[c66068d]253
254                DATA(state)->in_buffer = newbuffer;
255        }
256        return copied;
257}
258
259static void thread_close(io_t *io)
260{
[cf30639]261        pthread_mutex_lock(&DATA(io)->mutex);
[c66068d]262        DATA(io)->closing = true;
[cf30639]263        pthread_cond_signal(&DATA(io)->space_avail);
264        pthread_mutex_unlock(&DATA(io)->mutex);
[417a136]265
266        /* Wait for the thread to exit */
[398fa86]267        pthread_join(DATA(io)->producer, NULL);
[954577b9]268       
269        pthread_mutex_destroy(&DATA(io)->mutex);
270        pthread_cond_destroy(&DATA(io)->space_avail);
271        pthread_cond_destroy(&DATA(io)->data_ready);
272       
[2cf2c2d]273        free(DATA(io)->buffer);
[cf30639]274        free(DATA(io));
[c66068d]275        free(io);
276}
277
278io_source_t thread_source = {
279        "thread",
280        thread_read,
281        NULL,   /* peek */
282        NULL,   /* tell */
283        NULL,   /* seek */
284        thread_close
285};
Note: See TracBrowser for help on using the repository browser.