/*
    This file is part of BioD.
    Copyright (C) 2013-2016    Artem Tarasov <lomereiter@gmail.com>

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.
*/
module bio.core.bgzf.inputstream;

import bio.core.bgzf.block;
import bio.core.bgzf.virtualoffset;
import bio.core.bgzf.constants;
import bio.core.bgzf.chunk;
import bio.std.hts.bam.constants;
import bio.core.utils.roundbuf;

import contrib.undead.stream;
import std.exception;
import std.conv;
import std.parallelism;
import std.array;
import std.algorithm : min, max;

/// Exception type, thrown in case of encountering corrupt BGZF blocks
class BgzfException : Exception {
    this(string msg) { super(msg); }
}

/*

   Called by

   randomaccessmanager.d: fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
   inputstream.d:         auto result = fillBgzfBufferFromStream(_stream, _seekable, block, buffer,

*/

/**
 * Reads one BGZF block header + compressed payload from $(D stream) into
 * $(D buffer) and fills in $(D block)'s metadata fields.
 *
 * Params:
 *   stream      = source of raw BGZF data
 *   is_seekable = when true, header fields that are only skipped over are
 *                 bypassed with seekCur instead of being read and discarded
 *   block       = receives bsize, cdata_size, crc32, input_size, _buffer,
 *                 start_offset (start_offset only when is_seekable)
 *   buffer      = destination for the compressed payload; the caller
 *                 guarantees it can hold BGZF_MAX_BLOCK_SIZE bytes
 *   number_of_bytes_read = optional accumulator; incremented by the number
 *                 of bytes consumed past the 4-byte magic
 *
 * Returns: false on clean EOF (before or at the magic), true on success.
 * Throws: BgzfException on malformed block, wrong magic, or stream errors.
 */
bool fillBgzfBufferFromStream(Stream stream, bool is_seekable,
                              BgzfBlock* block, ubyte* buffer,
                              size_t *number_of_bytes_read=null)
{
    if (stream.eof())
        return false;

    // Offset of the block start; stays 0 for non-seekable streams, in which
    // case the caller is expected to patch block.start_offset itself.
    ulong start_offset;
    void throwBgzfException(string msg) {
        throw new BgzfException("Error reading BGZF block starting from offset " ~
                                to!string(start_offset) ~ ": " ~ msg);
    }

    if (is_seekable)
        start_offset = stream.position;

    try {
        ubyte[4] bgzf_magic = void;

        // The magic may arrive in several short reads; zero bytes read
        // means EOF, which is not an error at a block boundary.
        size_t bytes_read;
        while (bytes_read < 4) {
            auto buf = bgzf_magic.ptr + bytes_read;
            auto read_ = stream.read(buf[0 .. 4 - bytes_read]);
            if (read_ == 0)
                return false;
            bytes_read += read_;
        }

        if (bgzf_magic != BGZF_MAGIC) {
            throwBgzfException("wrong BGZF magic");
        }

        ushort gzip_extra_length = void;

        if (is_seekable) {
            // Skip MTIME (uint) + XFL (ubyte) + OS (ubyte) of the gzip header.
            stream.seekCur(uint.sizeof + 2 * ubyte.sizeof);
        } else {
            // Non-seekable stream: must actually consume those bytes.
            uint gzip_mod_time = void;
            ubyte gzip_extra_flags = void;
            ubyte gzip_os = void;
            stream.read(gzip_mod_time);
            stream.read(gzip_extra_flags);
            stream.read(gzip_os);
        }

        stream.read(gzip_extra_length);

        ushort bsize = void; // total Block SIZE minus 1
        bool found_block_size = false;

        // read extra subfields
        size_t len = 0;
        while (len < gzip_extra_length) {
            ubyte si1 = void;    // Subfield Identifier1
            ubyte si2 = void;    // Subfield Identifier2
            ushort slen = void;  // Subfield LENgth

            stream.read(si1);
            stream.read(si2);
            stream.read(slen);

            if (si1 == BAM_SI1 && si2 == BAM_SI2) {
                // found 'BC' as subfield identifier

                if (slen != 2) {
                    throwBgzfException("wrong BC subfield length: " ~
                                       to!string(slen) ~ "; expected 2");
                }

                if (found_block_size) {
                    throwBgzfException("duplicate field with block size");
                }

                // read block size
                stream.read(bsize);
                found_block_size = true;

                // skip the rest
                if (is_seekable) {
                    stream.seekCur(slen - bsize.sizeof);
                } else {
                    stream.readString(slen - bsize.sizeof);
                }
            } else {
                // this subfield has nothing to do with block size, just skip
                if (is_seekable) {
                    stream.seekCur(slen);
                } else {
                    stream.readString(slen);
                }
            }

            auto nbytes = si1.sizeof + si2.sizeof + slen.sizeof + slen;
            if (number_of_bytes_read !is null)
                *number_of_bytes_read += nbytes;
            len += nbytes;
        }

        if (len != gzip_extra_length) {
            throwBgzfException("total length of subfields in bytes (" ~
                               to!string(len) ~
                               ") is not equal to gzip_extra_length (" ~
                               to!string(gzip_extra_length) ~ ")");
        }

        if (!found_block_size) {
            throwBgzfException("block size was not found in any subfield");
        }

        // read compressed data
        // 19 = fixed overhead: 12-byte gzip header + 2-byte XLEN +
        //      CRC32 (4) + ISIZE (4) - 1 (bsize stores total size minus one).
        auto cdata_size = bsize - gzip_extra_length - 19;
        if (cdata_size > BGZF_MAX_BLOCK_SIZE) {
            throwBgzfException("compressed data size is more than " ~
                               to!string(BGZF_MAX_BLOCK_SIZE) ~
                               " bytes, which is not allowed by " ~
                               "current BAM specification");
        }

        block.bsize = bsize;
        block.cdata_size = cast(ushort)cdata_size;

        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[compressed] reading ", cdata_size, " bytes starting from ", start_offset);
        }
        stream.readExact(buffer, cdata_size);
        version(extraVerbose) {
            stderr.writeln("[ compressed] [write] range: ", buffer, " - ", buffer + cdata_size);
        }
        // version(extraVerbose) {stderr.writeln("[compressed] reading block crc32 and input size...");}
        stream.read(block.crc32);
        stream.read(block.input_size);

        // 12 = magic already counted separately? No: 4 (MTIME) + 2 (XFL, OS)
        //      + 2 (XLEN) + 4 (magic) bytes consumed before the subfields.
        if (number_of_bytes_read !is null)
            *number_of_bytes_read += 12 + cdata_size + block.crc32.sizeof + block.input_size.sizeof;

        // version(extraVerbose) {stderr.writeln("[compressed] read block input size: ", block.input_size);}
        // The buffer is reused for decompression in place, so it must span
        // whichever of the two sizes is larger.
        block._buffer = buffer[0 .. max(block.input_size, cdata_size)];
        block.start_offset = start_offset;
        block.dirty = false;
    } catch (ReadException e) {
        throwBgzfException("stream error: " ~ e.msg);
    }

    return true;
}

