source: lib/ior-thread.c @ cf30639

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since cf30639 was cf30639, checked in by Perry Lorier <perry@…>, 13 years ago

Threading cleanups (and some memory leaks addressed)

  • Property mode set to 100644
File size: 3.9 KB
Line 
1#include "wandio.h"
2#include <sys/types.h>
3#include <sys/stat.h>
4#include <fcntl.h>
5#include <stdlib.h>
6#include <pthread.h>
7#include <string.h>
8#include <stdbool.h>
9
10/* 1MB Buffer */
11#define BUFFERSIZE (1024*1024)
12#define BUFFERS 100
13
14extern io_source_t thread_source;
15
16struct buffer_t {
17        char buffer[BUFFERSIZE];
18        int len;
19        enum { EMPTY = 0, FULL = 1 } state;
20};
21
22struct state_t {
23        struct buffer_t buffer[BUFFERS];
24        int in_buffer;
25        int offset;
26        pthread_t producer;
27        pthread_cond_t space_avail;
28        pthread_cond_t data_ready;
29        pthread_mutex_t mutex;
30        bool closing;
31        bool closed;
32        io_t *io;
33};
34
35#define DATA(x) ((struct state_t *)((x)->data))
36#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
37#define min(a,b) ((a)<(b) ? (a) : (b))
38
39static void *thread_producer(void* userdata)
40{
41        io_t *state = (io_t*) userdata;
42        int buffer=0;
43        bool running = true;
44
45        pthread_mutex_lock(&DATA(state)->mutex);
46        do {
47                while (DATA(state)->buffer[buffer].state == FULL) {
48                        if (DATA(state)->closing)
49                                break;
50                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
51                }
52
53                if (DATA(state)->closing) {
54                        break;
55                }
56                pthread_mutex_unlock(&DATA(state)->mutex);
57
58                /* Fill the buffer */
59                DATA(state)->buffer[buffer].len=wandio_read(
60                                DATA(state)->io,
61                                DATA(state)->buffer[buffer].buffer,
62                                sizeof(DATA(state)->buffer[buffer].buffer));
63
64                pthread_mutex_lock(&DATA(state)->mutex);
65
66                DATA(state)->buffer[buffer].state = FULL;
67
68                /* if we've not reached the end of the file keep going */
69                running = (DATA(state)->buffer[buffer].len > 0 );
70
71                pthread_cond_signal(&DATA(state)->data_ready);
72
73                /* Flip buffers */
74                buffer=(buffer+1) % BUFFERS;
75
76        } while(running);
77
78
79        wandio_destroy(DATA(state)->io);
80
81        DATA(state)->closed = true;
82        pthread_cond_signal(&DATA(state)->data_ready);
83        pthread_mutex_unlock(&DATA(state)->mutex);
84
85        return NULL;
86}
87
88io_t *thread_open(io_t *parent)
89{
90        io_t *state;
91
92        if (!parent) {
93                return NULL;
94        }
95       
96
97        state = malloc(sizeof(io_t));
98        state->data = calloc(1,sizeof(struct state_t));
99        state->source = &thread_source;
100
101        DATA(state)->in_buffer = 0;
102        DATA(state)->offset = 0;
103        pthread_mutex_init(&DATA(state)->mutex,NULL);
104        pthread_cond_init(&DATA(state)->data_ready,NULL);
105        pthread_cond_init(&DATA(state)->space_avail,NULL);
106
107        DATA(state)->io = parent;
108        DATA(state)->closing = false;
109        DATA(state)->closed = false;
110
111        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
112
113        return state;
114}
115
116static off_t thread_read(io_t *state, void *buffer, off_t len)
117{
118        int slice;
119        int copied=0;
120        int newbuffer;
121
122        while(len>0) {
123                pthread_mutex_lock(&DATA(state)->mutex);
124                while (INBUFFER(state).state == EMPTY) {
125                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
126
127                }
128
129                if (INBUFFER(state).len <1) {
130
131                        if (copied<1)
132                                copied = INBUFFER(state).len;
133
134                        pthread_mutex_unlock(&DATA(state)->mutex);
135                        return copied;
136                }
137
138                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
139
140                pthread_mutex_unlock(&DATA(state)->mutex);
141                               
142                memcpy(
143                        buffer,
144                        INBUFFER(state).buffer+DATA(state)->offset,
145                        slice
146                        );
147
148                buffer+=slice;
149                len-=slice;
150                copied+=slice;
151
152                pthread_mutex_lock(&DATA(state)->mutex);
153                DATA(state)->offset+=slice;
154                newbuffer = DATA(state)->in_buffer;
155
156                if (DATA(state)->offset >= INBUFFER(state).len) {
157                        INBUFFER(state).state = EMPTY;
158                        pthread_cond_signal(&DATA(state)->space_avail);
159                        newbuffer = (newbuffer+1) % BUFFERS;
160                        DATA(state)->offset = 0;
161                }
162
163                pthread_mutex_unlock(&DATA(state)->mutex);
164
165                DATA(state)->in_buffer = newbuffer;
166        }
167        return copied;
168}
169
170static void thread_close(io_t *io)
171{
172        pthread_mutex_lock(&DATA(io)->mutex);
173        DATA(io)->closing = true;
174        pthread_cond_signal(&DATA(io)->space_avail);
175
176        /* Wait until the producer thread dies */
177        while (!DATA(io)->closed) {
178                pthread_cond_wait(&DATA(io)->data_ready, &DATA(io)->mutex);
179        }
180        pthread_mutex_unlock(&DATA(io)->mutex);
181        free(DATA(io));
182        free(io);
183}
184
185io_source_t thread_source = {
186        "thread",
187        thread_read,
188        NULL,   /* peek */
189        NULL,   /* tell */
190        NULL,   /* seek */
191        thread_close
192};
Note: See TracBrowser for help on using the repository browser.