1 /*
2     This file is part of BioD.
3     Copyright (C) 2013-2016    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 module bio.core.bgzf.inputstream;
25 
26 import bio.core.bgzf.block;
27 import bio.core.bgzf.virtualoffset;
28 import bio.core.bgzf.constants;
29 import bio.core.bgzf.chunk;
30 import bio.std.hts.bam.constants;
31 import bio.core.utils.roundbuf;
32 
33 import contrib.undead.stream;
34 import std.exception;
35 import std.conv;
36 import std.parallelism;
37 import std.array;
38 import std.algorithm : min, max;
39 
40 /// Exception type, thrown in case of encountering corrupt BGZF blocks
/// Exception type, thrown in case of encountering corrupt BGZF blocks
class BgzfException : Exception {
    /// Constructs the exception with the given message.
    ///
    /// $(D file) and $(D line) default to the throw site (standard D
    /// exception idiom), so stack-less environments still report where
    /// the corrupt block was detected. Existing single-argument callers
    /// are unaffected.
    this(string msg, string file = __FILE__, size_t line = __LINE__) {
        super(msg, file, line);
    }
}
44 
45 /*
46 
47 Called by
48 
49 randomaccessmanager.d: fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
50 inputstream.d: auto result = fillBgzfBufferFromStream(_stream, _seekable, block, buffer,
51 
52 */
53 
/// Reads one BGZF block from $(D stream): validates the gzip/BGZF header,
/// locates the 'BC' extra subfield carrying the block size, and copies the
/// compressed payload into $(D buffer), filling $(D block)'s metadata
/// (bsize, cdata_size, crc32, input_size, _buffer, start_offset).
///
/// Params:
///   stream               = source of compressed data
///   is_seekable          = if true, header fields that are merely skipped
///                          are passed over with seekCur instead of reads
///   block                = receives the parsed block metadata
///   buffer               = destination for the compressed payload; the
///                          caller guarantees sufficient space
///   number_of_bytes_read = if non-null, incremented by the number of
///                          compressed bytes consumed (header subfields +
///                          fixed header + payload + footer)
///
/// Returns: false on EOF (before or at the magic bytes), true on success.
/// Throws: BgzfException on malformed block data, wrapping ReadException
///         from the underlying stream as well.
bool fillBgzfBufferFromStream(Stream stream, bool is_seekable,
                              BgzfBlock* block, ubyte* buffer,
                              size_t *number_of_bytes_read=null)
{
    if (stream.eof())
        return false;

    // Captured by the nested helper so every error message carries the
    // block's compressed offset (stays 0 for non-seekable streams).
    ulong start_offset;
    void throwBgzfException(string msg) {
        throw new BgzfException("Error reading BGZF block starting from offset " ~
                                to!string(start_offset) ~ ": " ~ msg);
    }

    if (is_seekable)
        start_offset = stream.position;

    try {
        ubyte[4] bgzf_magic = void; // uninitialized on purpose; filled below

        // Read exactly 4 magic bytes, tolerating short reads; a clean EOF
        // here (0 bytes) means "no more blocks", not corruption.
        size_t bytes_read;
        while (bytes_read < 4) {
            auto buf = bgzf_magic.ptr + bytes_read;
            auto read_ = stream.read(buf[0 .. 4 - bytes_read]);
            if (read_ == 0)
                return false;
            bytes_read += read_;
        }

        if (bgzf_magic != BGZF_MAGIC) {
            throwBgzfException("wrong BGZF magic");
        }

        ushort gzip_extra_length = void; // XLEN field of the gzip header

        // Skip MTIME (4) + XFL (1) + OS (1): seek when possible, otherwise
        // consume them with dummy reads.
        if (is_seekable) {
            stream.seekCur(uint.sizeof + 2 * ubyte.sizeof);
        } else {
            uint gzip_mod_time = void;
            ubyte gzip_extra_flags = void;
            ubyte gzip_os = void;
            stream.read(gzip_mod_time);
            stream.read(gzip_extra_flags);
            stream.read(gzip_os);
        }

        stream.read(gzip_extra_length);

        ushort bsize = void; // total Block SIZE minus 1
        bool found_block_size = false;

        // read extra subfields
        size_t len = 0;
        while (len < gzip_extra_length) {
            ubyte si1 = void;    // Subfield Identifier1
            ubyte si2 = void;    // Subfield Identifier2
            ushort slen = void;  // Subfield LENgth

            stream.read(si1);
            stream.read(si2);
            stream.read(slen);

            if (si1 == BAM_SI1 && si2 == BAM_SI2) {
                // found 'BC' as subfield identifier

                if (slen != 2) {
                    throwBgzfException("wrong BC subfield length: " ~
                                       to!string(slen) ~ "; expected 2");
                }

                if (found_block_size) {
                    throwBgzfException("duplicate field with block size");
                }

                // read block size
                stream.read(bsize);
                found_block_size = true;

                // skip the rest (slen - 2 bytes; zero for a well-formed
                // BC subfield, but tolerated here)
                if (is_seekable) {
                    stream.seekCur(slen - bsize.sizeof);
                } else {
                    stream.readString(slen - bsize.sizeof);
                }
            } else {
                // this subfield has nothing to do with block size, just skip
                if (is_seekable) {
                    stream.seekCur(slen);
                } else {
                    stream.readString(slen);
                }
            }

            // 1 + 1 + 2 header bytes plus the subfield payload
            auto nbytes = si1.sizeof + si2.sizeof + slen.sizeof + slen;
            if (number_of_bytes_read !is null)
                *number_of_bytes_read += nbytes;
            len += nbytes;
        }

        if (len != gzip_extra_length) {
            throwBgzfException("total length of subfields in bytes (" ~
                               to!string(len) ~
                               ") is not equal to gzip_extra_length (" ~
                               to!string(gzip_extra_length) ~ ")");
        }

        if (!found_block_size) {
            throwBgzfException("block size was not found in any subfield");
        }

        // read compressed data
        // (bsize + 1) = 12-byte fixed header + XLEN extra bytes
        //             + compressed payload + 8-byte footer,
        // hence payload = bsize - gzip_extra_length - 19.
        // NOTE(review): a corrupt bsize smaller than xlen+19 makes this
        // negative and slips past the check below — confirm upstream
        // validation before relying on it.
        auto cdata_size = bsize - gzip_extra_length - 19;
        if (cdata_size > BGZF_MAX_BLOCK_SIZE) {
            throwBgzfException("compressed data size is more than " ~
                               to!string(BGZF_MAX_BLOCK_SIZE) ~
                               " bytes, which is not allowed by " ~
                               "current BAM specification");
        }

        block.bsize = bsize;
        block.cdata_size = cast(ushort)cdata_size;

        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[compressed] reading ", cdata_size, " bytes starting from ", start_offset);
        }
        stream.readExact(buffer, cdata_size);
        version(extraVerbose) {
            stderr.writeln("[  compressed] [write] range: ", buffer, " - ", buffer + cdata_size);
        }
        // footer: CRC32 of uncompressed data + ISIZE (uncompressed length)
        // version(extraVerbose) {stderr.writeln("[compressed] reading block crc32 and input size...");}
        stream.read(block.crc32);
        stream.read(block.input_size);

        // 12 = magic(4) + mtime(4) + xfl(1) + os(1) + xlen(2); the subfield
        // bytes themselves were already accumulated in the loop above.
        if (number_of_bytes_read !is null)
            *number_of_bytes_read += 12 + cdata_size + block.crc32.sizeof + block.input_size.sizeof;

        // version(extraVerbose) {stderr.writeln("[compressed] read block input size: ", block.input_size);}
        // buffer must be large enough to hold the data both before and
        // after in-place decompression
        block._buffer = buffer[0 .. max(block.input_size, cdata_size)];
        block.start_offset = start_offset;
        block.dirty = false;
    } catch (ReadException e) {
        throwBgzfException("stream error: " ~ e.msg);
    }

    return true;
}
200 
201 ///
/// Abstraction over a source of BGZF blocks (plain stream, chunk list, ...).
interface BgzfBlockSupplier {
    /// Fills $(D buffer) with compressed data and points $(D block) to it.
    /// Return value is false if there is no next block.
    ///
    /// The implementation may assume that there's enough space in the buffer.
    ///
    /// $(D skip_start) and $(D skip_end) receive the number of uncompressed
    /// bytes the consumer should discard at the beginning and end of the
    /// block (non-zero when only part of the block lies in the requested
    /// range, as with chunk-based suppliers).
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end);

    /// Total compressed size of the supplied blocks in bytes.
    /// If unknown, should return 0.
    size_t totalCompressedSize() const;
}
214 
215 ///
/// Supplies consecutive BGZF blocks straight from a stream.
class StreamSupplier : BgzfBlockSupplier {
    private {
        Stream _stream;
        bool _seekable;
        size_t _bytes_consumed;  // running compressed offset, maintained for
                                 // non-seekable streams
        size_t _stream_size;     // total size when the stream can report it
        ushort _pending_skip;    // uncompressed bytes to skip in the first block
    }