///
interface BgzfBlockSupplier {
    /// Fills $(D buffer) with compressed data and points $(D block) to it.
    /// Return value is false if there is no next block.
    ///
    /// The implementation may assume that there's enough space in the buffer.
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end);

    /// Total compressed size of the supplied blocks in bytes.
    /// If unknown, should return 0.
    size_t totalCompressedSize() const;
}

/// Supplies consecutive BGZF blocks from a single (optionally seekable) stream.
class StreamSupplier : BgzfBlockSupplier {
    private {
        Stream _stream;
        bool _seekable;
        size_t _start_offset;   // running compressed offset (non-seekable case)
        size_t _size;           // total stream size, 0 if unknown
        ushort _skip_start;     // uncompressed bytes to skip in the first block only
    }

    ///
    this(Stream stream, ushort skip_start=0) {
        _stream = stream;
        _seekable = _stream.seekable;
        _skip_start = skip_start;
        if (_seekable)
            _size = cast(size_t)(_stream.size);
    }

    ///
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end) {
        auto curr_start_offset = _start_offset;

        // updates _start_offset
        auto result = fillBgzfBufferFromStream(_stream, _seekable, block, buffer,
                                               &_start_offset);
        // fillBgzfBufferFromStream only sets start_offset for seekable
        // streams; for pipes we track it ourselves.
        if (!_seekable)
            block.start_offset = curr_start_offset;

        *skip_start = _skip_start;
        _skip_start = 0;    // only the very first supplied block is trimmed
        *skip_end = 0;
        return result;
    }

    /// Stream size if available
    size_t totalCompressedSize() const {
        return _size;
    }
}

