source: lib/iow-thread.c @ cf30639

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since cf30639 was cf30639, checked in by Perry Lorier <perry@…>, 13 years ago

Threading cleanups (and some memory leaks addressed)

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include "wandio.h"
2#include <sys/types.h>
3#include <sys/stat.h>
4#include <fcntl.h>
5#include <stdlib.h>
6#include <pthread.h>
7#include <string.h>
8#include <stdbool.h>
9
10/* 1MB Buffer */
11#define BUFFERSIZE (1024*1024)
12#define BUFFERS 100
13
14extern iow_source_t thread_wsource;
15
16struct buffer_t {
17        char buffer[BUFFERSIZE];
18        int len;
19        enum { EMPTY = 0, FULL = 1 } state;
20        pthread_cond_t dataready;
21        pthread_cond_t spaceavail;
22        pthread_mutex_t mutex;
23};
24
25struct state_t {
26        struct buffer_t buffer[BUFFERS];
27        int out_buffer;
28        off_t offset;
29        pthread_t consumer;
30        bool closing;
31        iow_t *iow;
32};
33
34#define DATA(x) ((struct state_t *)((x)->data))
35#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
36#define min(a,b) ((a)<(b) ? (a) : (b))
37
38static void *thread_consumer(void *userdata)
39{
40        int buffer=0;
41        bool running = true;
42        iow_t *state = (iow_t *) userdata;
43
44        do {
45                pthread_mutex_lock(&DATA(state)->buffer[buffer].mutex);
46                while (DATA(state)->buffer[buffer].state == EMPTY) {
47                        if (DATA(state)->closing)
48                                break;
49                        pthread_cond_wait(&DATA(state)->buffer[buffer].dataready,
50                                        &DATA(state)->buffer[buffer].mutex);
51                }
52                /* Empty the buffer */
53                wandio_wwrite(
54                                DATA(state)->iow,
55                                DATA(state)->buffer[buffer].buffer,
56                                DATA(state)->buffer[buffer].len);
57
58
59                /* if we've not reached the end of the file keep going */
60                running = ( DATA(state)->buffer[buffer].len > 0 );
61                DATA(state)->buffer[buffer].len = 0;
62                DATA(state)->buffer[buffer].state = EMPTY;
63
64                pthread_cond_signal(&DATA(state)->buffer[buffer].spaceavail);
65
66                pthread_mutex_unlock(&DATA(state)->buffer[buffer].mutex);
67
68                /* Flip buffers */
69                buffer=(buffer+1) % BUFFERS;
70
71        } while(running);
72
73        fprintf(stderr,"Write thread leaving\n");
74
75        wandio_wdestroy(DATA(state)->iow);
76
77        return NULL;
78}
79
80iow_t *thread_wopen(iow_t *child)
81{
82        iow_t *state;
83
84        if (!child) {
85                return NULL;
86        }
87       
88
89        state = malloc(sizeof(iow_t));
90        state->data = calloc(1,sizeof(struct state_t));
91        state->source = &thread_wsource;
92
93        DATA(state)->out_buffer = 0;
94        DATA(state)->offset = 0;
95        pthread_mutex_init(&DATA(state)->buffer[0].mutex,NULL);
96        pthread_cond_init(&DATA(state)->buffer[0].dataready,NULL);
97        pthread_cond_init(&DATA(state)->buffer[0].spaceavail,NULL);
98
99        DATA(state)->iow = child;
100        DATA(state)->closing = false;
101
102        pthread_create(&DATA(state)->consumer,NULL,thread_consumer,state);
103
104        return state;
105}
106
107static off_t thread_wwrite(iow_t *state, const char *buffer, off_t len)
108{
109        int slice;
110        int copied=0;
111        int newbuffer;
112
113        while(len>0) {
114                pthread_mutex_lock(&OUTBUFFER(state).mutex);
115                while (OUTBUFFER(state).state == FULL) {
116                        pthread_cond_wait(&OUTBUFFER(state).spaceavail,
117                                        &OUTBUFFER(state).mutex);
118                }
119
120                slice=min( 
121                        (off_t)sizeof(OUTBUFFER(state).buffer)-DATA(state)->offset,
122                        len);
123                               
124                memcpy(
125                        OUTBUFFER(state).buffer+DATA(state)->offset,
126                        buffer,
127                        slice
128                        );
129
130                DATA(state)->offset += slice;
131                OUTBUFFER(state).len += slice;
132
133                buffer += slice;
134                len -= slice;
135                copied += slice;
136                newbuffer = DATA(state)->out_buffer;
137
138                if (DATA(state)->offset >= (off_t)sizeof(OUTBUFFER(state).buffer)) {
139                        OUTBUFFER(state).state = FULL;
140                        pthread_cond_signal(&OUTBUFFER(state).dataready);
141                        DATA(state)->offset = 0;
142                        newbuffer = (newbuffer+1) % BUFFERS;
143                }
144
145                pthread_mutex_unlock(&OUTBUFFER(state).mutex);
146
147                DATA(state)->out_buffer = newbuffer;
148        }
149        return copied;
150}
151
152static void thread_wclose(iow_t *iow)
153{
154        pthread_mutex_lock(&OUTBUFFER(iow).mutex);
155        DATA(iow)->closing = true;
156        pthread_cond_signal(&OUTBUFFER(iow).dataready);
157        pthread_mutex_unlock(&OUTBUFFER(iow).mutex);
158        pthread_join(DATA(iow)->consumer,NULL);
159        free(iow->data);
160        free(iow);
161}
162
163iow_source_t thread_wsource = {
164        "threadw",
165        thread_wwrite,
166        thread_wclose
167};
Note: See TracBrowser for help on using the repository browser.