    /// Params:
    ///   stream     = source of BGZF-compressed data
    ///   skip_start = uncompressed bytes to skip at the start of the
    ///                very first block
    this(Stream stream, ushort skip_start=0) {
        _stream = stream;
        _pending_skip = skip_start;
        _seekable = _stream.seekable;
        _stream_size = _seekable ? cast(size_t)(_stream.size) : 0;
    }

    /// Reads the next block from the stream. For non-seekable streams the
    /// block's start offset is reconstructed from the byte counter that
    /// fillBgzfBufferFromStream keeps advancing.
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end) {
        immutable offset_before = _bytes_consumed;

        // advances _bytes_consumed by the compressed size of the block
        immutable got_block = fillBgzfBufferFromStream(_stream, _seekable,
                                                       block, buffer,
                                                       &_bytes_consumed);
        if (!_seekable)
            block.start_offset = offset_before;

        *skip_end = 0;
        // skip is consumed exactly once, by the first block
        *skip_start = _pending_skip;
        _pending_skip = 0;
        return got_block;
    }

    /// Stream size if available
    size_t totalCompressedSize() const {
        return _stream_size;
    }
}
256 
/// Supplies only the BGZF blocks covered by a list of (virtual-offset)
/// chunks, seeking the stream between chunks.
class StreamChunksSupplier : BgzfBlockSupplier {
    private {
        Stream _stream;
        Chunk[] _chunks;

        // Seeks to the first remaining chunk. Chunks that begin inside the
        // same compressed block are collapsed into one (keeping the earliest
        // begin offset) so that block isn't decompressed multiple times.
        void moveToNextChunk() {
            if (_chunks.length == 0)
                return;
            size_t i = 1;
            auto beg = _chunks[0].beg;
            for ( ; i < _chunks.length; ++i)
                if (_chunks[i].beg.coffset > _chunks[0].beg.coffset)
                    break;
            _chunks = _chunks[i - 1 .. $];
            _chunks[0].beg = beg;
            _stream.seekSet(cast(size_t)_chunks[0].beg.coffset);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("started processing chunk ", beg, " - ", _chunks[0].end);
            }
        }
    }

    /// Params:
    ///   stream = seekable stream of BGZF-compressed data
    ///   chunks = chunks to read, expected sorted by begin offset
    this(Stream stream, bio.core.bgzf.chunk.Chunk[] chunks) {
        _stream = stream;
        assert(_stream.seekable);
        _chunks = chunks;
        moveToNextChunk();
    }

    ///
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end)
    {
        if (_chunks.length == 0)
            return false;

        // Usually there can't be two or more chunks overlapping a
        // single block -- in such cases they are merged during
        // indexing in most implementations.
        // If this is not the case, the algorithm should still work,
        // but it might decompress the same block several times.
        //
        // On each call of this method, one of these things happen:
        // 1) We remain in the current chunk, but read next block
        // 2) We finish dealing with the current chunk, so we move to
        //    the next one. If this was the last one, false is returned.
        //
        // moveToNextChunk moves stream pointer to chunk.beg.coffset,
        // in which case skip_start should be set to chunk.beg.uoffset

        auto result = fillBgzfBufferFromStream(_stream, true, block, buffer);

        // FIX: bail out before touching block fields -- on a failed fill
        // (EOF) the block was never populated, so start_offset is undefined.
        if (!result)
            return false;

        auto offset = block.start_offset;

        if (offset == _chunks[0].beg.coffset)
            *skip_start = _chunks[0].beg.uoffset; // first block in a chunk
        else
            *skip_start = 0;

        long _skip_end; // may be equal to 65536!
        if (offset == _chunks[0].end.coffset) // last block in a chunk
            _skip_end = block.input_size - _chunks[0].end.uoffset;
        else
            _skip_end = 0;

        *skip_end = cast(ushort)_skip_end;

        if (offset >= _chunks[0].end.coffset) {
            _chunks = _chunks[1 .. $];
            moveToNextChunk();
        }

        // special case: it's not actually the last block in a chunk,
        // but rather that chunk ended on the edge of two blocks
        if (block.input_size > 0 && _skip_end == block.input_size) {
            version(extraVerbose) { import std.stdio; stderr.writeln("skip_end == input size"); }
            return getNextBgzfBlock(block, buffer, skip_start, skip_end);
        }

        return true;
    }

    /// Always zero (unknown)
    size_t totalCompressedSize() const {
        return 0;
    }
}
346 
347 ///
348 // Provided an uncompressed stream block by class
class BgzfInputStream : Stream {
    private {
        // Source of compressed blocks (plain stream or chunk-restricted).
        BgzfBlockSupplier _supplier;
        // Backing storage for block data, carved into fixed-size slots of
        // _max_block_size bytes, one per queued decompression task.
        ubyte[] _data;

        // BgzfBlockCache _cache;

        // Unconsumed decompressed bytes of the current block.
        ubyte[] _read_buffer;
        VirtualOffset _current_vo;
        VirtualOffset _end_vo;

        // As reported by the supplier; 0 when unknown.
        size_t _compressed_size;

        // for estimating compression ratio
        size_t _compressed_read, _uncompressed_read;

        TaskPool _pool;
        // NOTE(review): twice the spec maximum per slot, presumably headroom
        // for in-place decompression (see block._buffer sizing) — confirm.
        enum _max_block_size = BGZF_MAX_BLOCK_SIZE * 2;

        alias Task!(decompressBgzfBlock, BgzfBlock) DecompressionTask;
        // DecompressionTask[] _task_buf;

        // static here means that BlockAux has no access to
        // its surrounding scope https://dlang.org/spec/struct.html
        static struct BlockAux {
          BgzfBlock block;
          ushort skip_start; // uncompressed bytes to drop at block start
          ushort skip_end;   // uncompressed bytes to drop at block end

          DecompressionTask* task;
          // alias task this; // https://dlang.org/spec/class.html#AliasThis
        }

        // FIFO of in-flight decompression tasks; '= void' skips default
        // initialization, the constructor assigns it before first use.
        RoundBuf!BlockAux _tasks = void;

        // Byte offset of the next free slot within _data.
        size_t _offset;

        bool fillNextBlock() {
          // Sets up a decompression task and pushes it on the roundbuf _tasks
            ubyte* p = _data.ptr + _offset;
            BlockAux b = void;
            if (_supplier.getNextBgzfBlock(&b.block, p,
                                           &b.skip_start, &b.skip_end))
            {
                if (b.block.input_size == 0) // BGZF EOF block
                    return false;

                _compressed_read += b.block.end_offset - b.block.start_offset;
                _uncompressed_read += b.block.input_size;
                version(extraVerbose) {
                    import std.stdio;
                    stderr.writeln("[creating task] ", b.block.start_offset, " / ", b.skip_start, " / ", b.skip_end);
                }

                /*
                DecompressionTask tmp = void;
                tmp = scopedTask!decompressBgzfBlock(b.block);
                auto t = _task_buf.ptr + _offset / _max_block_size;
                import core.stdc.string : memcpy;
                memcpy(t, &tmp, DecompressionTask.sizeof);
                b.task = t;
                _tasks.put(b);
                _pool.put(b.task);
                */
                // tmp = scopedTask!decompressBgzfBlock(b.block);
                auto task = task!decompressBgzfBlock(b.block);
                b.task = task;
                _tasks.put(b); // _tasks is roundbuf
                _pool.put(b.task); // _pool is thread pool
                // advance to the next slot, wrapping around at the end;
                // wrap is safe because the roundbuf holds at most as many
                // tasks as there are slots
                _offset += _max_block_size;
                if (_offset == _data.length)
                    _offset = 0;
                return true;
            }
            return false;
        }

        // Waits for the oldest task to finish and exposes its decompressed
        // bytes (minus skip_start/skip_end trimming) as _read_buffer.
        void setupReadBuffer() {
            auto b = _tasks.front;
            // NOTE(review): the task result is unused; decompressBgzfBlock
            // presumably decompresses into block._buffer in place, which is
            // what the slice below reads — confirm against block.d.
            auto decompressed_block = b.task.yieldForce();
            auto from = b.skip_start;
            auto to = b.block.input_size - b.skip_end;
            _read_buffer = b.block._buffer.ptr[from .. to];

            if (from == to) {
                assert(from == 0);
                setEOF();
            }

            _current_vo = VirtualOffset(b.block.start_offset, from);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("[setup read buffer] ", _current_vo);
            }
            if (b.skip_end > 0)
                _end_vo = VirtualOffset(b.block.start_offset, cast(ushort)to);
            else
                _end_vo = VirtualOffset(b.block.end_offset, 0);
            _tasks.popFront();
        }