/// Supplies only the BGZF blocks overlapping a given list of chunks
/// (virtual-offset intervals), e.g. those coming from a BAI index query.
class StreamChunksSupplier : BgzfBlockSupplier {
    private {
        Stream _stream;
        Chunk[] _chunks;

        // Merges leading chunks that start in the same compressed block
        // (keeping the earliest beg), then seeks to the chunk start.
        void moveToNextChunk() {
            if (_chunks.length == 0)
                return;
            size_t i = 1;
            auto beg = _chunks[0].beg;
            for ( ; i < _chunks.length; ++i)
                if (_chunks[i].beg.coffset > _chunks[0].beg.coffset)
                    break;
            _chunks = _chunks[i - 1 .. $];
            _chunks[0].beg = beg;
            _stream.seekSet(cast(size_t)_chunks[0].beg.coffset);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("started processing chunk ", beg, " - ", _chunks[0].end);
            }
        }
    }

    this(Stream stream, bio.core.bgzf.chunk.Chunk[] chunks) {
        _stream = stream;
        assert(_stream.seekable);
        _chunks = chunks;
        moveToNextChunk();
    }

    ///
    bool getNextBgzfBlock(BgzfBlock* block, ubyte* buffer,
                          ushort* skip_start, ushort* skip_end)
    {
        if (_chunks.length == 0)
            return false;

        // Usually there can't be two or more chunks overlapping a
        // single block -- in such cases they are merged during
        // indexing in most implementations.
        // If this is not the case, the algorithm should still work,
        // but it might decompress the same block several times.
        //
        // On each call of this method, one of these things happen:
        // 1) We remain in the current chunk, but read next block
        // 2) We finish dealing with the current chunk, so we move to
        //    the next one. If this was the last one, false is returned.
        //
        // moveToNextChunk moves stream pointer to chunk.beg.coffset,
        // in which case skip_start should be set to chunk.beg.uoffset

        auto result = fillBgzfBufferFromStream(_stream, true, block, buffer);
        auto offset = block.start_offset;

        if (!result)
            return false;

        if (offset == _chunks[0].beg.coffset)
            *skip_start = _chunks[0].beg.uoffset; // first block in a chunk
        else
            *skip_start = 0;

        long _skip_end; // may be equal to 65536!
        if (offset == _chunks[0].end.coffset) // last block in a chunk
            _skip_end = block.input_size - _chunks[0].end.uoffset;
        else
            _skip_end = 0;

        *skip_end = cast(ushort)_skip_end;

        if (offset >= _chunks[0].end.coffset) {
            _chunks = _chunks[1 .. $];
            moveToNextChunk();
        }

        // special case: it's not actually the last block in a chunk,
        // but rather that chunk ended on the edge of two blocks
        if (block.input_size > 0 && _skip_end == block.input_size) {
            version(extraVerbose) { import std.stdio; stderr.writeln("skip_end == input size"); }
            return getNextBgzfBlock(block, buffer, skip_start, skip_end);
        }

        return true;
    }

    /// Always zero (unknown)
    size_t totalCompressedSize() const {
        return 0;
    }
}

///
// Provides an uncompressed stream, block by block, decompressing
// BGZF blocks in parallel on a TaskPool.
class BgzfInputStream : Stream {
    private {
        BgzfBlockSupplier _supplier;
        ubyte[] _data;          // ring of n_tasks slots, _max_block_size each

        // BgzfBlockCache _cache;

        ubyte[] _read_buffer;   // window into the currently decompressed block
        VirtualOffset _current_vo;
        VirtualOffset _end_vo;  // virtual offset just past _read_buffer

        size_t _compressed_size;

        // for estimating compression ratio
        size_t _compressed_read, _uncompressed_read;

        TaskPool _pool;
        // Each slot holds compressed data first, then the decompressed
        // result in place -- hence twice the maximum block size.
        enum _max_block_size = BGZF_MAX_BLOCK_SIZE * 2;

        alias Task!(decompressBgzfBlock, BgzfBlock) DecompressionTask;
        // DecompressionTask[] _task_buf;

        // static here means that BlockAux has no access to
        // its surrounding scope https://dlang.org/spec/struct.html
        static struct BlockAux {
            BgzfBlock block;
            ushort skip_start;  // uncompressed bytes to drop at block start
            ushort skip_end;    // uncompressed bytes to drop at block end

            DecompressionTask* task;
            // alias task this; // https://dlang.org/spec/class.html#AliasThis
        }

        RoundBuf!BlockAux _tasks = void;

        size_t _offset;         // current write slot within _data

        // Pulls the next compressed block from the supplier, schedules its
        // decompression on the pool, and appends it to the round buffer.
        // Returns false on end of input (including the BGZF EOF block).
        bool fillNextBlock() {
            // Sets up a decompression task and pushes it on the roundbuf _tasks
            ubyte* p = _data.ptr + _offset;
            BlockAux b = void;
            if (_supplier.getNextBgzfBlock(&b.block, p,
                                           &b.skip_start, &b.skip_end))
            {
                if (b.block.input_size == 0) // BGZF EOF block
                    return false;

                _compressed_read += b.block.end_offset - b.block.start_offset;
                _uncompressed_read += b.block.input_size;
                version(extraVerbose) {
                    import std.stdio;
                    stderr.writeln("[creating task] ", b.block.start_offset, " / ", b.skip_start, " / ", b.skip_end);
                }

                /*
                DecompressionTask tmp = void;
                tmp = scopedTask!decompressBgzfBlock(b.block);
                auto t = _task_buf.ptr + _offset / _max_block_size;
                import core.stdc.string : memcpy;
                memcpy(t, &tmp, DecompressionTask.sizeof);
                b.task = t;
                _tasks.put(b);
                _pool.put(b.task);
                */
                // tmp = scopedTask!decompressBgzfBlock(b.block);
                auto task = task!decompressBgzfBlock(b.block);
                b.task = task;
                _tasks.put(b);     // _tasks is roundbuf
                _pool.put(b.task); // _pool is thread pool
                _offset += _max_block_size;
                if (_offset == _data.length)
                    _offset = 0;   // wrap around the ring
                return true;
            }
            return false;
        }

        // Waits for the oldest scheduled task and exposes its decompressed
        // bytes (minus skip_start/skip_end trimming) through _read_buffer.
        void setupReadBuffer() {
            auto b = _tasks.front;
            auto decompressed_block = b.task.yieldForce();
            auto from = b.skip_start;
            auto to = b.block.input_size - b.skip_end;
            _read_buffer = b.block._buffer.ptr[from .. to];

            if (from == to) {
                assert(from == 0);
                setEOF();
            }

            _current_vo = VirtualOffset(b.block.start_offset, from);
            version(extraVerbose) {
                import std.stdio; stderr.writeln("[setup read buffer] ", _current_vo);
            }
            if (b.skip_end > 0)
                _end_vo = VirtualOffset(b.block.start_offset, cast(ushort)to);
            else
                _end_vo = VirtualOffset(b.block.end_offset, 0);
            _tasks.popFront();
        }

        void setEOF() {
            _current_vo = _end_vo;
            readEOF = true;
        }
    }

