Changeset cf30639
- Timestamp:
- 12/17/08 15:50:25 (12 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 417a136
- Parents:
- d4a1691
- Location:
- lib
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/ior-stdio.c
rc66068d rcf30639 52 52 { 53 53 close(DATA(io)->fd); 54 free(io->data); 54 55 free(io); 55 56 } -
lib/ior-thread.c
rc66068d rcf30639 18 18 int len; 19 19 enum { EMPTY = 0, FULL = 1 } state; 20 pthread_cond_t dataready;21 pthread_cond_t spaceavail;22 pthread_mutex_t mutex;23 20 }; 24 21 … … 28 25 int offset; 29 26 pthread_t producer; 27 pthread_cond_t space_avail; 28 pthread_cond_t data_ready; 29 pthread_mutex_t mutex; 30 30 bool closing; 31 bool closed; 31 32 io_t *io; 32 33 }; … … 42 43 bool running = true; 43 44 45 pthread_mutex_lock(&DATA(state)->mutex); 44 46 do { 45 pthread_mutex_lock(&DATA(state)->buffer[buffer].mutex);46 47 while (DATA(state)->buffer[buffer].state == FULL) { 47 48 if (DATA(state)->closing) 48 49 break; 49 pthread_cond_wait(&DATA(state)->buffer[buffer].spaceavail, 50 &DATA(state)->buffer[buffer].mutex); 50 pthread_cond_wait(&DATA(state)->space_avail, &DATA(state)->mutex); 51 51 } 52 53 if (DATA(state)->closing) { 54 break; 55 } 56 pthread_mutex_unlock(&DATA(state)->mutex); 57 52 58 /* Fill the buffer */ 53 59 DATA(state)->buffer[buffer].len=wandio_read( … … 56 62 sizeof(DATA(state)->buffer[buffer].buffer)); 57 63 64 pthread_mutex_lock(&DATA(state)->mutex); 65 58 66 DATA(state)->buffer[buffer].state = FULL; 59 67 … … 61 69 running = (DATA(state)->buffer[buffer].len > 0 ); 62 70 63 pthread_cond_signal(&DATA(state)->buffer[buffer].dataready); 64 65 pthread_mutex_unlock(&DATA(state)->buffer[buffer].mutex); 71 pthread_cond_signal(&DATA(state)->data_ready); 66 72 67 73 /* Flip buffers */ … … 70 76 } while(running); 71 77 78 72 79 wandio_destroy(DATA(state)->io); 80 81 DATA(state)->closed = true; 82 pthread_cond_signal(&DATA(state)->data_ready); 83 pthread_mutex_unlock(&DATA(state)->mutex); 73 84 74 85 return NULL; … … 90 101 DATA(state)->in_buffer = 0; 91 102 DATA(state)->offset = 0; 92 pthread_mutex_init(&DATA(state)-> buffer[0].mutex,NULL);93 pthread_cond_init(&DATA(state)-> buffer[0].dataready,NULL);94 pthread_cond_init(&DATA(state)-> buffer[0].spaceavail,NULL);103 pthread_mutex_init(&DATA(state)->mutex,NULL); 104 pthread_cond_init(&DATA(state)->data_ready,NULL); 105 pthread_cond_init(&DATA(state)->space_avail,NULL); 95 106 96 107 DATA(state)->io = parent; 97 108 DATA(state)->closing = false; 109 DATA(state)->closed = false; 98 110 99 111 pthread_create(&DATA(state)->producer,NULL,thread_producer,state); … … 109 121 110 122 while(len>0) { 111 pthread_mutex_lock(& INBUFFER(state).mutex);123 pthread_mutex_lock(&DATA(state)->mutex); 112 124 while (INBUFFER(state).state == EMPTY) { 113 pthread_cond_wait(&INBUFFER(state).dataready, 114 &INBUFFER(state).mutex); 125 pthread_cond_wait(&DATA(state)->data_ready, &DATA(state)->mutex); 115 126 116 127 } … … 121 132 copied = INBUFFER(state).len; 122 133 123 pthread_mutex_unlock(& INBUFFER(state).mutex);134 pthread_mutex_unlock(&DATA(state)->mutex); 124 135 return copied; 125 136 } 126 137 127 138 slice=min( INBUFFER(state).len-DATA(state)->offset,len); 139 140 pthread_mutex_unlock(&DATA(state)->mutex); 128 141 129 142 memcpy( … … 136 149 len-=slice; 137 150 copied+=slice; 151 152 pthread_mutex_lock(&DATA(state)->mutex); 138 153 DATA(state)->offset+=slice; 139 154 newbuffer = DATA(state)->in_buffer; … … 141 156 if (DATA(state)->offset >= INBUFFER(state).len) { 142 157 INBUFFER(state).state = EMPTY; 143 pthread_cond_signal(& INBUFFER(state).spaceavail);158 pthread_cond_signal(&DATA(state)->space_avail); 144 159 newbuffer = (newbuffer+1) % BUFFERS; 145 160 DATA(state)->offset = 0; 146 161 } 147 162 148 pthread_mutex_unlock(& INBUFFER(state).mutex);163 pthread_mutex_unlock(&DATA(state)->mutex); 149 164 150 165 DATA(state)->in_buffer = newbuffer; … … 155 170 static void thread_close(io_t *io) 156 171 { 157 pthread_mutex_lock(& INBUFFER(io).mutex);172 pthread_mutex_lock(&DATA(io)->mutex); 158 173 DATA(io)->closing = true; 159 pthread_cond_signal(&INBUFFER(io).spaceavail); 160 pthread_mutex_unlock(&INBUFFER(io).mutex); 174 pthread_cond_signal(&DATA(io)->space_avail); 175 176 /* Wait until the producer thread dies */ 177 while (!DATA(io)->closed) { 178 pthread_cond_wait(&DATA(io)->data_ready, &DATA(io)->mutex); 179 } 180 pthread_mutex_unlock(&DATA(io)->mutex); 181 free(DATA(io)); 161 182 free(io); 162 183 } -
lib/iow-thread.c
rc66068d rcf30639 157 157 pthread_mutex_unlock(&OUTBUFFER(iow).mutex); 158 158 pthread_join(DATA(iow)->consumer,NULL); 159 free(iow->data); 159 160 free(iow); 160 161 } -
lib/wandio.c
rc66068d rcf30639 18 18 io_t *io = peek_open(stdio_open(filename)); 19 19 char buffer[1024]; 20 int len; 20 21 if (!io) 21 22 return NULL; 22 wandio_peek(io, buffer, sizeof(buffer));23 len = wandio_peek(io, buffer, sizeof(buffer)); 23 24 #if HAVE_LIBZ 24 25 /* auto detect gzip compressed data */ 25 if ( buffer[0] == '\037' && buffer[1] == '\213') {26 if (len>=2 && buffer[0] == '\037' && buffer[1] == '\213') { 26 27 io = zlib_open(io); 27 28 } 28 29 /* auto detect compress(1) compressed data (gzip can read this) */ 29 if ( buffer[0] == '\037' && buffer[1] == '\235') {30 if (len>=2 && buffer[0] == '\037' && buffer[1] == '\235') { 30 31 io = zlib_open(io); 31 32 } … … 33 34 #if HAVE_LIBBZ2 34 35 /* auto detect bzip compressed data */ 35 else if (buffer[0] == 'B' && buffer[1] == 'Z' && buffer[2] == 'h') {36 if (len>=3 && buffer[0] == 'B' && buffer[1] == 'Z' && buffer[2] == 'h') { 36 37 io = bz_open(io); 37 38 }
Note: See TracChangeset
for help on using the changeset viewer.