1 /*
2     This file is part of BioD.
3     Copyright (C) 2013-2016    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 module bio.core.bgzf.inputstream;
25 
26 import bio.core.bgzf.block;
27 import bio.core.bgzf.virtualoffset;
28 import bio.core.bgzf.constants;
29 import bio.core.bgzf.chunk;
30 import bio.bam.constants;
31 import bio.core.utils.roundbuf;
32 
33 import undead.stream;
34 import std.exception;
35 import std.conv;
36 import std.parallelism;
37 import std.array;
38 import std.algorithm : min, max;
39 
40 /// Exception type, thrown in case of encountering corrupt BGZF blocks
/// Exception type, thrown in case of encountering corrupt BGZF blocks
class BgzfException : Exception {
    /// Constructs the exception with the given error description.
    this(string msg) {
        super(msg);
    }
}
44 
/// Reads a single BGZF block from $(D stream) into $(D buffer) and fills
/// $(D block) with its metadata (compressed size, CRC32, input size, ...).
///
/// Params:
///     stream               = input stream positioned at the start of a block
///     is_seekable          = if true, uninteresting header fields are skipped
///                            with seeks instead of reads, and
///                            $(D block.start_offset) is taken from
///                            $(D stream.position) (otherwise it is left 0
///                            and callers are expected to set it themselves)
///     block                = receives the parsed block metadata
///     buffer               = destination for the compressed data; the caller
///                            must provide enough space for the block
///     number_of_bytes_read = if non-null, incremented by the total number of
///                            compressed bytes consumed from the stream
///
/// Returns: true if a block was read, false on end of stream.
/// Throws: BgzfException if the block is malformed.
bool fillBgzfBufferFromStream(Stream stream, bool is_seekable,
                              BgzfBlock* block, ubyte* buffer,
                              size_t *number_of_bytes_read=null)
{
    if (stream.eof())
        return false;

    ulong start_offset;
    void throwBgzfException(string msg) {
        throw new BgzfException("Error reading BGZF block starting from offset " ~
                                to!string(start_offset) ~ ": " ~ msg);
    }

    if (is_seekable)
        start_offset = stream.position;

    try {
        ubyte[4] bgzf_magic = void;

        // the magic may arrive in several pieces on short reads
        size_t bytes_read;
        while (bytes_read < 4) {
            auto buf = bgzf_magic.ptr + bytes_read;
            auto read_ = stream.read(buf[0 .. 4 - bytes_read]);
            if (read_ == 0)
                return false; // NOTE(review): truncation mid-magic is treated as EOF
            bytes_read += read_;
        }

        if (bgzf_magic != BGZF_MAGIC) {
            throwBgzfException("wrong BGZF magic");
        }

        ushort gzip_extra_length = void; // XLEN field of the gzip header

        // MTIME (4 bytes), XFL (1) and OS (1) are irrelevant here;
        // skip them via seek when possible, otherwise read them off
        if (is_seekable) {
            stream.seekCur(uint.sizeof + 2 * ubyte.sizeof);
        } else {
            uint gzip_mod_time = void;
            ubyte gzip_extra_flags = void;
            ubyte gzip_os = void;
            stream.read(gzip_mod_time);
            stream.read(gzip_extra_flags);
            stream.read(gzip_os);
        }

        stream.read(gzip_extra_length);

        ushort bsize = void; // total Block SIZE minus 1
        bool found_block_size = false;

        // scan the extra subfields looking for the 'BC' one holding BSIZE
        size_t len = 0;
        while (len < gzip_extra_length) {
            ubyte si1 = void;    // Subfield Identifier1
            ubyte si2 = void;    // Subfield Identifier2
            ushort slen = void;  // Subfield LENgth

            stream.read(si1);
            stream.read(si2);
            stream.read(slen);

            if (si1 == BAM_SI1 && si2 == BAM_SI2) {
                // found 'BC' as subfield identifier

                if (slen != 2) {
                    throwBgzfException("wrong BC subfield length: " ~
                                       to!string(slen) ~ "; expected 2");
                }

                if (found_block_size) {
                    throwBgzfException("duplicate field with block size");
                }

                // read block size
                stream.read(bsize);
                found_block_size = true;

                // skip the rest of the subfield, if any
                if (is_seekable) {
                    stream.seekCur(slen - bsize.sizeof);
                } else {
                    stream.readString(slen - bsize.sizeof);
                }
            } else {
                // this subfield has nothing to do with block size, just skip
                if (is_seekable) {
                    stream.seekCur(slen);
                } else {
                    stream.readString(slen);
                }
            }

            auto nbytes = si1.sizeof + si2.sizeof + slen.sizeof + slen;
            if (number_of_bytes_read !is null)
                *number_of_bytes_read += nbytes;
            len += nbytes;
        }

        if (len != gzip_extra_length) {
            throwBgzfException("total length of subfields in bytes (" ~
                               to!string(len) ~
                               ") is not equal to gzip_extra_length (" ~
                               to!string(gzip_extra_length) ~ ")");
        }

        if (!found_block_size) {
            throwBgzfException("block size was not found in any subfield");
        }

        // compressed data size = (BSIZE + 1) - 12-byte fixed gzip header
        // - XLEN extra bytes - 8-byte gzip footer, i.e. bsize - xlen - 19
        auto cdata_size = bsize - gzip_extra_length - 19;
        if (cdata_size < 0) {
            // FIX: a corrupt BSIZE could make this negative; without the
            // check it would slip past the comparison below and be
            // reinterpreted as a huge size_t in readExact
            throwBgzfException("invalid block size: computed compressed " ~
                               "data size is negative");
        }
        if (cdata_size > BGZF_MAX_BLOCK_SIZE) {
            throwBgzfException("compressed data size is more than " ~
                               to!string(BGZF_MAX_BLOCK_SIZE) ~
                               " bytes, which is not allowed by " ~
                               "current BAM specification");
        }

        block.bsize = bsize;
        block.cdata_size = cast(ushort)cdata_size;

        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[compressed] reading ", cdata_size, " bytes starting from ", start_offset);
        }
        stream.readExact(buffer, cdata_size);
        version(extraVerbose) {
            stderr.writeln("[  compressed] [write] range: ", buffer, " - ", buffer + cdata_size);
        }
        // gzip footer: CRC32 of the uncompressed data and its length (ISIZE)
        stream.read(block.crc32);
        stream.read(block.input_size);

        // 12 = magic (4) + MTIME (4) + XFL (1) + OS (1) + XLEN (2)
        if (number_of_bytes_read !is null)
            *number_of_bytes_read += 12 + cdata_size + block.crc32.sizeof + block.input_size.sizeof;

        // the slice must fit both the compressed bytes now and the
        // decompressed bytes later, hence the max
        block._buffer = buffer[0 .. max(block.input_size, cdata_size)];
        block.start_offset = start_offset;
        block.dirty = false;
    } catch (ReadException e) {
        throwBgzfException("stream error: " ~ e.msg);
    }

    return true;
}
191 
192 ///
/// Abstraction over a source of BGZF blocks — e.g. a plain stream
/// (StreamSupplier) or a stream restricted to index chunks
/// (StreamChunksSupplier).
interface BgzfBlockSupplier {
    /// Fills $(D buffer) with compressed data and points $(D block) to it.
    /// Return value is false if there is no next block.
    ///
    /// The implementation may assume that there's enough space in the buffer.
    ///
    /// $(D skip_start) and $(D skip_end) receive the number of uncompressed
    /// bytes at the beginning and the end of the block that the consumer
    /// should discard.
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end);

    /// Total compressed size of the supplied blocks in bytes.
    /// If unknown, should return 0.
    size_t totalCompressedSize() const;
}
205 
206 ///
/// Supplies BGZF blocks sequentially read from a stream.
class StreamSupplier : BgzfBlockSupplier {
    private {
        Stream _source;
        bool _can_seek;
        size_t _bytes_consumed; // running count of compressed bytes read
        size_t _total_size;     // stream size, if known
        ushort _initial_skip;   // uncompressed bytes to skip in the first block
    }

    ///
    this(Stream stream, ushort skip_start=0) {
        _source = stream;
        _can_seek = stream.seekable;
        _initial_skip = skip_start;
        if (_can_seek)
            _total_size = cast(size_t)(stream.size);
    }

    ///
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end) {
        immutable prev_offset = _bytes_consumed;

        // fillBgzfBufferFromStream accumulates the number of consumed
        // compressed bytes into _bytes_consumed via its last parameter
        immutable ok = fillBgzfBufferFromStream(_source, _can_seek,
                                                block, buffer,
                                                &_bytes_consumed);

        // a non-seekable stream cannot report its own position, so fall
        // back to our running byte counter for the block's start offset
        if (!_can_seek)
            block.start_offset = prev_offset;

        // skip bytes only in the very first supplied block
        *skip_start = _initial_skip;
        _initial_skip = 0;
        *skip_end = 0;
        return ok;
    }

    /// Stream size if available
    size_t totalCompressedSize() const {
        return _total_size;
    }
}
247 
/// Supplies only the BGZF blocks overlapping a given list of chunks
/// (virtual-offset ranges, typically obtained from a BAM index).
/// Requires a seekable stream.
class StreamChunksSupplier : BgzfBlockSupplier {
    private {
        Stream _stream;
        Chunk[] _chunks; // remaining chunks; _chunks[0] is the current one

        // Seeks the stream to the compressed offset of the current chunk.
        // Chunks whose beg.coffset equals that of _chunks[0] are collapsed
        // into one: all but the last of them are dropped, and the survivor
        // keeps the original beg so that no data at the head is lost.
        void moveToNextChunk() {
            if (_chunks.length == 0)
                return;
            size_t i = 1;
            auto beg = _chunks[0].beg;
            // find the first chunk starting in a later compressed block
            for ( ; i < _chunks.length; ++i)
                if (_chunks[i].beg.coffset > _chunks[0].beg.coffset)
                    break;
            _chunks = _chunks[i - 1 .. $];
            // restore the original beginning of the merged chunk
            _chunks[0].beg = beg;
            _stream.seekSet(cast(size_t)_chunks[0].beg.coffset);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("started processing chunk ", beg, " - ", _chunks[0].end);
            }
        }
    }

    this(Stream stream, bio.core.bgzf.chunk.Chunk[] chunks) {
        _stream = stream;
        assert(_stream.seekable);
        _chunks = chunks;
        moveToNextChunk();
    }

    ///
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end)
    {
        if (_chunks.length == 0)
            return false;

        // Usually there can't be two or more chunks overlapping a
        // single block -- in such cases they are merged during
        // indexing in most implementations.
        // If this is not the case, the algorithm should still work,
        // but it might decompress the same block several times.
        //
        // On each call of this method, one of these things happen:
        // 1) We remain in the current chunk, but read next block
        // 2) We finish dealing with the current chunk, so we move to
        //    the next one. If this was the last one, false is returned.
        //
        // moveToNextChunk moves stream pointer to chunk.beg.coffset,
        // in which case skip_start should be set to chunk.beg.uoffset

        auto result = fillBgzfBufferFromStream(_stream, true, block, buffer);
        auto offset = block.start_offset;

        if (!result)
            return false;

        if (offset == _chunks[0].beg.coffset)
            *skip_start = _chunks[0].beg.uoffset; // first block in a chunk
        else
            *skip_start = 0;

        long _skip_end; // may be equal to 65536!
        if (offset == _chunks[0].end.coffset) // last block in a chunk
            _skip_end = block.input_size - _chunks[0].end.uoffset;
        else
            _skip_end = 0;

        *skip_end = cast(ushort)_skip_end;

        // the current chunk is exhausted; advance to the next one
        if (offset >= _chunks[0].end.coffset) {
            _chunks = _chunks[1 .. $];
            moveToNextChunk();
        }

        // special case: it's not actually the last block in a chunk,
        // but rather that chunk ended on the edge of two blocks
        if (block.input_size > 0 && _skip_end == block.input_size) {
            version(extraVerbose) { import std.stdio; stderr.writeln("skip_end == input size"); }
            // the whole block would be skipped, so deliver the next one instead
            return getNextBgzfBlock(block, buffer, skip_start, skip_end);
        }

        return true;
    }

    /// Always zero (unknown)
    size_t totalCompressedSize() const {
        return 0;
    }
}
337 
338 ///
/// Uncompressed stream over a sequence of BGZF blocks.
///
/// Blocks are fetched from a BgzfBlockSupplier and decompressed
/// asynchronously on a TaskPool; reads are served from the block
/// at the front of a round buffer of in-flight decompression tasks.
class BgzfInputStream : Stream {
    private {
        BgzfBlockSupplier _supplier;
        ubyte[] _data; // backing storage for all in-flight block slots

        BgzfBlockCache _cache;

        ubyte[] _read_buffer;      // unread remainder of the current block's data
        VirtualOffset _current_vo; // virtual offset of the next byte to be read
        VirtualOffset _end_vo;     // virtual offset just past the current buffer

        size_t _compressed_size;

        // for estimating compression ratio
        size_t _compressed_read, _uncompressed_read;

        TaskPool _pool;
        // each slot must hold compressed and decompressed data,
        // hence twice the maximum block size
        enum _max_block_size = BGZF_MAX_BLOCK_SIZE * 2;

        alias Task!(decompressBgzfBlock, BgzfBlock, BgzfBlockCache)
            DecompressionTask;
        DecompressionTask[] _task_buf; // preallocated storage for the tasks

        // A block together with its trimming information and the task
        // that decompresses it.
        static struct BlockAux {
            BgzfBlock block;
            ushort skip_start; // uncompressed bytes to discard at the start
            ushort skip_end;   // uncompressed bytes to discard at the end

            DecompressionTask* task;
            alias task this;
        }

        RoundBuf!BlockAux _tasks = void; // FIFO of scheduled decompressions

        size_t _offset; // current write position in _data (slot-aligned)

        // Fetches the next block from the supplier into the current slot
        // and schedules its decompression on the pool.
        // Returns false when the supplier is exhausted or the BGZF EOF
        // block (input_size == 0) is reached.
        bool fillNextBlock() {
            ubyte* p = _data.ptr + _offset;
            BlockAux b = void;
            if (_supplier.getNextBgzfBlock(&b.block, p,
                                           &b.skip_start, &b.skip_end))
            {
                if (b.block.input_size == 0) // BGZF EOF block
                    return false;

                _compressed_read += b.block.end_offset - b.block.start_offset;
                _uncompressed_read += b.block.input_size;
                version(extraVerbose) {
                    import std.stdio;
                    stderr.writeln("[creating task] ", b.block.start_offset, " / ", b.skip_start, " / ", b.skip_end);
                }

                // NOTE(review): the scopedTask is moved byte-wise into
                // preallocated storage; this relies on Task being safe to
                // move via memcpy — confirm before touching this code
                DecompressionTask tmp = void;
                tmp = scopedTask!decompressBgzfBlock(b.block, _cache);
                auto t = _task_buf.ptr + _offset / _max_block_size;
                import core.stdc.string : memcpy;
                memcpy(t, &tmp, DecompressionTask.sizeof);
                b.task = t;
                _tasks.put(b);
                _pool.put(b.task);

                // advance to the next slot, wrapping around the ring
                _offset += _max_block_size;
                if (_offset == _data.length)
                    _offset = 0;
                return true;
            }
            return false;
        }

        // Waits for the front task to finish and exposes its block's data
        // (with skip_start/skip_end trimmed off) as the read buffer.
        void setupReadBuffer() {
            auto b = _tasks.front;
            // NOTE(review): the returned value is unused — presumably
            // decompressBgzfBlock updates b.block._buffer in place; verify
            auto decompressed_block = b.task.yieldForce();
            auto from = b.skip_start;
            auto to = b.block.input_size - b.skip_end;
            _read_buffer = b.block._buffer.ptr[from .. to];

            if (from == to) {
                assert(from == 0);
                setEOF();
            }

            _current_vo = VirtualOffset(b.block.start_offset, from);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("[setup read buffer] ", _current_vo);
            }
            if (b.skip_end > 0)
                _end_vo = VirtualOffset(b.block.start_offset, cast(ushort)to);
            else
                _end_vo = VirtualOffset(b.block.end_offset, 0);
            _tasks.popFront();
        }

        // Marks the stream as finished.
        void setEOF() {
            _current_vo = _end_vo;
            readEOF = true;
        }
    }

    /// Params:
    ///     supplier    = source of BGZF blocks
    ///     pool        = task pool used for parallel decompression
    ///     cache       = optional cache of decompressed blocks
    ///     buffer_size = if non-zero, a hint for the number of blocks
    ///                   kept in flight (buffer_size / BGZF_MAX_BLOCK_SIZE)
    this(BgzfBlockSupplier supplier,
         TaskPool pool=taskPool,
         BgzfBlockCache cache=null,
         size_t buffer_size=0)
    {
        _supplier = supplier;
        _compressed_size = _supplier.totalCompressedSize();
        _pool = pool;
        _cache = cache;

        // two tasks per worker keeps the pool busy
        size_t n_tasks = max(pool.size, 1) * 2;
        if (buffer_size > 0)
            n_tasks = max(n_tasks, buffer_size / BGZF_MAX_BLOCK_SIZE);

        _tasks = RoundBuf!BlockAux(n_tasks);
        _task_buf = uninitializedArray!(DecompressionTask[])(n_tasks);

        _data = uninitializedArray!(ubyte[])(n_tasks * _max_block_size);

        // prefill the ring with up to n_tasks blocks
        for (size_t i = 0; i < n_tasks; ++i)
            if (!fillNextBlock())
                break;

        if (!_tasks.empty) {
            setupReadBuffer();
        }
    }

    /// Virtual offset of the next byte to be read.
    VirtualOffset virtualTell() const {
        return _current_vo;
    }

    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    override size_t writeBlock(const void* buf, size_t size) {
        throw new WriteException("Stream is not writeable");
    }

    /// Copies up to $(D size) uncompressed bytes into $(D buf);
    /// returns the number of bytes actually copied (0 at EOF).
    override size_t readBlock(void* buf, size_t size) {
        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[uncompressed] reading ", size, " bytes to address ", buf);
        }
        if (_read_buffer.length == 0) {
            assert(_tasks.empty);
            setEOF();
            return 0;
        }

        auto buffer = cast(ubyte*)buf;

        auto len = min(size, _read_buffer.length);
        buffer[0 .. len] = _read_buffer[0 .. len];
        version(extraVerbose) {
            // stderr.writeln("[uncompressed] [read] range: ", _read_buffer.ptr, " - ", _read_buffer.ptr + len);
        }
        _read_buffer = _read_buffer[len .. $];
        _current_vo = VirtualOffset(cast(ulong)_current_vo + len);

        // current block exhausted: move to the next one and
        // schedule a replacement decompression task
        if (_read_buffer.length == 0) {
            _current_vo = _end_vo;
            if (!_tasks.empty) {
                setupReadBuffer();
                if (!readEOF)
                    fillNextBlock();
            }
            else
                setEOF();
        }

        return len;
    }

    /// Total compressed size as reported by the supplier (0 if unknown).
    size_t total_compressed_size() @property const {
        return _compressed_size;
    }

    /// Ratio of uncompressed to compressed bytes read so far
    /// (0.0 before any block has been read).
    float average_compression_ratio() @property const {
        if (_compressed_read == 0)
            return 0.0;
        return cast(float)_uncompressed_read / _compressed_read;
    }
}