/*
    This file is part of BioD.
    Copyright (C) 2012-2013 Artem Tarasov <lomereiter@gmail.com>

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.

*/
module bio.core.bgzf.outputstream;

import bio.core.bgzf.constants;
import bio.core.bgzf.compress;

import bio.core.utils.roundbuf;

import contrib.undead.stream;
import std.exception;
import std.parallelism;
import std.array;
import std.algorithm : max;
import std.typecons;
import core.exception : onOutOfMemoryError;
import core.stdc.stdlib;

/// Callback invoked right before a compressed block is written to the
/// underlying stream. It receives the block's uncompressed input first
/// and its compressed representation second.
alias void delegate(ubyte[], ubyte[]) BlockWriteHandler;

/// Compresses $(D input) at the given $(D level), using $(D output_buffer)
/// as the destination for bgzfCompress, and bundles the result with
/// $(D handler). A compression task therefore carries everything needed
/// to write its block later.
///
/// Returns: tuple of (input, result of bgzfCompress, handler).
Tuple!(ubyte[], ubyte[], BlockWriteHandler)
bgzfCompressFunc(ubyte[] input, int level, ubyte[] output_buffer,
                 BlockWriteHandler handler)
{
    auto output = bgzfCompress(input, level, output_buffer);
    return tuple(input, output, handler);
}

/// Class for BGZF compression.
///
/// Written data is accumulated into blocks of at most $(D block_size) bytes.
/// Each filled block is compressed on a task pool, and the compressed blocks
/// are written to the underlying stream in the order they were submitted.
class BgzfOutputStream : Stream {

    private {
        Stream _stream = void;
        TaskPool _task_pool = void;

        ubyte[] _buffer; // a slice into _compression_buffer (uncompressed data)
        ubyte[] _tmp;    // a slice into _compression_buffer (compressed data)
        size_t _current_size; // number of bytes currently stored in _buffer

        int _compression_level;

        alias Task!(bgzfCompressFunc,
                    ubyte[], int, ubyte[], BlockWriteHandler) CompressionTask;

        // Submitted compression tasks, oldest at the front.
        RoundBuf!(CompressionTask*) _compression_tasks;

        // malloc'ed memory, split into (n_tasks + 1) slots of
        // 2 * max_block_size bytes each. The first half of a slot holds
        // uncompressed input and the second half holds compressed output.
        // Set to null once the stream has been closed and the memory freed.
        ubyte[] _compression_buffer;

        // Handler attached to every block submitted for compression.
        BlockWriteHandler _before_write;
    }

    /// Create new BGZF output stream which will use
    /// provided $(D task_pool) to do multithreaded compression.
    ///
    /// Params:
    ///     output_stream     = stream that receives compressed BGZF blocks
    ///     compression_level = compression level in interval [-1, 9]
    ///     task_pool         = pool on which blocks are compressed
    ///     buffer_size       = if non-zero, the number of blocks that may be
    ///                         in flight is raised to at least
    ///                         buffer_size / max_block_size
    ///     max_block_size    = bytes reserved for each input area and each
    ///                         compressed-output area
    ///     block_size        = maximum amount of uncompressed data per block;
    ///                         must satisfy 0 < block_size <= max_block_size
    ///
    /// Throws: Exception if compression_level or block_size is out of range.
    this(Stream output_stream,
         int compression_level=-1,
         TaskPool task_pool=taskPool,
         size_t buffer_size=0,
         size_t max_block_size=BGZF_MAX_BLOCK_SIZE,
         size_t block_size=BGZF_BLOCK_SIZE)
    {
        enforce(-1 <= compression_level && compression_level <= 9,
                "Compression level must be a number in interval [-1, 9]");
        // block_size == 0 would make writeBlock loop forever; a block_size
        // larger than max_block_size would let the input area of a slot
        // overlap its compressed-output area.
        enforce(0 < block_size && block_size <= max_block_size,
                "Block size must be positive and not exceed maximum block size");
        _stream = output_stream;
        _task_pool = task_pool;
        _compression_level = compression_level;

        size_t n_tasks = max(task_pool.size, 1) * 16;
        if (buffer_size > 0) {
            n_tasks = max(n_tasks, buffer_size / max_block_size);
        }
        _compression_tasks = RoundBuf!(CompressionTask*)(n_tasks);

        // One slot (input area + output area) per in-flight task, plus one
        // extra slot which we can fill while n_tasks are being executed.
        auto comp_buf_size = (2 * n_tasks + 2) * max_block_size;
        auto p = cast(ubyte*)core.stdc.stdlib.malloc(comp_buf_size);
        if (p is null)
            onOutOfMemoryError();
        _compression_buffer = p[0 .. comp_buf_size];
        _buffer = _compression_buffer[0 .. block_size];
        _tmp = _compression_buffer[max_block_size .. max_block_size * 2];

        readable = false;
        writeable = true;
        seekable = false;
    }

    /// Always throws: this stream is write-only.
    override size_t readBlock(void* buffer, size_t size) {
        throw new ReadException("Stream is not readable");
    }

