1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2013    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 module bio.core.bgzf.outputstream;
25 
26 import bio.core.bgzf.constants;
27 import bio.core.bgzf.compress;
28 
29 import bio.core.utils.roundbuf;
30 
31 import contrib.undead.stream;
32 import std.exception;
33 import std.parallelism;
34 import std.array;
35 import std.algorithm : max;
36 import std.typecons;
37 import core.stdc.stdlib;
38 
/// Callback invoked for a finished block right before its compressed bytes
/// are written to the underlying stream. The first argument is the
/// uncompressed block data, the second is the compressed block.
alias void delegate(ubyte[], ubyte[]) BlockWriteHandler;
40 
/// Task body used for (possibly parallel) block compression.
///
/// Passes $(D input), $(D level) and $(D output_buffer) to $(D bgzfCompress)
/// and bundles the original input, the value returned by $(D bgzfCompress),
/// and $(D handler) into a single tuple, so that whoever collects the task
/// result has everything needed to invoke the handler and write the block.
Tuple!(ubyte[], ubyte[], BlockWriteHandler)
bgzfCompressFunc(ubyte[] input, int level, ubyte[] output_buffer,
                 BlockWriteHandler handler)
{
    return tuple(input,
                 bgzfCompress(input, level, output_buffer),
                 handler);
}
48 
/// Class for BGZF compression.
///
/// Written bytes are accumulated into blocks of $(D block_size) bytes. Each
/// full (or explicitly flushed) block is handed to a task pool for
/// compression, and compressed blocks are written to the underlying stream
/// in the order in which they were submitted.
class BgzfOutputStream : Stream {

    private {
        Stream _stream = void;
        TaskPool _task_pool = void;

        ubyte[] _buffer; // a slice into _compression_buffer (uncompressed data)
        ubyte[] _tmp;    // a slice into _compression_buffer (compressed data)
        size_t _current_size; // number of bytes of _buffer filled so far

        int _compression_level;

        alias Task!(bgzfCompressFunc,
                    ubyte[], int, ubyte[], BlockWriteHandler) CompressionTask;
        RoundBuf!(CompressionTask*) _compression_tasks; // pending tasks, oldest first
        ubyte[] _compression_buffer; // malloc'ed storage; null pointer once closed
    }

    /// Create new BGZF output stream which will use
    /// provided $(D task_pool) to do multithreaded compression.
    ///
    /// Params:
    ///     output_stream     = stream that receives the compressed blocks
    ///     compression_level = value in [-1, 9], forwarded to the compressor
    ///     task_pool         = pool on which compression tasks are executed
    ///     buffer_size       = if non-zero, the number of task slots is raised
    ///                         to at least buffer_size / max_block_size
    ///     max_block_size    = size of each half (uncompressed / compressed)
    ///                         of a buffer slot
    ///     block_size        = number of uncompressed bytes collected before
    ///                         a block is submitted for compression
    ///
    /// Throws: Exception if an argument is out of range or if the
    ///         compression buffer cannot be allocated.
    this(Stream output_stream,
         int compression_level=-1,
         TaskPool task_pool=taskPool,
         size_t buffer_size=0,
         size_t max_block_size=BGZF_MAX_BLOCK_SIZE,
         size_t block_size=BGZF_BLOCK_SIZE)
    {
        enforce(-1 <= compression_level && compression_level <= 9,
                "Compression level must be a number in interval [-1, 9]");
        // A zero block size would make writeBlock spin forever, and a block
        // larger than max_block_size would make _buffer overlap _tmp.
        enforce(0 < block_size && block_size <= max_block_size,
                "Block size must be positive and not exceed maximum block size");
        _stream = output_stream;
        _task_pool = task_pool;
        _compression_level = compression_level;

        size_t n_tasks = max(task_pool.size, 1) * 16;
        if (buffer_size > 0) {
            n_tasks = max(n_tasks, buffer_size / max_block_size);
        }
        _compression_tasks = RoundBuf!(CompressionTask*)(n_tasks);

        // The buffer is split into (n_tasks + 1) slots of 2 * max_block_size
        // bytes: the first half of a slot holds uncompressed data, the second
        // half receives the compressed output.
        // 1 extra block to which we can write while n_tasks are executed
        auto comp_buf_size = (2 * n_tasks + 2) * max_block_size;
        auto p = cast(ubyte*)core.stdc.stdlib.malloc(comp_buf_size);
        enforce(p !is null, "Failed to allocate BGZF compression buffer");
        _compression_buffer = p[0 .. comp_buf_size];
        _buffer = _compression_buffer[0 .. block_size];
        _tmp = _compression_buffer[max_block_size .. max_block_size * 2];

        readable = false;
        writeable = true;
        seekable = false;
    }

    /// Always throws: the stream is write-only.
    override size_t readBlock(void* buffer, size_t size) {
        throw new ReadException("Stream is not readable");
    }