    /**
     * Params:
     *   supplier    = source of compressed BGZF blocks
     *   pool        = task pool used for parallel decompression
     *   buffer_size = if > 0, grows the prefetch window to roughly this
     *                 many compressed bytes
     */
    this(BgzfBlockSupplier supplier,
         TaskPool pool=taskPool,
         // BgzfBlockCache cache=null,
         size_t buffer_size=0)
    {
        _supplier = supplier;
        _compressed_size = _supplier.totalCompressedSize();
        _pool = pool;
        // _cache = cache;

        // The roundbuf size (n_tasks) should be at least
        // the number of threads
        size_t n_tasks = max(pool.size, 1) * 2;
        if (buffer_size > 0)
            n_tasks = max(n_tasks, buffer_size / BGZF_MAX_BLOCK_SIZE);
        // n_tasks is 13 on my machine
        _tasks = RoundBuf!BlockAux(n_tasks);
        // _task_buf = uninitializedArray!(DecompressionTask[])(n_tasks);

        _data = uninitializedArray!(ubyte[])(n_tasks * _max_block_size);

        // Prefill the pipeline so decompression runs ahead of reads.
        for (size_t i = 0; i < n_tasks; ++i)
            if (!fillNextBlock())
                break;

        if (!_tasks.empty) {
            setupReadBuffer();
        }
    }

    /// Current position as a BGZF virtual offset (coffset << 16 | uoffset).
    VirtualOffset virtualTell() const {
        return _current_vo;
    }

    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    override size_t writeBlock(const void* buf, size_t size) {
        throw new WriteException("Stream is not writeable");
    }

    /// Copies up to $(D size) uncompressed bytes into $(D buf);
    /// returns the number of bytes actually copied (0 at EOF).
    override size_t readBlock(void* buf, size_t size) {
        version(extraVerbose) {
            import std.stdio;
            // stderr.writeln("[uncompressed] reading ", size, " bytes to address ", buf);
        }
        if (_read_buffer.length == 0) {
            assert(_tasks.empty);
            setEOF();
            return 0;
        }

        auto buffer = cast(ubyte*)buf;

        auto len = min(size, _read_buffer.length);
        buffer[0 .. len] = _read_buffer[0 .. len];
        version(extraVerbose) {
            // stderr.writeln("[uncompressed] [read] range: ", _read_buffer.ptr, " - ", _read_buffer.ptr + len);
        }
        _read_buffer = _read_buffer[len .. $];
        _current_vo = VirtualOffset(cast(ulong)_current_vo + len);

        if (_read_buffer.length == 0) {
            _current_vo = _end_vo;
            if (!_tasks.empty) {
                // Consume the next decompressed block and refill the
                // pipeline slot we just freed.
                setupReadBuffer();
                if (!readEOF)
                    fillNextBlock();
            }
            else
                setEOF();
        }

        return len;
    }

    size_t total_compressed_size() @property const {
        return _compressed_size;
    }

    /// Ratio of uncompressed to compressed bytes seen so far (0 if none).
    float average_compression_ratio() @property const {
        if (_compressed_read == 0)
            return 0.0;
        return cast(float)_uncompressed_read / _compressed_read;
    }
}