source: lib/ior-thread.c @ c66068d

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since c66068d was c66068d, checked in by Perry Lorier <perry@…>, 13 years ago

Rewrite the libtrace io subsystem to use the new wandio abstraction layer.

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include "wandio.h"
2#include <sys/types.h>
3#include <sys/stat.h>
4#include <fcntl.h>
5#include <stdlib.h>
6#include <pthread.h>
7#include <string.h>
8#include <stdbool.h>
9
10/* 1MB Buffer */
11#define BUFFERSIZE (1024*1024)
12#define BUFFERS 100
13
14extern io_source_t thread_source;
15
16struct buffer_t {
17        char buffer[BUFFERSIZE];
18        int len;
19        enum { EMPTY = 0, FULL = 1 } state;
20        pthread_cond_t dataready;
21        pthread_cond_t spaceavail;
22        pthread_mutex_t mutex;
23};
24
25struct state_t {
26        struct buffer_t buffer[BUFFERS];
27        int in_buffer;
28        int offset;
29        pthread_t producer;
30        bool closing;
31        io_t *io;
32};
33
34#define DATA(x) ((struct state_t *)((x)->data))
35#define INBUFFER(x) (DATA(x)->buffer[DATA(x)->in_buffer])
36#define min(a,b) ((a)<(b) ? (a) : (b))
37
38static void *thread_producer(void* userdata)
39{
40        io_t *state = (io_t*) userdata;
41        int buffer=0;
42        bool running = true;
43
44        do {
45                pthread_mutex_lock(&DATA(state)->buffer[buffer].mutex);
46                while (DATA(state)->buffer[buffer].state == FULL) {
47                        if (DATA(state)->closing)
48                                break;
49                        pthread_cond_wait(&DATA(state)->buffer[buffer].spaceavail,
50                                        &DATA(state)->buffer[buffer].mutex);
51                }
52                /* Fill the buffer */
53                DATA(state)->buffer[buffer].len=wandio_read(
54                                DATA(state)->io,
55                                DATA(state)->buffer[buffer].buffer,
56                                sizeof(DATA(state)->buffer[buffer].buffer));
57
58                DATA(state)->buffer[buffer].state = FULL;
59
60                /* if we've not reached the end of the file keep going */
61                running = (DATA(state)->buffer[buffer].len > 0 );
62
63                pthread_cond_signal(&DATA(state)->buffer[buffer].dataready);
64
65                pthread_mutex_unlock(&DATA(state)->buffer[buffer].mutex);
66
67                /* Flip buffers */
68                buffer=(buffer+1) % BUFFERS;
69
70        } while(running);
71
72        wandio_destroy(DATA(state)->io);
73
74        return NULL;
75}
76
77io_t *thread_open(io_t *parent)
78{
79        io_t *state;
80
81        if (!parent) {
82                return NULL;
83        }
84       
85
86        state = malloc(sizeof(io_t));
87        state->data = calloc(1,sizeof(struct state_t));
88        state->source = &thread_source;
89
90        DATA(state)->in_buffer = 0;
91        DATA(state)->offset = 0;
92        pthread_mutex_init(&DATA(state)->buffer[0].mutex,NULL);
93        pthread_cond_init(&DATA(state)->buffer[0].dataready,NULL);
94        pthread_cond_init(&DATA(state)->buffer[0].spaceavail,NULL);
95
96        DATA(state)->io = parent;
97        DATA(state)->closing = false;
98
99        pthread_create(&DATA(state)->producer,NULL,thread_producer,state);
100
101        return state;
102}
103
104static off_t thread_read(io_t *state, void *buffer, off_t len)
105{
106        int slice;
107        int copied=0;
108        int newbuffer;
109
110        while(len>0) {
111                pthread_mutex_lock(&INBUFFER(state).mutex);
112                while (INBUFFER(state).state == EMPTY) {
113                        pthread_cond_wait(&INBUFFER(state).dataready,
114                                        &INBUFFER(state).mutex);
115
116                }
117
118                if (INBUFFER(state).len <1) {
119
120                        if (copied<1)
121                                copied = INBUFFER(state).len;
122
123                        pthread_mutex_unlock(&INBUFFER(state).mutex);
124                        return copied;
125                }
126
127                slice=min( INBUFFER(state).len-DATA(state)->offset,len);
128                               
129                memcpy(
130                        buffer,
131                        INBUFFER(state).buffer+DATA(state)->offset,
132                        slice
133                        );
134
135                buffer+=slice;
136                len-=slice;
137                copied+=slice;
138                DATA(state)->offset+=slice;
139                newbuffer = DATA(state)->in_buffer;
140
141                if (DATA(state)->offset >= INBUFFER(state).len) {
142                        INBUFFER(state).state = EMPTY;
143                        pthread_cond_signal(&INBUFFER(state).spaceavail);
144                        newbuffer = (newbuffer+1) % BUFFERS;
145                        DATA(state)->offset = 0;
146                }
147
148                pthread_mutex_unlock(&INBUFFER(state).mutex);
149
150                DATA(state)->in_buffer = newbuffer;
151        }
152        return copied;
153}
154
155static void thread_close(io_t *io)
156{
157        pthread_mutex_lock(&INBUFFER(io).mutex);
158        DATA(io)->closing = true;
159        pthread_cond_signal(&INBUFFER(io).spaceavail);
160        pthread_mutex_unlock(&INBUFFER(io).mutex);
161        free(io);
162}
163
164io_source_t thread_source = {
165        "thread",
166        thread_read,
167        NULL,   /* peek */
168        NULL,   /* tell */
169        NULL,   /* seek */
170        thread_close
171};
Note: See TracBrowser for help on using the repository browser.