    /// Always throws: the stream is not seekable.
    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    /// Appends $(D size) bytes starting at $(D buf) to the current block,
    /// submitting a block for compression every time it becomes full.
    ///
    /// Returns: $(D size) (all bytes are always consumed).
    /// Throws: Exception if the stream has already been closed.
    override size_t writeBlock(const void* buf, size_t size) {
        // After close() the internal buffer is released; refuse to touch it.
        enforce(writeable, "Stream is not writeable");

        if (size + _current_size >= _buffer.length) {
            size_t room;
            ubyte[] data = (cast(ubyte*)buf)[0 .. size];

            // Fill up and submit as many complete blocks as the data allows.
            while (data.length + _current_size >= _buffer.length) {
                room = _buffer.length - _current_size;
                _buffer[$ - room .. $] = data[0 .. room];
                data = data[room .. $];

                _current_size = _buffer.length;

                // Note: this also switches _buffer to the next slot.
                flushCurrentBlock();
            }

            // The remainder (shorter than a block) starts a new block.
            _buffer[0 .. data.length] = data[];
            _current_size = data.length;
        } else {
            _buffer[_current_size .. _current_size + size] = (cast(ubyte*)buf)[0 .. size];
            _current_size += size;
        }

        return size;
    }

    /// Force flushing current block, even if it is not yet filled.
    /// Should be used when it's not desired to have records crossing block borders.
    void flushCurrentBlock() {

        if (_current_size == 0)
            return;

        // If every task slot is taken, wait for the oldest task to finish
        // so that there is room for the new one.
        Tuple!(ubyte[], ubyte[], BlockWriteHandler) front_result;
        if (_compression_tasks.full) {
            front_result = _compression_tasks.front.yieldForce();
            _compression_tasks.popFront();
        }

        // The write handler is captured at submission time.
        auto compression_task = task!bgzfCompressFunc(_buffer[0 .. _current_size],
                                                      _compression_level, _tmp,
                                                      _before_write);
        _compression_tasks.put(compression_task);
        _task_pool.put(compression_task);

        // Advance to the next slot of the buffer, wrapping around at the end.
        size_t offset = _buffer.ptr - _compression_buffer.ptr;
        immutable N = _tmp.length;
        offset += 2 * N;
        if (offset == _compression_buffer.length)
            offset = 0;
        _buffer = _compression_buffer[offset .. offset + _buffer.length];
        _tmp = _compression_buffer[offset + N .. offset + 2 * N];
        _current_size = 0;

        // Write the block we had to wait for (if any) first to keep the order.
        if (front_result[0] !is null)
            writeResult(front_result);

        // Then write out every leading task that has already completed,
        // without blocking on tasks that are still running.
        while (!_compression_tasks.empty) {
            auto task = _compression_tasks.front;
            if (!task.done())
                break;
            auto result = task.yieldForce();
            writeResult(result);
            _compression_tasks.popFront();
        }
    }

    private void delegate(ubyte[], ubyte[]) _before_write;

    /// Sets the delegate called with (uncompressed, compressed) data of a
    /// block before the compressed data is written to the underlying stream.
    /// Applies to blocks submitted for compression after this call.
    void setWriteHandler(void delegate(ubyte[], ubyte[]) handler) {
        _before_write = handler;
    }

    /// Invokes the block's write handler (if any) and then writes the
    /// compressed data to the underlying stream.
    private void writeResult(Tuple!(ubyte[], ubyte[], BlockWriteHandler) block) {
        auto uncompressed = block[0];
        auto compressed = block[1];
        auto handler = block[2];
        if (handler) {// write handler enabled
            handler(uncompressed, compressed);
        }
        _stream.writeExact(compressed.ptr, compressed.length);
    }

    /// Flush all remaining BGZF blocks and underlying stream.
    override void flush() {
        flushCurrentBlock();

        // Wait for every pending task, writing blocks in submission order.
        while (!_compression_tasks.empty) {
            auto task = _compression_tasks.front;
            auto block = task.yieldForce();
            writeResult(block);
            _compression_tasks.popFront();
        }

        _stream.flush();
        _current_size = 0;
    }

    /// Flush all remaining BGZF blocks and close source stream.
    /// Automatically adds empty block at the end, serving as
    /// indicator of end of stream.
    ///
    /// Calling this method again on an already closed stream does nothing.
    override void close() {
        // The buffer pointer doubles as the "already closed" marker; without
        // this guard a second call would write another EOF block and free
        // the same memory twice.
        if (_compression_buffer.ptr is null)
            return;

        flush();

        addEofBlock();

        _stream.close();

        writeable = false;
        core.stdc.stdlib.free(_compression_buffer.ptr);

        // Drop every slice into the released memory.
        _compression_buffer = null;
        _buffer = null;
        _tmp = null;
    }

    /// Adds EOF block. This function is called in close() method.
    void addEofBlock() {
        _stream.writeExact(BGZF_EOF.ptr, BGZF_EOF.length);
    }
}
224 
// Round-trip test: compress a long string at several compression levels
// and check that decompression restores it exactly.
unittest {
    import bio.core.bgzf.inputstream;

    import std.array, std.range, std.stdio;

    char[] data = "my very l" ~ array(repeat('o', 1000000)) ~ "ng string";

    foreach (level; [-1, 0, 1]) {
        auto output_stream = new MemoryStream();
        // Use the loop variable so that every listed level is exercised
        // (previously the level was hard-coded to 1).
        auto bgzf_output_stream = new BgzfOutputStream(output_stream, level);
        bgzf_output_stream.writeExact(data.ptr, data.length);
        bgzf_output_stream.close();

        auto input_stream = new MemoryStream(output_stream.data);
        input_stream.seekSet(0);

        auto block_supplier = new StreamSupplier(input_stream);
        auto bgzf_input_stream = new BgzfInputStream(block_supplier);
        char[] uncompressed_data = new char[data.length];
        bgzf_input_stream.readExact(uncompressed_data.ptr, data.length);
        assert(uncompressed_data == data);
    }
}