source: lib/iow-thread.c @ 15e9390

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 15e9390 was 15e9390, checked in by Perry Lorier <perry@…>, 11 years ago

Rearrange struct fields to get better packing on 64bit machines.

  • Property mode set to 100644
File size: 3.6 KB
Line 
1#include "wandio.h"
2#include <sys/types.h>
3#include <sys/stat.h>
4#include <fcntl.h>
5#include <stdlib.h>
6#include <pthread.h>
7#include <string.h>
8#include <stdbool.h>
9
10/* 1MB Buffer */
11#define BUFFERSIZE (1024*1024)
12#define BUFFERS 100
13
14extern iow_source_t thread_wsource;
15
16struct buffer_t {
17        char buffer[BUFFERSIZE];
18        int len;
19        enum { EMPTY = 0, FULL = 1 } state;
20};
21
22struct state_t {
23        struct buffer_t buffer[BUFFERS];
24        off_t offset;
25        pthread_t consumer;
26        iow_t *iow;
27        pthread_cond_t data_ready;
28        pthread_cond_t space_avail;
29        pthread_mutex_t mutex;
30        int out_buffer;
31        bool closing;
32};
33
34#define DATA(x) ((struct state_t *)((x)->data))
35#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
36#define min(a,b) ((a)<(b) ? (a) : (b))
37
38static void *thread_consumer(void *userdata)
39{
40        int buffer=0;
41        bool running = true;
42        iow_t *state = (iow_t *) userdata;
43
44        pthread_mutex_lock(&DATA(state)->mutex);
45        do {
46                while (DATA(state)->buffer[buffer].state == EMPTY) {
47                        if (DATA(state)->closing)
48                                break;
49                        pthread_cond_wait(&DATA(state)->data_ready,
50                                        &DATA(state)->mutex);
51                }
52                /* Empty the buffer */
53
54                pthread_mutex_unlock(&DATA(state)->mutex);
55                wandio_wwrite(
56                                DATA(state)->iow,
57                                DATA(state)->buffer[buffer].buffer,
58                                DATA(state)->buffer[buffer].len);
59                pthread_mutex_lock(&DATA(state)->mutex);
60
61                /* if we've not reached the end of the file keep going */
62                running = ( DATA(state)->buffer[buffer].len > 0 );
63                DATA(state)->buffer[buffer].len = 0;
64                DATA(state)->buffer[buffer].state = EMPTY;
65
66                pthread_cond_signal(&DATA(state)->space_avail);
67
68
69                /* Flip buffers */
70                buffer=(buffer+1) % BUFFERS;
71
72        } while(running);
73
74        fprintf(stderr,"Write thread leaving\n");
75
76        wandio_wdestroy(DATA(state)->iow);
77
78        pthread_mutex_unlock(&DATA(state)->mutex);
79        return NULL;
80}
81
82iow_t *thread_wopen(iow_t *child)
83{
84        iow_t *state;
85
86        if (!child) {
87                return NULL;
88        }
89       
90
91        state = malloc(sizeof(iow_t));
92        state->data = calloc(1,sizeof(struct state_t));
93        state->source = &thread_wsource;
94
95        DATA(state)->out_buffer = 0;
96        DATA(state)->offset = 0;
97        pthread_mutex_init(&DATA(state)->mutex,NULL);
98        pthread_cond_init(&DATA(state)->data_ready,NULL);
99        pthread_cond_init(&DATA(state)->space_avail,NULL);
100
101        DATA(state)->iow = child;
102        DATA(state)->closing = false;
103
104        pthread_create(&DATA(state)->consumer,NULL,thread_consumer,state);
105
106        return state;
107}
108
109static off_t thread_wwrite(iow_t *state, const char *buffer, off_t len)
110{
111        int slice;
112        int copied=0;
113        int newbuffer;
114
115        pthread_mutex_lock(&DATA(state)->mutex);
116        while(len>0) {
117                while (OUTBUFFER(state).state == FULL) {
118                        pthread_cond_wait(&DATA(state)->space_avail,
119                                        &DATA(state)->mutex);
120                }
121
122                slice=min( 
123                        (off_t)sizeof(OUTBUFFER(state).buffer)-DATA(state)->offset,
124                        len);
125                               
126                pthread_mutex_unlock(&DATA(state)->mutex);
127                memcpy(
128                        OUTBUFFER(state).buffer+DATA(state)->offset,
129                        buffer,
130                        slice
131                        );
132                pthread_mutex_lock(&DATA(state)->mutex);
133
134                DATA(state)->offset += slice;
135                OUTBUFFER(state).len += slice;
136
137                buffer += slice;
138                len -= slice;
139                copied += slice;
140                newbuffer = DATA(state)->out_buffer;
141
142                if (DATA(state)->offset >= (off_t)sizeof(OUTBUFFER(state).buffer)) {
143                        OUTBUFFER(state).state = FULL;
144                        pthread_cond_signal(&DATA(state)->data_ready);
145                        DATA(state)->offset = 0;
146                        newbuffer = (newbuffer+1) % BUFFERS;
147                }
148
149                DATA(state)->out_buffer = newbuffer;
150        }
151
152        pthread_mutex_unlock(&DATA(state)->mutex);
153        return copied;
154}
155
156static void thread_wclose(iow_t *iow)
157{
158        pthread_mutex_lock(&DATA(iow)->mutex);
159        DATA(iow)->closing = true;
160        pthread_cond_signal(&DATA(iow)->data_ready);
161        pthread_mutex_unlock(&DATA(iow)->mutex);
162        pthread_join(DATA(iow)->consumer,NULL);
163        free(iow->data);
164        free(iow);
165}
166
167iow_source_t thread_wsource = {
168        "threadw",
169        thread_wwrite,
170        thread_wclose
171};
Note: See TracBrowser for help on using the repository browser.