source: lib/iow-thread.c @ 417a136

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 417a136 was 417a136, checked in by Perry Lorier <perry@…>, 13 years ago

Use pthread_join() to tell when we've finished cleaning up

  • Property mode set to 100644
File size: 3.7 KB
Line 
1#include "wandio.h"
2#include <sys/types.h>
3#include <sys/stat.h>
4#include <fcntl.h>
5#include <stdlib.h>
6#include <pthread.h>
7#include <string.h>
8#include <stdbool.h>
9
10/* 1MB Buffer */
11#define BUFFERSIZE (1024*1024)
12#define BUFFERS 100
13
14extern iow_source_t thread_wsource;
15
16struct buffer_t {
17        char buffer[BUFFERSIZE];
18        int len;
19        enum { EMPTY = 0, FULL = 1 } state;
20};
21
22struct state_t {
23        struct buffer_t buffer[BUFFERS];
24        int out_buffer;
25        off_t offset;
26        pthread_t consumer;
27        bool closing;
28        iow_t *iow;
29        pthread_cond_t data_ready;
30        pthread_cond_t space_avail;
31        pthread_mutex_t mutex;
32};
33
34#define DATA(x) ((struct state_t *)((x)->data))
35#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
36#define min(a,b) ((a)<(b) ? (a) : (b))
37
38static void *thread_consumer(void *userdata)
39{
40        int buffer=0;
41        bool running = true;
42        iow_t *state = (iow_t *) userdata;
43
44        pthread_mutex_lock(&DATA(state)->mutex);
45        do {
46                while (DATA(state)->buffer[buffer].state == EMPTY) {
47                        if (DATA(state)->closing)
48                                break;
49                        pthread_cond_wait(&DATA(state)->data_ready,
50                                        &DATA(state)->mutex);
51                }
52                /* Empty the buffer */
53
54                if (DATA(state)->closing)
55                        break;
56
57                pthread_mutex_unlock(&DATA(state)->mutex);
58                wandio_wwrite(
59                                DATA(state)->iow,
60                                DATA(state)->buffer[buffer].buffer,
61                                DATA(state)->buffer[buffer].len);
62                pthread_mutex_lock(&DATA(state)->mutex);
63
64                /* if we've not reached the end of the file keep going */
65                running = ( DATA(state)->buffer[buffer].len > 0 );
66                DATA(state)->buffer[buffer].len = 0;
67                DATA(state)->buffer[buffer].state = EMPTY;
68
69                pthread_cond_signal(&DATA(state)->space_avail);
70
71
72                /* Flip buffers */
73                buffer=(buffer+1) % BUFFERS;
74
75        } while(running);
76
77        fprintf(stderr,"Write thread leaving\n");
78
79        wandio_wdestroy(DATA(state)->iow);
80
81        pthread_mutex_unlock(&DATA(state)->mutex);
82        return NULL;
83}
84
85iow_t *thread_wopen(iow_t *child)
86{
87        iow_t *state;
88
89        if (!child) {
90                return NULL;
91        }
92       
93
94        state = malloc(sizeof(iow_t));
95        state->data = calloc(1,sizeof(struct state_t));
96        state->source = &thread_wsource;
97
98        DATA(state)->out_buffer = 0;
99        DATA(state)->offset = 0;
100        pthread_mutex_init(&DATA(state)->mutex,NULL);
101        pthread_cond_init(&DATA(state)->data_ready,NULL);
102        pthread_cond_init(&DATA(state)->space_avail,NULL);
103
104        DATA(state)->iow = child;
105        DATA(state)->closing = false;
106
107        pthread_create(&DATA(state)->consumer,NULL,thread_consumer,state);
108
109        return state;
110}
111
112static off_t thread_wwrite(iow_t *state, const char *buffer, off_t len)
113{
114        int slice;
115        int copied=0;
116        int newbuffer;
117
118        pthread_mutex_lock(&DATA(state)->mutex);
119        while(len>0) {
120                while (OUTBUFFER(state).state == FULL) {
121                        pthread_cond_wait(&DATA(state)->space_avail,
122                                        &DATA(state)->mutex);
123                }
124
125                slice=min( 
126                        (off_t)sizeof(OUTBUFFER(state).buffer)-DATA(state)->offset,
127                        len);
128                               
129                pthread_mutex_unlock(&DATA(state)->mutex);
130                memcpy(
131                        OUTBUFFER(state).buffer+DATA(state)->offset,
132                        buffer,
133                        slice
134                        );
135                pthread_mutex_lock(&DATA(state)->mutex);
136
137                DATA(state)->offset += slice;
138                OUTBUFFER(state).len += slice;
139
140                buffer += slice;
141                len -= slice;
142                copied += slice;
143                newbuffer = DATA(state)->out_buffer;
144
145                if (DATA(state)->offset >= (off_t)sizeof(OUTBUFFER(state).buffer)) {
146                        OUTBUFFER(state).state = FULL;
147                        pthread_cond_signal(&DATA(state)->data_ready);
148                        DATA(state)->offset = 0;
149                        newbuffer = (newbuffer+1) % BUFFERS;
150                }
151
152                DATA(state)->out_buffer = newbuffer;
153        }
154
155        pthread_mutex_unlock(&DATA(state)->mutex);
156        return copied;
157}
158
159static void thread_wclose(iow_t *iow)
160{
161        pthread_mutex_lock(&DATA(iow)->mutex);
162        DATA(iow)->closing = true;
163        pthread_cond_signal(&DATA(iow)->data_ready);
164        pthread_mutex_unlock(&DATA(iow)->mutex);
165        pthread_join(DATA(iow)->consumer,NULL);
166        free(iow->data);
167        free(iow);
168}
169
170iow_source_t thread_wsource = {
171        "threadw",
172        thread_wwrite,
173        thread_wclose
174};
Note: See TracBrowser for help on using the repository browser.