1 /* 2 This file is part of BioD. 3 Copyright (C) 2013-2016 Artem Tarasov <lomereiter@gmail.com> 4 5 Permission is hereby granted, free of charge, to any person obtaining a 6 copy of this software and associated documentation files (the "Software"), 7 to deal in the Software without restriction, including without limitation 8 the rights to use, copy, modify, merge, publish, distribute, sublicense, 9 and/or sell copies of the Software, and to permit persons to whom the 10 Software is furnished to do so, subject to the following conditions: 11 12 The above copyright notice and this permission notice shall be included in 13 all copies or substantial portions of the Software. 14 15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 21 DEALINGS IN THE SOFTWARE. 
*/
module bio.core.bgzf.inputstream;

import bio.core.bgzf.block;
import bio.core.bgzf.virtualoffset;
import bio.core.bgzf.constants;
import bio.core.bgzf.chunk;
import bio.bam.constants;
import bio.core.utils.roundbuf;

import undead.stream;
import std.exception;
import std.conv;
import std.parallelism;
import std.array;
import std.algorithm : min, max;

/// Exception type, thrown in case of encountering corrupt BGZF blocks
class BgzfException : Exception {
    this(string msg) { super(msg); }
}

/// Reads the header of the next BGZF block from $(D stream), copies its
/// compressed payload into $(D buffer), and fills $(D *block) with the
/// block metadata (bsize, cdata_size, crc32, input_size, start_offset).
///
/// Params:
///   stream                = source stream positioned at a BGZF block boundary
///   is_seekable           = if true, header fields are skipped via seekCur
///                           and start_offset is taken from stream.position
///   block                 = receives the parsed block metadata
///   buffer                = destination for the compressed payload; the
///                           caller guarantees enough space
///   number_of_bytes_read  = if non-null, incremented by the number of
///                           compressed-stream bytes consumed (used to track
///                           offsets on non-seekable streams)
///
/// Returns: false on clean end-of-stream, true if a block was read.
/// Throws: BgzfException on a malformed block or an underlying ReadException.
bool fillBgzfBufferFromStream(Stream stream, bool is_seekable,
                              BgzfBlock* block, ubyte* buffer,
                              size_t *number_of_bytes_read=null)
{
    if (stream.eof())
        return false;

    ulong start_offset;
    void throwBgzfException(string msg) {
        throw new BgzfException("Error reading BGZF block starting from offset " ~
                                to!string(start_offset) ~ ": " ~ msg);
    }

    if (is_seekable)
        start_offset = stream.position;

    try {
        ubyte[4] bgzf_magic = void;

        // read exactly 4 magic bytes; a zero-length read here is a clean EOF
        size_t bytes_read;
        while (bytes_read < 4) {
            auto buf = bgzf_magic.ptr + bytes_read;
            auto read_ = stream.read(buf[0 .. 4 - bytes_read]);
            if (read_ == 0)
                return false;
            bytes_read += read_;
        }

        if (bgzf_magic != BGZF_MAGIC) {
            throwBgzfException("wrong BGZF magic");
        }

        ushort gzip_extra_length = void;

        if (is_seekable) {
            // skip MTIME (4 bytes), XFL (1), OS (1)
            stream.seekCur(uint.sizeof + 2 * ubyte.sizeof);
        } else {
            uint gzip_mod_time = void;
            ubyte gzip_extra_flags = void;
            ubyte gzip_os = void;
            stream.read(gzip_mod_time);
            stream.read(gzip_extra_flags);
            stream.read(gzip_os);
        }

        stream.read(gzip_extra_length);

        ushort bsize = void; // total Block SIZE minus 1
        bool found_block_size = false;

        // read extra subfields, looking for the 'BC' subfield with the block size
        size_t len = 0;
        while (len < gzip_extra_length) {
            ubyte si1 = void;   // Subfield Identifier1
            ubyte si2 = void;   // Subfield Identifier2
            ushort slen = void; // Subfield LENgth

            stream.read(si1);
            stream.read(si2);
            stream.read(slen);

            if (si1 == BAM_SI1 && si2 == BAM_SI2) {
                // found 'BC' as subfield identifier

                if (slen != 2) {
                    throwBgzfException("wrong BC subfield length: " ~
                                       to!string(slen) ~ "; expected 2");
                }

                if (found_block_size) {
                    throwBgzfException("duplicate field with block size");
                }

                // read block size
                stream.read(bsize);
                found_block_size = true;

                // skip the rest of the subfield (slen == 2, so this is a no-op)
                if (is_seekable) {
                    stream.seekCur(slen - bsize.sizeof);
                } else {
                    stream.readString(slen - bsize.sizeof);
                }
            } else {
                // this subfield has nothing to do with block size, just skip
                if (is_seekable) {
                    stream.seekCur(slen);
                } else {
                    stream.readString(slen);
                }
            }

            auto nbytes = si1.sizeof + si2.sizeof + slen.sizeof + slen;
            if (number_of_bytes_read !is null)
                *number_of_bytes_read += nbytes;
            len += nbytes;
        }

        if (len != gzip_extra_length) {
            throwBgzfException("total length of subfields in bytes (" ~
                               to!string(len) ~
                               ") is not equal to gzip_extra_length (" ~
                               to!string(gzip_extra_length) ~ ")");
        }

        if (!found_block_size) {
            throwBgzfException("block size was not found in any subfield");
        }

        // compressed data size = BSIZE - XLEN - 19 (fixed gzip overhead)
        auto cdata_size = bsize - gzip_extra_length - 19;
        // FIX: the ushort operands are promoted to int, so on a corrupt header
        // where bsize < gzip_extra_length + 19 the result is negative; it used
        // to slip past the upper-bound check and wrap via cast(ushort) below,
        // causing readExact to pull in a garbage-sized payload. Reject it here.
        if (cdata_size < 0) {
            throwBgzfException("negative compressed data size " ~
                               "(inconsistent BSIZE/XLEN fields)");
        }
        if (cdata_size > BGZF_MAX_BLOCK_SIZE) {
            throwBgzfException("compressed data size is more than " ~
                               to!string(BGZF_MAX_BLOCK_SIZE) ~
                               " bytes, which is not allowed by " ~
                               "current BAM specification");
        }

        block.bsize = bsize;
        block.cdata_size = cast(ushort)cdata_size;

        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[compressed] reading ", cdata_size, " bytes starting from ", start_offset);
        }
        stream.readExact(buffer, cdata_size);
        version(extraVerbose) {
            stderr.writeln("[ compressed] [write] range: ", buffer, " - ", buffer + cdata_size);
        }
        // trailing CRC32 and uncompressed input size of the block
        stream.read(block.crc32);
        stream.read(block.input_size);

        // 12 == magic(4) + MTIME(4) + XFL(1) + OS(1) + XLEN(2); the extra
        // subfields themselves were already accounted for inside the loop
        if (number_of_bytes_read !is null)
            *number_of_bytes_read += 12 + cdata_size + block.crc32.sizeof + block.input_size.sizeof;

        block._buffer = buffer[0 .. max(block.input_size, cdata_size)];
        block.start_offset = start_offset;
        block.dirty = false;
    } catch (ReadException e) {
        throwBgzfException("stream error: " ~ e.msg);
    }

    return true;
}

