1 /* 2 This file is part of BioD. 3 Copyright (C) 2012-2013 Artem Tarasov <lomereiter@gmail.com> 4 5 Permission is hereby granted, free of charge, to any person obtaining a 6 copy of this software and associated documentation files (the "Software"), 7 to deal in the Software without restriction, including without limitation 8 the rights to use, copy, modify, merge, publish, distribute, sublicense, 9 and/or sell copies of the Software, and to permit persons to whom the 10 Software is furnished to do so, subject to the following conditions: 11 12 The above copyright notice and this permission notice shall be included in 13 all copies or substantial portions of the Software. 14 15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 21 DEALINGS IN THE SOFTWARE. 22 23 */ 24 /// Writing a script/tool for processing BAM data often starts this way: 25 /// 26 /// ------------------------ 27 /// import bio.std.hts.bam.reader; 28 /// 29 /// void main(string[] args) { 30 /// auto bam = new BamReader(args[1]); // open BAM file 31 /// foreach (read; bam.reads) { // iterate through its reads 32 /// if (read.is_unmapped) 33 /// continue; // maybe skip unmapped ones 34 /// ... 35 /// } 36 /// } 37 /// ------------------------ 38 /// 39 /// Or, if a specific interval on the reference sequence is to be explored: 40 /// ------------------------ 41 /// import bio.std.hts.bam.pileup; 42 /// ... 43 /// auto reads = bam["chr7"][50_000 .. 60_000]; // BAI index is required 44 /// foreach (column; makePileup(reads)) { ... } // see $(PMODULE pileup) docs 45 /// ------------------------ 46 module bio.std.hts.bam.reader; 47 48 import bio.std.hts.bam.abstractreader; 49 public import bio.std.hts.sam.header; 50 public import bio.std.hts.bam.reference; 51 public import bio.std.hts.bam.region; 52 public import bio.std.hts.bam.read; 53 public import bio.std.hts.bam.tagvalue; 54 public import bio.std.hts.bam.readrange; 55 56 import bio.std.hts.bam.randomaccessmanager; 57 import bio.std.hts.bam.baifile; 58 import bio.std.hts.bam.bai.indexing; 59 60 import bio.core.utils.range; 61 import bio.core.utils.stream; 62 import bio.core.bgzf.inputstream; 63 public import bio.core.bgzf.virtualoffset; 64 65 import std.system; 66 import std.stdio; 67 import std.algorithm; 68 import std.range; 69 import std.conv; 70 import std.exception; 71 import std.parallelism; 72 import std.array; 73 import core.stdc.stdio; 74 import std..string; 75 import std.stdio; 76 77 /** 78 BAM file reader, featuring parallel decompression of BGZF blocks. 79 */ 80 class BamReader : IBamSamReader { 81 82 /** 83 Creates reader associated with file or stream. 84 (If stream constructor is used, no random access is possible.) 85 $(BR) 86 Optionally, task pool can be specified. 87 It will be used to unpack BGZF blocks in parallel. 88 89 Example: 90 ------------------------------------------- 91 import std.parallelism, bio.std.hts.bam.reader; 92 void main() { 93 auto pool = new TaskPool(4); // use 4 threads 94 scope (exit) pool.finish(); // don't forget! 95 auto bam = new BamReader("file.bam", pool); 96 ... 97 } 98 ------------------------------------------- 99 */ 100 this(contrib.undead.stream.Stream stream, 101 TaskPool task_pool = taskPool) { 102 _source_stream = new EndianStream(stream, Endian.littleEndian); 103 _task_pool = task_pool; 104 105 if (stream.seekable) { 106 _stream_is_seekable = true; 107 } 108 109 initializeStreams(); 110 111 auto magic = _bam.readString(4); 112 113 enforce(magic == "BAM\1", "Invalid file format: expected BAM\\1"); 114 115 readSamHeader(); 116 readReferenceSequencesInfo(); 117 118 // right after construction, we are at the beginning 119 // of the list of reads 120 121 if (_stream_is_seekable) { 122 _reads_start_voffset = _decompressed_stream.virtualTell(); 123 } 124 } 125 126 /// ditto 127 this(string filename, std.parallelism.TaskPool task_pool) { 128 129 _filename = filename; 130 _source_stream = getNativeEndianSourceStream(); 131 this(_source_stream, task_pool); 132 } 133 134 135 /// ditto 136 this(string filename) { 137 this(filename, std.parallelism.taskPool); 138 } 139 140 /** 141 True if BAI file was found for this BAM file. 142 This is necessary for any random-access operations. 143 $(BR) 144 Looks for files in the same directory which filename 145 is either the file name of BAM file with '.bai' appended, 146 or with the last extension replaced with '.bai' 147 (that is, for $(I file.bam) paths $(I file.bai) and 148 $(I file.bam.bai) will be checked) 149 */ 150 bool has_index() @property { 151 return _random_access_manager.found_index_file; 152 } 153 154 /** 155 Creates BAI file. If $(I overwrite) is false, it won't touch 156 existing index if it is already found. 157 */ 158 void createIndex(bool overwrite = false) { 159 if (has_index && !overwrite) 160 return; 161 Stream stream = new BufferedFile(filename ~ ".bai", FileMode.OutNew); 162 scope(exit) stream.close(); 163 bio.std.hts.bam.bai.indexing.createIndex(this, stream); 164 _bai_status = BaiStatus.notInitialized; 165 _rndaccssmgr = null; 166 } 167 168 /** Filename, if the object was created via file name constructor, 169 $(D null) otherwise. 170 */ 171 string filename() @property const { 172 return _filename; 173 } 174 175 /// If file ends with EOF block, returns virtual offset of the start of EOF block. 176 /// Otherwise, returns virtual offset of the physical end of file. 177 bio.core.bgzf.virtualoffset.VirtualOffset eofVirtualOffset() { 178 return _random_access_manager.eofVirtualOffset(); 179 } 180 181 /// Get BGZF block at a given file offset. 182 bio.core.bgzf.block.BgzfBlock getBgzfBlockAt(ulong offset) { 183 return _random_access_manager.getBgzfBlockAt(offset); 184 } 185 186 /** 187 Returns: SAM header of the BAM file 188 */ 189 bio.std.hts.sam.header.SamHeader header() @property { 190 if (_header is null) { 191 synchronized { 192 if (_header is null) { 193 _header = new SamHeader(_headertext); 194 _headertext = null; 195 } 196 } 197 } 198 return _header; 199 } 200 201 /** 202 Returns: information about reference sequences 203 */ 204 const(bio.std.hts.bam.referenceinfo.ReferenceSequenceInfo)[] reference_sequences() @property const nothrow { 205 return _reference_sequences; 206 } 207 208 /** 209 Range of all alignment records in the file. 210 $(BR) 211 Element type of the returned range depends on the policy. 212 Default one is $(DPREF2 bam, readrange, withoutOffsets), 213 in this case range element type is $(DPREF2 bam, read, BamRead). 214 $(BR) 215 The other option is $(DPREF2 bam, readrange, withOffsets), 216 which allows to track read virtual offsets in the file. 217 In this case range element type is $(DPREF2 bam, readrange, BamReadBlock). 218 219 Example: 220 ---------------------------------- 221 import bio.std.hts.bam.readrange; 222 ... 223 auto bam = new BamReader("file.bam"); 224 auto reads = bam.reads!withOffsets(); 225 writeln(reads.front.start_virtual_offset); 226 ---------------------------------- 227 */ 228 auto reads(alias IteratePolicy=bio.std.hts.bam.readrange.withoutOffsets)() @property { 229 auto _decompressed_stream = getDecompressedBamReadStream(); 230 return bamReadRange!IteratePolicy(_decompressed_stream, this); 231 } 232 233 static struct ReadsWithProgressResult(alias IteratePolicy, R, S) { 234 this(R range, S stream, 235 void delegate(lazy float p) progressBarFunc, 236 void delegate() finishFunc) 237 { 238 _range = range; 239 _stream = stream; 240 _progress_bar_func = progressBarFunc; 241 _finish_func = finishFunc; 242 } 243 244 static if (__traits(identifier, IteratePolicy) == "withOffsets") { 245 auto front() @property { 246 return _range.front; 247 } 248 } else static if (__traits(identifier, IteratePolicy) == "withoutOffsets") { 249 auto front() @property { 250 return _range.front.read; 251 } 252 } else static assert(0, __traits(identifier, IteratePolicy)); 253 254 bool empty() @property { 255 auto result = _range.empty; 256 if (_finish_func !is null && !_called_finish_func && result) { 257 _called_finish_func = true; 258 _finish_func(); 259 } 260 return result; 261 } 262 263 void popFront() { 264 _bytes_read += _range.front.read.size_in_bytes; 265 _range.popFront(); 266 if (_progress_bar_func !is null) { 267 _progress_bar_func(min(1.0, 268 cast(float)_bytes_read / (_stream.total_compressed_size * 269 _stream.average_compression_ratio))); 270 } 271 } 272 273 private R _range; 274 private S _stream; 275 private size_t _bytes_read; 276 private void delegate(lazy float p) _progress_bar_func; 277 private void delegate() _finish_func; 278 private bool _called_finish_func = false; 279 } 280 281 /** 282 Returns: range of all reads in the file, calling $(I progressBarFunc) 283 for each read. 284 $(BR) 285 $(I progressBarFunc) will be called 286 each time next alignment is read, with the argument being a number from [0.0, 1.0], 287 which is estimated progress percentage. 288 $(BR) 289 Notice that $(I progressBarFunc) takes $(D lazy) argument, 290 so that the number of relatively expensive float division operations 291 can be controlled by user. 292 293 Once the iteration is finished (call to $(D empty) returned true), 294 $(I finishFunc) will be called if provided. 295 296 Example: 297 ------------------------------------ 298 import std.functional, std.stdio, bio.std.hts.bam.reader; 299 void progress(lazy float p) { 300 static uint n; 301 if (++n % 63 == 0) writeln(p); // prints progress after every 63 records 302 } 303 ... 304 foreach (read; bam.readsWithProgress(toDelegate(&progress))) { 305 ... 306 } 307 ------------------------------------ 308 */ 309 auto readsWithProgress(alias IteratePolicy=bio.std.hts.bam.readrange.withoutOffsets) 310 (void delegate(lazy float p) progressBarFunc, 311 void delegate() finishFunc=null) 312 { 313 auto _decompressed_stream = getDecompressedBamReadStream(); 314 auto reads_with_offsets = bamReadRange!withOffsets(_decompressed_stream, this); 315 316 alias ReadsWithProgressResult!(IteratePolicy, 317 typeof(reads_with_offsets), BgzfInputStream) Result; 318 319 return Result(reads_with_offsets, _decompressed_stream, 320 progressBarFunc, finishFunc); 321 } 322 323 /// 324 void assumeSequentialProcessing() { 325 _seqprocmode = true; 326 } 327 328 /// Part of IBamSamReader interface 329 std.range.InputRange!(bio.std.hts.bam.read.BamRead) allReads() @property { 330 return inputRangeObject(reads!withoutOffsets()); 331 } 332 333 /** 334 Returns: the read which starts at a given virtual offset. 335 */ 336 bio.std.hts.bam.read.BamRead getReadAt(bio.core.bgzf.virtualoffset.VirtualOffset offset) { 337 enforce(_random_access_manager !is null); 338 return _random_access_manager.getReadAt(offset); 339 } 340 341 /** 342 Returns: all reads located between two virtual offsets in the BAM file. 343 344 $(BR) 345 First offset must point to the start of an alignment record, 346 and be strictly less than the second one. 347 $(BR) 348 For decompression, the task pool specified at the construction is used. 349 */ 350 auto getReadsBetween(bio.core.bgzf.virtualoffset.VirtualOffset from, 351 bio.core.bgzf.virtualoffset.VirtualOffset to) { 352 enforce(from <= to, "First offset must be less than second"); 353 enforce(_stream_is_seekable, "Stream is not seekable"); 354 355 return _random_access_manager.getReadsBetween(from, to); 356 } 357 358 /** 359 Returns: all reads overlapping any region from a set. 360 */ 361 auto getReadsOverlapping(BamRegion[] regions) { 362 return _random_access_manager.getReads(regions); 363 } 364 365 /** 366 Unmapped reads, i.e. reads at the end of file whose reference id is -1. 367 The file must be coordinate-sorted and indexed. 368 */ 369 auto unmappedReads() { 370 enforce(_random_access_manager !is null); 371 auto bai = _random_access_manager.getBai(); 372 373 VirtualOffset start; 374 start = eofVirtualOffset(); 375 376 auto all_reads = this.reads(); 377 if (!all_reads.empty && all_reads.front.ref_id == -1) 378 start = _reads_start_voffset; 379 380 auto ioffsets = bai.indices[0 .. reference_sequences.length].retro() 381 .map!(index => index.ioffsets.retro()).joiner(); 382 if (!ioffsets.empty) 383 start = ioffsets.front; 384 385 auto stream = _random_access_manager.createStreamStartingFrom(start); 386 auto r = bamReadRange!withOffsets(stream, this); 387 while (!r.empty && r.front.ref_id != -1) 388 r.popFront(); 389 return r; 390 } 391 392 /** 393 Get BAI chunks containing all reads that overlap specified region. 394 For $(I ref_id) = -1, use $(D unmappedReads) method. 395 */ 396 bio.core.bgzf.chunk.Chunk[] getChunks(uint ref_id, int beg, int end) { 397 enforce(_random_access_manager !is null); 398 enforce(beg < end); 399 400 return _random_access_manager.getChunks(BamRegion(ref_id, beg, end)); 401 } 402 403 /** 404 Returns reference sequence with id $(I ref_id). 405 */ 406 bio.std.hts.bam.reference.ReferenceSequence reference(int ref_id) { 407 enforce(ref_id < _reference_sequences.length, "Invalid reference index"); 408 return ReferenceSequence(_random_access_manager, 409 ref_id, 410 _reference_sequences[ref_id]); 411 } 412 413 /** 414 Returns reference sequence named $(I ref_name). 415 416 Example: 417 --------------------------- 418 import std.stdio, bio.std.hts.bam.reader; 419 ... 420 auto bam = new BamReader("file.bam"); 421 writeln(bam["chr2"].length); 422 --------------------------- 423 */ 424 bio.std.hts.bam.reference.ReferenceSequence opIndex(string ref_name) { 425 enforce(hasReference(ref_name), "Reference with name " ~ ref_name ~ " does not exist"); 426 auto ref_id = _reference_sequence_dict[ref_name]; 427 return reference(ref_id); 428 } 429 430 /** 431 Check if reference named $(I ref_name) is presented in BAM header. 432 */ 433 bool hasReference(string ref_name) { 434 return null != (ref_name in _reference_sequence_dict); 435 } 436 437 /** 438 Set buffer size for I/O operations. Values less than 4096 are disallowed. 439 $(BR) 440 This can help in multithreaded applications when several files are read 441 simultaneously (e.g. merging). 442 */ 443 void setBufferSize(size_t buffer_size) { 444 enforce(buffer_size >= 4096, "Buffer size must be >= 4096 bytes"); 445 _buffer_size = buffer_size; 446 _random_access_manager.setBufferSize(buffer_size); 447 } 448 449 package bool _seqprocmode; // available for bio.std.hts.bam.readrange; 450 451 private: 452 453 string _filename; // filename (if available) 454 Stream _source_stream; // compressed 455 BgzfInputStream _decompressed_stream; // decompressed 456 Stream _bam; // decompressed + endian conversion 457 bool _stream_is_seekable; 458 459 // Virtual offset at which alignment records start. 460 VirtualOffset _reads_start_voffset; 461 462 BaiFile _dont_access_me_directly_use_bai_file_for_that; 463 enum BaiStatus { 464 notInitialized, 465 initialized, 466 fileNotFound 467 } 468 BaiStatus _bai_status = BaiStatus.notInitialized; 469 470 void initBai() { 471 if (_bai_status == BaiStatus.notInitialized) { 472 synchronized { 473 try { 474 _dont_access_me_directly_use_bai_file_for_that = BaiFile(_filename); 475 _bai_status = BaiStatus.initialized; 476 } catch (Exception e) { 477 _bai_status = BaiStatus.fileNotFound; 478 } 479 } 480 } 481 } 482 483 // provides access to index file 484 @property ref BaiFile _bai_file() { // initialized lazily 485 initBai(); 486 return _dont_access_me_directly_use_bai_file_for_that; 487 }; 488 489 RandomAccessManager _rndaccssmgr; // unreadable for a purpose 490 @property RandomAccessManager _random_access_manager() { 491 if (_rndaccssmgr is null) { 492 synchronized { 493 initBai(); 494 495 if (_bai_status == BaiStatus.initialized) { 496 _rndaccssmgr = new RandomAccessManager(this, _bai_file); 497 } else { 498 _rndaccssmgr = new RandomAccessManager(this); 499 } 500 501 _rndaccssmgr.setTaskPool(_task_pool); 502 _rndaccssmgr.setBufferSize(_buffer_size); 503 } 504 } 505 return _rndaccssmgr; 506 } 507 508 SamHeader _header; 509 string _headertext; // for lazy SAM header parsing 510 ReferenceSequenceInfo[] _reference_sequences; 511 int[string] _reference_sequence_dict; /// name -> index mapping 512 513 TaskPool _task_pool; 514 size_t _buffer_size = 4096; // buffer size to be used for I/O 515 516 Stream getNativeEndianSourceStream() { 517 assert(_filename !is null); 518 Stream file = new bio.core.utils.stream.File(_filename); 519 return new BufferedStream(file, _buffer_size); 520 } 521 522 Stream getSeekableCompressedStream() { 523 if (_stream_is_seekable) { 524 if (_filename !is null) { 525 auto file = getNativeEndianSourceStream(); 526 version(development) 527 { 528 std.stdio.stderr.writeln("[info] file size: ", file.size); 529 } 530 return new EndianStream(file, Endian.littleEndian); 531 } else { 532 _source_stream.seekSet(0); 533 return _source_stream; 534 } 535 } else { 536 return null; 537 } 538 } 539 540 // get decompressed stream out of compressed BAM file 541 BgzfInputStream getDecompressedStream() { 542 auto compressed_stream = getSeekableCompressedStream(); 543 544 auto block_supplier = new StreamSupplier(compressed_stream is null ? 545 _source_stream : 546 compressed_stream); 547 548 return new BgzfInputStream(block_supplier, _task_pool, 549 _buffer_size); 550 } 551 552 // get decompressed stream starting from the first alignment record 553 BgzfInputStream getDecompressedBamReadStream() { 554 auto compressed_stream = getSeekableCompressedStream(); 555 556 if (compressed_stream !is null) { 557 enforce(_reads_start_voffset != 0UL); 558 559 compressed_stream.seekCur(_reads_start_voffset.coffset); 560 auto block_supplier = new StreamSupplier(compressed_stream); 561 auto stream = new BgzfInputStream(block_supplier, _task_pool, 562 _buffer_size); 563 stream.readString(_reads_start_voffset.uoffset); 564 return stream; 565 } else { 566 // must be initialized in initializeStreams() 567 return _decompressed_stream; 568 } 569 } 570 571 // sets up the streams and ranges 572 void initializeStreams() { 573 574 _decompressed_stream = getDecompressedStream(); 575 _bam = new EndianStream(_decompressed_stream, Endian.littleEndian); 576 } 577 578 // initializes _header 579 void readSamHeader() { 580 int l_text; 581 _bam.read(l_text); 582 583 _headertext = cast(string)(_bam.readString(l_text)); 584 } 585 586 // initialize _reference_sequences 587 void readReferenceSequencesInfo() { 588 int n_ref; 589 _bam.read(n_ref); 590 _reference_sequences = new ReferenceSequenceInfo[n_ref]; 591 foreach (i; 0..n_ref) { 592 _reference_sequences[i] = ReferenceSequenceInfo(_bam); 593 594 // provide mapping Name -> Index 595 _reference_sequence_dict[_reference_sequences[i].name] = i; 596 } 597 } 598 }