source: lib/ior-thread.c @ 15e9390

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 15e9390 was 15e9390, checked in by Perry Lorier <perry@…>, 11 years ago

Rearrange struct fields to get better packing on 64bit machines.

  • Property mode set to 100644
File size: 3.8 KB
Line 
1#include "wandio.h"
2#include <sys/types.h>
3#include <sys/stat.h>
4#include <fcntl.h>
5#include <stdlib.h>
6#include <pthread.h>
7#include <string.h>
8#include <stdbool.h>
9
10/* 1MB Buffer */
11#define BUFFERSIZE (1024*1024)
12#define BUFFERS 100
13
14extern io_source_t thread_source;
15
16struct buffer_t {
17        char buffer[BUFFERSIZE];
18        int len;
19        enum { EMPTY = 0, FULL = 1 } state;
20};
21
22struct state_t {
23        struct buffer_t buffer[BUFFERS];
24        int in_buffer;
25        int offset;
26        pthread_t producer;
27        pthread_cond_t space_avail;
28        pthread_cond_t data_ready;
29        pthread_mutex_t mutex;
30        io_t *io;
31        bool closing;
32};
33
34#define DATA(x) ((struct state_t *)((x)->data))
35#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
36#define min(a,b) ((a)<(b) ? (a) : (b))
37
38static void *thread_producer(void* userdata)
39{
40        io_t *state = (io_t*) userdata;
41        int buffer=0;
42        bool running = true;
43
44        pthread_mutex_lock(&DATA(state)->mutex);
45        do {
46                while (DATA(state)->buffer[buffer].state == FULL) {
47                        if (DATA(state)->closing)
48                                break;
49                        pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex);
50                }
51
52                if (DATA(state)->closing) {
53                        break;
54                }
55                pthread_mutex_unlock(&DATA(state)->mutex);
56
57                /* Fill the buffer */
58                DATA(state)->buffer[buffer].len=wandio_read(
59                                DATA(state)->io,
60                                DATA(state)->buffer[buffer].buffer,
61                                sizeof(DATA(state)->buffer[buffer].buffer));
62
63                pthread_mutex_lock(&DATA(state)->mutex);
64
65                DATA(state)->buffer[buffer].state = FULL;
66
67                /* if we've not reached the end of the file keep going */
68                running = (DATA(state)->buffer[buffer].len > 0 );
69
70                pthread_cond_signal(&DATA(state)->data_ready);
71
72                /* Flip buffers */
73                buffer=(buffer+1) % BUFFERS;
74
75        } while(running);
76
77
78        wandio_destroy(DATA(state)->io);
79
80        pthread_cond_signal(&DATA(state)->data_ready);
81        pthread_mutex_unlock(&DATA(state)->mutex);
82
83        return NULL;
84}
85
86io_t *thread_open(io_t *parent)
87{
88        io_t *state;
89
90        if (!parent) {
91                return NULL;
92        }
93       
94
95        state = malloc(sizeof(io_t));
96        state->data = calloc(1,sizeof(struct state_t));
97        state->source = &thread_source;
98
99        DATA(state)->in_buffer = 0;
100        DATA(state)->offset = 0;
101        pthread_mutex_init(&DATA(state)->mutex,NULL);
102        pthread_cond_init(&DATA(state)->data_ready,NULL);
103        pthread_cond_init(&DATA(state)->space_avail,NULL);
104
105        DATA(state)->io = parent;
106        DATA(state)->closing = false;
107
108        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
109
110        return state;
111}
112
113static off_t thread_read(io_t *state, void *buffer, off_t len)
114{
115        int slice;
116        int copied=0;
117        int newbuffer;
118
119        while(len>0) {
120                pthread_mutex_lock(&DATA(state)->mutex);
121                while (INBUFFER(state).state == EMPTY) {
122                        pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex);
123
124                }
125
126                if (INBUFFER(state).len <1) {
127
128                        if (copied<1)
129                                copied = INBUFFER(state).len;
130
131                        pthread_mutex_unlock(&DATA(state)->mutex);
132                        return copied;
133                }
134
135                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
136
137                pthread_mutex_unlock(&DATA(state)->mutex);
138                               
139                memcpy(
140                        buffer,
141                        INBUFFER(state).buffer+DATA(state)->offset,
142                        slice
143                        );
144
145                buffer+=slice;
146                len-=slice;
147                copied+=slice;
148
149                pthread_mutex_lock(&DATA(state)->mutex);
150                DATA(state)->offset+=slice;
151                newbuffer = DATA(state)->in_buffer;
152
153                if (DATA(state)->offset >= INBUFFER(state).len) {
154                        INBUFFER(state).state = EMPTY;
155                        pthread_cond_signal(&DATA(state)->space_avail);
156                        newbuffer = (newbuffer+1) % BUFFERS;
157                        DATA(state)->offset = 0;
158                }
159
160                pthread_mutex_unlock(&DATA(state)->mutex);
161
162                DATA(state)->in_buffer = newbuffer;
163        }
164        return copied;
165}
166
167static void thread_close(io_t *io)
168{
169        pthread_mutex_lock(&DATA(io)->mutex);
170        DATA(io)->closing = true;
171        pthread_cond_signal(&DATA(io)->space_avail);
172        pthread_mutex_unlock(&DATA(io)->mutex);
173
174        /* Wait for the thread to exit */
175        pthread_join(DATA(io)->producer, NULL);
176        free(DATA(io));
177        free(io);
178}
179
180io_source_t thread_source = {
181        "thread",
182        thread_read,
183        NULL,   /* peek */
184        NULL,   /* tell */
185        NULL,   /* seek */
186        thread_close
187};
Note: See TracBrowser for help on using the repository browser.