/// Source of BGZF blocks
interface BgzfBlockSupplier {
    /// Fills $(D buffer) with compressed data and points $(D block) to it.
    /// Return value is false if there is no next block.
    ///
    /// The implementation may assume that there's enough space in the buffer.
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end);

    /// Total compressed size of the supplied blocks in bytes.
    /// If unknown, should return 0.
    size_t totalCompressedSize() const;
}

/// Supplies consecutive blocks from a stream
class StreamSupplier : BgzfBlockSupplier {
    private {
        Stream _stream;
        bool _seekable;
        size_t _start_offset; // running compressed offset for non-seekable streams
        size_t _size;
        ushort _skip_start;   // uncompressed bytes to skip in the very first block
    }

    /// Params:
    ///   stream     = source of compressed BGZF data
    ///   skip_start = number of uncompressed bytes to skip at the start
    ///                of the first supplied block
    this(Stream stream, ushort skip_start=0) {
        _stream = stream;
        _seekable = _stream.seekable;
        _skip_start = skip_start;
        if (_seekable)
            _size = cast(size_t)(_stream.size);
    }

    /// See $(D BgzfBlockSupplier.getNextBgzfBlock)
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end) {
        auto curr_start_offset = _start_offset;

        // updates _start_offset (passed as number_of_bytes_read)
        auto result = fillBgzfBufferFromStream(_stream, _seekable, block, buffer,
                                               &_start_offset);
        // non-seekable streams can't use stream.position, so patch in the
        // offset we tracked ourselves
        if (!_seekable)
            block.start_offset = curr_start_offset;

        *skip_start = _skip_start;
        _skip_start = 0; // applies only to the first block
        *skip_end = 0;
        return result;
    }

    /// Stream size if available, otherwise 0
    size_t totalCompressedSize() const {
        return _size;
    }
}

