source: lib/ior-thread.c @ 7f2612c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 7f2612c was 7f2612c, checked in by Shane Alcock <salcock@…>, 12 years ago
  • Can't use errno without errno.h!
  • Property mode set to 100644
File size: 3.9 KB
Line 
1#include "wandio.h"
2#include <sys/types.h>
3#include <sys/stat.h>
4#include <fcntl.h>
5#include <stdlib.h>
6#include <pthread.h>
7#include <string.h>
8#include <stdbool.h>
9#include <errno.h>
10
11/* 1MB Buffer */
12#define BUFFERSIZE (1024*1024)
13#define BUFFERS 100
14
15extern io_source_t thread_source;
16
17struct buffer_t {
18        char buffer[BUFFERSIZE];
19        int len;
20        enum { EMPTY = 0, FULL = 1 } state;
21};
22
23struct state_t {
24        struct buffer_t buffer[BUFFERS];
25        int in_buffer;
26        int offset;
27        pthread_t producer;
28        pthread_cond_t space_avail;
29        pthread_cond_t data_ready;
30        pthread_mutex_t mutex;
31        io_t *io;
32        bool closing;
33};
34
35#define DATA(x) ((struct state_t *)((x)->data))
36#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
37#define min(a,b) ((a)<(b) ? (a) : (b))
38
39static void *thread_producer(void* userdata)
40{
41        io_t *state = (io_t*) userdata;
42        int buffer=0;
43        bool running = true;
44
45        pthread_mutex_lock(&DATA(state)->mutex);
46        do {
47                while (DATA(state)->buffer[buffer].state == FULL) {
48                        if (DATA(state)->closing)
49                                break;
50                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
51                }
52
53                if (DATA(state)->closing) {
54                        break;
55                }
56                pthread_mutex_unlock(&DATA(state)->mutex);
57
58                /* Fill the buffer */
59                DATA(state)->buffer[buffer].len=wandio_read(
60                                DATA(state)->io,
61                                DATA(state)->buffer[buffer].buffer,
62                                sizeof(DATA(state)->buffer[buffer].buffer));
63
64                pthread_mutex_lock(&DATA(state)->mutex);
65
66                DATA(state)->buffer[buffer].state = FULL;
67
68                /* if we've not reached the end of the file keep going */
69                running = (DATA(state)->buffer[buffer].len > 0 );
70
71                pthread_cond_signal(&DATA(state)->data_ready);
72
73                /* Flip buffers */
74                buffer=(buffer+1) % BUFFERS;
75
76        } while(running);
77
78
79        wandio_destroy(DATA(state)->io);
80
81        pthread_cond_signal(&DATA(state)->data_ready);
82        pthread_mutex_unlock(&DATA(state)->mutex);
83
84        return NULL;
85}
86
87io_t *thread_open(io_t *parent)
88{
89        io_t *state;
90
91        if (!parent) {
92                return NULL;
93        }
94       
95
96        state = malloc(sizeof(io_t));
97        state->data = calloc(1,sizeof(struct state_t));
98        state->source = &thread_source;
99
100        DATA(state)->in_buffer = 0;
101        DATA(state)->offset = 0;
102        pthread_mutex_init(&DATA(state)->mutex,NULL);
103        pthread_cond_init(&DATA(state)->data_ready,NULL);
104        pthread_cond_init(&DATA(state)->space_avail,NULL);
105
106        DATA(state)->io = parent;
107        DATA(state)->closing = false;
108
109        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
110
111        return state;
112}
113
114static off_t thread_read(io_t *state, void *buffer, off_t len)
115{
116        int slice;
117        int copied=0;
118        int newbuffer;
119
120        while(len>0) {
121                pthread_mutex_lock(&DATA(state)->mutex);
122                while (INBUFFER(state).state == EMPTY) {
123                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
124
125                }
126
127                if (INBUFFER(state).len <1) {
128
129                        if (copied<1) {
130                                errno=EIO; /* FIXME: Preserve the errno from the other thread */
131                                copied = INBUFFER(state).len;
132                        }
133
134                        pthread_mutex_unlock(&DATA(state)->mutex);
135                        return copied;
136                }
137
138                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
139
140                pthread_mutex_unlock(&DATA(state)->mutex);
141                               
142                memcpy(
143                        buffer,
144                        INBUFFER(state).buffer+DATA(state)->offset,
145                        slice
146                        );
147
148                buffer+=slice;
149                len-=slice;
150                copied+=slice;
151
152                pthread_mutex_lock(&DATA(state)->mutex);
153                DATA(state)->offset+=slice;
154                newbuffer = DATA(state)->in_buffer;
155
156                if (DATA(state)->offset >= INBUFFER(state).len) {
157                        INBUFFER(state).state = EMPTY;
158                        pthread_cond_signal(&DATA(state)->space_avail);
159                        newbuffer = (newbuffer+1) % BUFFERS;
160                        DATA(state)->offset = 0;
161                }
162
163                pthread_mutex_unlock(&DATA(state)->mutex);
164
165                DATA(state)->in_buffer = newbuffer;
166        }
167        return copied;
168}
169
170static void thread_close(io_t *io)
171{
172        pthread_mutex_lock(&DATA(io)->mutex);
173        DATA(io)->closing = true;
174        pthread_cond_signal(&DATA(io)->space_avail);
175        pthread_mutex_unlock(&DATA(io)->mutex);
176
177        /* Wait for the thread to exit */
178        pthread_join(DATA(io)->producer, NULL);
179        free(DATA(io));
180        free(io);
181}
182
183io_source_t thread_source = {
184        "thread",
185        thread_read,
186        NULL,   /* peek */
187        NULL,   /* tell */
188        NULL,   /* seek */
189        thread_close
190};
Note: See TracBrowser for help on using the repository browser.