    /// Always throws: this stream is not seekable.
    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    /// Appends $(D size) bytes from $(D buf) to the current block.
    /// Each block that fills up is submitted for compression.
    ///
    /// Returns: $(D size); the whole input is always consumed.
    /// Throws: WriteException if the stream has already been closed.
    override size_t writeBlock(const void* buf, size_t size) {
        // After close() the internal buffer is freed; refuse to touch it.
        if (_compression_buffer is null)
            throw new WriteException("Stream is closed");

        if (size + _current_size >= _buffer.length) {
            size_t room;
            ubyte[] data = (cast(ubyte*)buf)[0 .. size];

            while (data.length + _current_size >= _buffer.length) {
                room = _buffer.length - _current_size;
                _buffer[$ - room .. $] = data[0 .. room];
                data = data[room .. $];

                _current_size = _buffer.length;

                // submits the full block and moves _buffer to the next slot
                flushCurrentBlock();
            }

            _buffer[0 .. data.length] = data[];
            _current_size = data.length;
        } else {
            _buffer[_current_size .. _current_size + size] = (cast(ubyte*)buf)[0 .. size];
            _current_size += size;
        }

        return size;
    }

    /// Force flushing current block, even if it is not yet filled.
    /// Should be used when it's not desired to have records crossing block borders.
    ///
    /// The block is submitted for compression on the task pool. If all task
    /// slots are busy, this waits for the oldest task to finish first. Every
    /// already-finished task at the front of the queue is then written out,
    /// preserving submission order.
    void flushCurrentBlock() {

        if (_current_size == 0)
            return;

        // When the queue is full, the oldest task must finish before a new
        // one is submitted. Its slot is the one reused next.
        Tuple!(ubyte[], ubyte[], BlockWriteHandler) front_result;
        if (_compression_tasks.full) {
            front_result = _compression_tasks.front.yieldForce();
            _compression_tasks.popFront();
        }

        auto compression_task = task!bgzfCompressFunc(_buffer[0 .. _current_size],
                                                      _compression_level, _tmp,
                                                      _before_write);
        _compression_tasks.put(compression_task);
        _task_pool.put(compression_task);

        // Advance to the next slot (wrapping around at the end).
        size_t offset = _buffer.ptr - _compression_buffer.ptr;
        immutable N = _tmp.length;
        offset += 2 * N;
        if (offset == _compression_buffer.length)
            offset = 0;
        _buffer = _compression_buffer[offset .. offset + _buffer.length];
        _tmp = _compression_buffer[offset + N .. offset + 2 * N];
        _current_size = 0;

        // front_result still points into the slot just selected above. It
        // must be written now, before any new data is copied into _buffer.
        if (front_result[0] !is null)
            writeResult(front_result);

        while (!_compression_tasks.empty) {
            auto pending = _compression_tasks.front;
            if (!pending.done())
                break;
            auto result = pending.yieldForce();
            writeResult(result);
            _compression_tasks.popFront();
        }
    }

    /// Sets a handler to be called for every block right before its
    /// compressed bytes are written to the underlying stream.
    ///
    /// The handler is captured when a block is submitted, so it applies only
    /// to blocks submitted after this call. Both slices it receives point into
    /// an internal buffer that is reused for later blocks, so the handler must
    /// copy any data it wants to keep.
    void setWriteHandler(BlockWriteHandler handler) {
        _before_write = handler;
    }

    // Calls the block's handler (if any), then writes the compressed bytes.
    private void writeResult(Tuple!(ubyte[], ubyte[], BlockWriteHandler) block) {
        auto uncompressed = block[0];
        auto compressed = block[1];
        auto handler = block[2];
        if (handler) {// write handler enabled
            handler(uncompressed, compressed);
        }
        _stream.writeExact(compressed.ptr, compressed.length);
    }

    /// Flush all remaining BGZF blocks and underlying stream.
    /// Blocks until every submitted compression task has completed.
    override void flush() {
        flushCurrentBlock();

        while (!_compression_tasks.empty) {
            auto pending = _compression_tasks.front;
            auto block = pending.yieldForce();
            writeResult(block);
            _compression_tasks.popFront();
        }

        _stream.flush();
    }

    /// Flush all remaining BGZF blocks and close source stream.
    /// Automatically adds empty block at the end, serving as
    /// indicator of end of stream.
    ///
    /// Calling close() more than once is harmless: later calls do nothing.
    override void close() {
        if (_compression_buffer is null)
            return; // already closed

        flush();

        // Once flush() has returned, no compression task references the
        // buffer any more, so it can be released even if closing fails.
        scope (exit) {
            core.stdc.stdlib.free(_compression_buffer.ptr);
            _compression_buffer = null;
            _buffer = null;
            _tmp = null;
            writeable = false;
        }

        addEofBlock();

        _stream.close();
    }

    /// Adds EOF block. This function is called in close() method.
    void addEofBlock() {
        _stream.writeExact(BGZF_EOF.ptr, BGZF_EOF.length);
    }
}

unittest {
    import bio.core.bgzf.inputstream;

    import std.array, std.range;

    char[] data = "my very l" ~ array(repeat('o', 1000000)) ~ "ng string";

    foreach (level; [-1, 0, 1]) {
        auto output_stream = new MemoryStream();
        // Use the loop's compression level, so every level is exercised.
        auto bgzf_output_stream = new BgzfOutputStream(output_stream, level);
        bgzf_output_stream.writeExact(data.ptr, data.length);
        bgzf_output_stream.close();
        bgzf_output_stream.close(); // a second close must be a no-op

        auto input_stream = new MemoryStream(output_stream.data);
        input_stream.seekSet(0);

        auto block_supplier = new StreamSupplier(input_stream);
        auto bgzf_input_stream = new BgzfInputStream(block_supplier);
        char[] uncompressed_data = new char[data.length];
        bgzf_input_stream.readExact(uncompressed_data.ptr, data.length);
        assert(uncompressed_data == data);
    }
}