/// Supplies only the blocks overlapping the given index chunks
class StreamChunksSupplier : BgzfBlockSupplier {
    private {
        Stream _stream;
        Chunk[] _chunks;

        // Seeks to the start of the frontmost chunk, first collapsing any
        // subsequent chunks that begin in the same compressed block.
        void moveToNextChunk() {
            if (_chunks.length == 0)
                return;
            size_t i = 1;
            auto beg = _chunks[0].beg;
            for ( ; i < _chunks.length; ++i)
                if (_chunks[i].beg.coffset > _chunks[0].beg.coffset)
                    break;
            _chunks = _chunks[i - 1 .. $];
            _chunks[0].beg = beg;
            _stream.seekSet(cast(size_t)_chunks[0].beg.coffset);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("started processing chunk ", beg, " - ", _chunks[0].end);
            }
        }
    }

    /// Params:
    ///   stream = seekable stream of compressed BGZF data
    ///   chunks = sorted list of virtual-offset chunks to supply blocks for
    this(Stream stream, bio.core.bgzf.chunk.Chunk[] chunks) {
        _stream = stream;
        assert(_stream.seekable);
        _chunks = chunks;
        moveToNextChunk();
    }

    /// See $(D BgzfBlockSupplier.getNextBgzfBlock)
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end)
    {
        if (_chunks.length == 0)
            return false;

        // Usually there can't be two or more chunks overlapping a
        // single block -- in such cases they are merged during
        // indexing in most implementations.
        // If this is not the case, the algorithm should still work,
        // but it might decompress the same block several times.
        //
        // On each call of this method, one of these things happen:
        // 1) We remain in the current chunk, but read next block
        // 2) We finish dealing with the current chunk, so we move to
        //    the next one. If this was the last one, false is returned.
        //
        // moveToNextChunk moves stream pointer to chunk.beg.coffset,
        // in which case skip_start should be set to chunk.beg.uoffset

        auto result = fillBgzfBufferFromStream(_stream, true, block, buffer);
        auto offset = block.start_offset;

        if (!result)
            return false;

        if (offset == _chunks[0].beg.coffset)
            *skip_start = _chunks[0].beg.uoffset; // first block in a chunk
        else
            *skip_start = 0;

        long _skip_end; // may be equal to 65536!
        if (offset == _chunks[0].end.coffset) // last block in a chunk
            _skip_end = block.input_size - _chunks[0].end.uoffset;
        else
            _skip_end = 0;

        *skip_end = cast(ushort)_skip_end;

        if (offset >= _chunks[0].end.coffset) {
            _chunks = _chunks[1 .. $];
            moveToNextChunk();
        }

        // special case: it's not actually the last block in a chunk,
        // but rather that chunk ended on the edge of two blocks
        if (block.input_size > 0 && _skip_end == block.input_size) {
            version(extraVerbose) { import std.stdio; stderr.writeln("skip_end == input size"); }
            return getNextBgzfBlock(block, buffer, skip_start, skip_end);
        }

        return true;
    }

    /// Always zero (unknown)
    size_t totalCompressedSize() const {
        return 0;
    }
}

/// Presents a sequence of BGZF blocks as an uncompressed Stream,
/// decompressing blocks in parallel via a TaskPool.
class BgzfInputStream : Stream {
    private {
        BgzfBlockSupplier _supplier;
        ubyte[] _data; // ring of compressed-block buffers, one slot per task

        BgzfBlockCache _cache;

        ubyte[] _read_buffer;      // current decompressed window
        VirtualOffset _current_vo; // virtual offset of the next byte to read
        VirtualOffset _end_vo;     // virtual offset just past the current window

        size_t _compressed_size;

        // for estimating compression ratio
        size_t _compressed_read, _uncompressed_read;

        TaskPool _pool;
        enum _max_block_size = BGZF_MAX_BLOCK_SIZE * 2;

        alias Task!(decompressBgzfBlock, BgzfBlock, BgzfBlockCache)
            DecompressionTask;
        DecompressionTask[] _task_buf;

        static struct BlockAux {
            BgzfBlock block;
            ushort skip_start;
            ushort skip_end;

            DecompressionTask* task;
            alias task this;
        }

        RoundBuf!BlockAux _tasks = void;

        size_t _offset; // current write position within _data

        // Pulls the next block from the supplier and schedules its
        // decompression on the pool; returns false on EOF or the
        // zero-length BGZF terminator block.
        bool fillNextBlock() {
            ubyte* p = _data.ptr + _offset;
            BlockAux b = void;
            if (_supplier.getNextBgzfBlock(&b.block, p,
                                           &b.skip_start, &b.skip_end))
            {
                if (b.block.input_size == 0) // BGZF EOF block
                    return false;

                _compressed_read += b.block.end_offset - b.block.start_offset;
                _uncompressed_read += b.block.input_size;
                version(extraVerbose) {
                    import std.stdio;
                    stderr.writeln("[creating task] ", b.block.start_offset, " / ", b.skip_start, " / ", b.skip_end);
                }

                // scopedTask yields a stack-allocated Task; move it into the
                // persistent _task_buf slot paired with this buffer slot
                DecompressionTask tmp = void;
                tmp = scopedTask!decompressBgzfBlock(b.block, _cache);
                auto t = _task_buf.ptr + _offset / _max_block_size;
                import core.stdc.string : memcpy;
                memcpy(t, &tmp, DecompressionTask.sizeof);
                b.task = t;
                _tasks.put(b);
                _pool.put(b.task);

                _offset += _max_block_size;
                if (_offset == _data.length)
                    _offset = 0; // wrap around the ring
                return true;
            }
            return false;
        }

        // Waits for the frontmost task and exposes its decompressed bytes
        // (minus skip_start/skip_end trimming) as the read window.
        void setupReadBuffer() {
            auto b = _tasks.front;
            auto decompressed_block = b.task.yieldForce();
            auto from = b.skip_start;
            auto to = b.block.input_size - b.skip_end;
            _read_buffer = b.block._buffer.ptr[from .. to];

            if (from == to) {
                assert(from == 0);
                setEOF();
            }

            _current_vo = VirtualOffset(b.block.start_offset, from);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("[setup read buffer] ", _current_vo);
            }
            if (b.skip_end > 0)
                _end_vo = VirtualOffset(b.block.start_offset, cast(ushort)to);
            else
                _end_vo = VirtualOffset(b.block.end_offset, 0);
            _tasks.popFront();
        }