        void setEOF() {
            _current_vo = _end_vo;
            readEOF = true; // flag inherited from contrib.undead.stream.Stream
        }
    }

    /// Params:
    ///   supplier    = source of compressed blocks
    ///   pool        = task pool used for parallel decompression
    ///   buffer_size = if non-zero, lower bound on total buffer size in
    ///                 bytes (rounded to a whole number of task slots)
    this(BgzfBlockSupplier supplier,
         TaskPool pool=taskPool,
         // BgzfBlockCache cache=null,
         size_t buffer_size=0)
    {
        _supplier = supplier;
        _compressed_size = _supplier.totalCompressedSize();
        _pool = pool;
        // _cache = cache;

        // The roundbuf size (n_tasks) should be at least
        // the number of threads
        size_t n_tasks = max(pool.size, 1) * 2;
        if (buffer_size > 0)
            n_tasks = max(n_tasks, buffer_size / BGZF_MAX_BLOCK_SIZE);
        // n_tasks is 13 on my machine
        _tasks = RoundBuf!BlockAux(n_tasks);
        // _task_buf = uninitializedArray!(DecompressionTask[])(n_tasks);

        _data = uninitializedArray!(ubyte[])(n_tasks * _max_block_size);

        // prime the pipeline with up to n_tasks blocks
        for (size_t i = 0; i < n_tasks; ++i)
            if (!fillNextBlock())
                break;

        if (!_tasks.empty) {
            setupReadBuffer();
        }
    }

    /// Virtual offset (coffset << 16 | uoffset) of the read position.
    VirtualOffset virtualTell() const {
        return _current_vo;
    }

    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    override size_t writeBlock(const void* buf, size_t size) {
        throw new WriteException("Stream is not writeable");
    }

    /// Copies up to $(D size) decompressed bytes into $(D buf); returns the
    /// number copied (0 at EOF). Refills from the task queue as blocks are
    /// exhausted, keeping the pipeline topped up one block at a time.
    override size_t readBlock(void* buf, size_t size) {
        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[uncompressed] reading ", size, " bytes to address ", buf);
        }
        if (_read_buffer.length == 0) {
            assert(_tasks.empty);
            setEOF();
            return 0;
        }

        auto buffer = cast(ubyte*)buf;

        auto len = min(size, _read_buffer.length);
        buffer[0 .. len] = _read_buffer[0 .. len];
        version(extraVerbose) {
            // stderr.writeln("[uncompressed] [read] range: ", _read_buffer.ptr, " - ", _read_buffer.ptr + len);
        }
        _read_buffer = _read_buffer[len .. $];
        // advancing the raw ulong is valid while we stay inside one block
        _current_vo = VirtualOffset(cast(ulong)_current_vo + len);

        if (_read_buffer.length == 0) {
            _current_vo = _end_vo;
            if (!_tasks.empty) {
                setupReadBuffer();
                if (!readEOF)
                    fillNextBlock(); // replace the slot just freed
            }
            else
                setEOF();
        }

        return len;
    }

    /// Total compressed size reported by the supplier (0 if unknown).
    size_t total_compressed_size() @property const {
        return _compressed_size;
    }

    /// Ratio of uncompressed to compressed bytes read so far; 0 before
    /// any data has been read.
    float average_compression_ratio() @property const {
        if (_compressed_read == 0)
            return 0.0;
        return cast(float)_uncompressed_read / _compressed_read;
    }
}