source: libwandio/iow-thread.c @ 10f924c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 10f924c was 10f924c, checked in by Shane Alcock <salcock@…>, 6 years ago

Fixed broken -fvisibility check in configure

  • Added a m4 script that does this properly rather than our hax gcc version check.
  • Tidied up CFLAGS in configure so we aren't adding the same sets of flags multiple times
  • Created a wandio_internal.h file for storing global variables that shouldn't be made public

Thanks to Alistair King, whose patch to try and make this work for
non-gcc systems brought my attention to just how broken this was :)

  • Property mode set to 100644
File size: 7.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 *          Perry Lorier
9 *          Shane Alcock
10 *         
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29 *
30 * $Id$
31 *
32 */
33
34#include "config.h"
35#include "wandio.h"
36#include "wandio_internal.h"
37#include <sys/types.h>
38#include <sys/stat.h>
39#include <fcntl.h>
40#include <stdlib.h>
41#include <pthread.h>
42#include <string.h>
43#include <stdbool.h>
44#ifdef HAVE_SYS_PRCTL_H
45#include <sys/prctl.h>
46#endif
47
48/* Libtrace IO module implementing a threaded writer.
49 *
50 * This module enables another IO writer, called the "child", to perform its
51 * writing using a separate thread. The main thread writes data into a series
52 * of 1MB buffers. Meanwhile, the writing thread writes out of these buffers
53 * using the callback for the child reader. pthread conditions are used to
54 * communicate between the two threads, e.g. when there are buffers available
55 * for the main thread to copy data into or when there is data available for
56 * the write thread to write.
57 */
58
59/* 1MB Buffer */
60#define BUFFERSIZE (1024*1024)
61#define BUFFERS 5
62
63extern iow_source_t thread_wsource;
64
65/* This structure defines a single buffer or "slice" */
66struct buffer_t {
67        char buffer[BUFFERSIZE];        /* The buffer itself */
68        int len;                        /* The size of the buffer */
69        enum { EMPTY = 0, FULL = 1 } state;     /* Is the buffer in use? */
70};
71
72struct state_t {
73        /* The collection of buffers (or slices) */
74        struct buffer_t buffer[BUFFERS];
75        /* The write offset into the current buffer */
76        off_t offset;
77        /* The writing thread */
78        pthread_t consumer;
79        /* The child writer */
80        iow_t *iow;
81        /* Indicates that there is data in one of the buffers */
82        pthread_cond_t data_ready;
83        /* Indicates that there is a free buffer to write into */
84        pthread_cond_t space_avail;
85        /* The mutex for the write buffers */
86        pthread_mutex_t mutex;
87        /* The index of the buffer to write into next */
88        int out_buffer;
89        /* Indicates whether the main thread is concluding */
90        bool closing;
91};
92
93#define DATA(x) ((struct state_t *)((x)->data))
94#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
95#define min(a,b) ((a)<(b) ? (a) : (b))
96
97/* The writing thread */
98static void *thread_consumer(void *userdata)
99{
100        int buffer=0;
101        bool running = true;
102        iow_t *state = (iow_t *) userdata;
103
104#ifdef PR_SET_NAME
105        char namebuf[17];
106        if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
107                namebuf[16] = '\0'; /* Make sure it's NUL terminated */
108                /* If the filename is too long, overwrite the last few bytes */
109                if (strlen(namebuf)>9) {
110                        strcpy(namebuf+10,"[iow]");
111                }
112                else {
113                        strncat(namebuf," [iow]",16);
114                }
115                prctl(PR_SET_NAME, namebuf, 0,0,0);
116        }
117#endif
118
119        pthread_mutex_lock(&DATA(state)->mutex);
120        do {
121                /* Wait for data that we can write */
122                while (DATA(state)->buffer[buffer].state == EMPTY) {
123                        /* Unless, of course, the program is over! */
124                        if (DATA(state)->closing)
125                                break;
126                        pthread_cond_wait(&DATA(state)->data_ready,
127                                        &DATA(state)->mutex);
128                }
129               
130                /* Empty the buffer using the child writer */
131                pthread_mutex_unlock(&DATA(state)->mutex);
132                wandio_wwrite(
133                                DATA(state)->iow,
134                                DATA(state)->buffer[buffer].buffer,
135                                DATA(state)->buffer[buffer].len);
136                pthread_mutex_lock(&DATA(state)->mutex);
137
138                /* If we've not reached the end of the file keep going */
139                running = ( DATA(state)->buffer[buffer].len > 0 );
140                DATA(state)->buffer[buffer].len = 0;
141                DATA(state)->buffer[buffer].state = EMPTY;
142
143                /* Signal that we've freed up another buffer for the main
144                 * thread to copy data into */
145                pthread_cond_signal(&DATA(state)->space_avail);
146
147
148                /* Move on to the next buffer */
149                buffer=(buffer+1) % BUFFERS;
150
151        } while(running);
152
153        /* If we reach here, it's all over so start tidying up */
154        wandio_wdestroy(DATA(state)->iow);
155
156        pthread_mutex_unlock(&DATA(state)->mutex);
157        return NULL;
158}
159
160iow_t *thread_wopen(iow_t *child)
161{
162        iow_t *state;
163
164        if (!child) {
165                return NULL;
166        }
167       
168
169        state = malloc(sizeof(iow_t));
170        state->data = calloc(1,sizeof(struct state_t));
171        state->source = &thread_wsource;
172
173        DATA(state)->out_buffer = 0;
174        DATA(state)->offset = 0;
175        pthread_mutex_init(&DATA(state)->mutex,NULL);
176        pthread_cond_init(&DATA(state)->data_ready,NULL);
177        pthread_cond_init(&DATA(state)->space_avail,NULL);
178
179        DATA(state)->iow = child;
180        DATA(state)->closing = false;
181
182        /* Start the writer thread */
183        pthread_create(&DATA(state)->consumer,NULL,thread_consumer,state);
184
185        return state;
186}
187
188static off_t thread_wwrite(iow_t *state, const char *buffer, off_t len)
189{
190        int slice;
191        int copied=0;
192        int newbuffer;
193
194        pthread_mutex_lock(&DATA(state)->mutex);
195        while(len>0) {
196
197                /* Wait for there to be space available for us to write into */
198                while (OUTBUFFER(state).state == FULL) {
199                        write_waits++;
200                        pthread_cond_wait(&DATA(state)->space_avail,
201                                        &DATA(state)->mutex);
202                }
203
204                /* Copy out of our main buffer into the next available slice */
205                slice=min( 
206                        (off_t)sizeof(OUTBUFFER(state).buffer)-DATA(state)->offset,
207                        len);
208                               
209                pthread_mutex_unlock(&DATA(state)->mutex);
210                memcpy(
211                        OUTBUFFER(state).buffer+DATA(state)->offset,
212                        buffer,
213                        slice
214                        );
215                pthread_mutex_lock(&DATA(state)->mutex);
216
217                DATA(state)->offset += slice;
218                OUTBUFFER(state).len += slice;
219
220                buffer += slice;
221                len -= slice;
222                copied += slice;
223                newbuffer = DATA(state)->out_buffer;
224
225                /* If we've filled a buffer, move on to the next one and
226                 * signal to the write thread that there is something for it
227                 * to do */
228                if (DATA(state)->offset >= (off_t)sizeof(OUTBUFFER(state).buffer)) {
229                        OUTBUFFER(state).state = FULL;
230                        pthread_cond_signal(&DATA(state)->data_ready);
231                        DATA(state)->offset = 0;
232                        newbuffer = (newbuffer+1) % BUFFERS;
233                }
234
235                DATA(state)->out_buffer = newbuffer;
236        }
237
238        pthread_mutex_unlock(&DATA(state)->mutex);
239        return copied;
240}
241
242static void thread_wclose(iow_t *iow)
243{
244        pthread_mutex_lock(&DATA(iow)->mutex);
245        DATA(iow)->closing = true;
246        pthread_cond_signal(&DATA(iow)->data_ready);
247        pthread_mutex_unlock(&DATA(iow)->mutex);
248        pthread_join(DATA(iow)->consumer,NULL);
249       
250        pthread_mutex_destroy(&DATA(iow)->mutex);
251        pthread_cond_destroy(&DATA(iow)->data_ready);
252        pthread_cond_destroy(&DATA(iow)->space_avail);
253       
254        free(iow->data);
255        free(iow);
256}
257
258iow_source_t thread_wsource = {
259        "threadw",
260        thread_wwrite,
261        thread_wclose
262};
Note: See TracBrowser for help on using the repository browser.