        void setEOF() {
            _current_vo = _end_vo;
            readEOF = true;
        }
    }

    /// Params:
    ///   supplier    = source of BGZF blocks
    ///   pool        = task pool used for parallel decompression
    ///   cache       = optional cache of decompressed blocks
    ///   buffer_size = if non-zero, lower bound on the total buffer size
    ///                 (rounded to a number of block slots)
    this(BgzfBlockSupplier supplier,
         TaskPool pool=taskPool,
         BgzfBlockCache cache=null,
         size_t buffer_size=0)
    {
        _supplier = supplier;
        _compressed_size = _supplier.totalCompressedSize();
        _pool = pool;
        _cache = cache;

        size_t n_tasks = max(pool.size, 1) * 2;
        if (buffer_size > 0)
            n_tasks = max(n_tasks, buffer_size / BGZF_MAX_BLOCK_SIZE);

        _tasks = RoundBuf!BlockAux(n_tasks);
        _task_buf = uninitializedArray!(DecompressionTask[])(n_tasks);

        _data = uninitializedArray!(ubyte[])(n_tasks * _max_block_size);

        // prefetch up to n_tasks blocks so the pool has work queued
        for (size_t i = 0; i < n_tasks; ++i)
            if (!fillNextBlock())
                break;

        if (!_tasks.empty) {
            setupReadBuffer();
        }
    }

    /// Virtual offset of the next byte to be read
    VirtualOffset virtualTell() const {
        return _current_vo;
    }

    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    override size_t writeBlock(const void* buf, size_t size) {
        throw new WriteException("Stream is not writeable");
    }

    override size_t readBlock(void* buf, size_t size) {
        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[uncompressed] reading ", size, " bytes to address ", buf);
        }
        if (_read_buffer.length == 0) {
            assert(_tasks.empty);
            setEOF();
            return 0;
        }

        auto buffer = cast(ubyte*)buf;

        auto len = min(size, _read_buffer.length);
        buffer[0 .. len] = _read_buffer[0 .. len];
        version(extraVerbose) {
            // stderr.writeln("[uncompressed] [read] range: ", _read_buffer.ptr, " - ", _read_buffer.ptr + len);
        }
        _read_buffer = _read_buffer[len .. $];
        _current_vo = VirtualOffset(cast(ulong)_current_vo + len);

        if (_read_buffer.length == 0) {
            _current_vo = _end_vo;
            if (!_tasks.empty) {
                // advance to the next decompressed block and refill the
                // freed buffer slot with a fresh task
                setupReadBuffer();
                if (!readEOF)
                    fillNextBlock();
            }
            else
                setEOF();
        }

        return len;
    }

    /// Total compressed size of the underlying blocks (0 if unknown)
    size_t total_compressed_size() @property const {
        return _compressed_size;
    }

    /// Ratio of uncompressed to compressed bytes read so far (0.0 initially)
    float average_compression_ratio() @property const {
        if (_compressed_read == 0)
            return 0.0;
        return cast(float)_uncompressed_read / _compressed_read;
    }
}