1 /* 2 This file is part of BioD. 3 Copyright (C) 2012-2013 Artem Tarasov <lomereiter@gmail.com> 4 5 Permission is hereby granted, free of charge, to any person obtaining a 6 copy of this software and associated documentation files (the "Software"), 7 to deal in the Software without restriction, including without limitation 8 the rights to use, copy, modify, merge, publish, distribute, sublicense, 9 and/or sell copies of the Software, and to permit persons to whom the 10 Software is furnished to do so, subject to the following conditions: 11 12 The above copyright notice and this permission notice shall be included in 13 all copies or substantial portions of the Software. 14 15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 21 DEALINGS IN THE SOFTWARE. 22 23 */ 24 /// Writing a script/tool for processing BAM data often starts this way: 25 /// 26 /// ------------------------ 27 /// import bio.bam.reader; 28 /// 29 /// void main(string[] args) { 30 /// auto bam = new BamReader(args[1]); // open BAM file 31 /// foreach (read; bam.reads) { // iterate through its reads 32 /// if (read.is_unmapped) 33 /// continue; // maybe skip unmapped ones 34 /// ... 35 /// } 36 /// } 37 /// ------------------------ 38 /// 39 /// Or, if a specific interval on the reference sequence is to be explored: 40 /// ------------------------ 41 /// import bio.bam.pileup; 42 /// ... 43 /// auto reads = bam["chr7"][50_000 .. 60_000]; // BAI index is required 44 /// foreach (column; makePileup(reads)) { ... } // see $(PMODULE pileup) docs 45 /// ------------------------ 46 module bio.bam.reader; 47 48 import bio.bam.abstractreader; 49 public import bio.sam.header; 50 public import bio.bam.reference; 51 public import bio.bam.region; 52 public import bio.bam.read; 53 public import bio.bam.tagvalue; 54 public import bio.bam.readrange; 55 import bio.bam.randomaccessmanager; 56 import bio.bam.baifile; 57 import bio.bam.bai.indexing; 58 import bio.core.utils.range; 59 import bio.core.utils.stream; 60 import bio.core.bgzf.inputstream; 61 public import bio.core.bgzf.virtualoffset; 62 63 import std.system; 64 import std.stdio; 65 import std.algorithm; 66 import std.range; 67 import std.conv; 68 import std.exception; 69 import std.parallelism; 70 import std.array; 71 import core.stdc.stdio; 72 import std.string; 73 74 /** 75 BAM file reader, featuring parallel decompression of BGZF blocks. 76 */ 77 class BamReader : IBamSamReader { 78 79 /** 80 Creates reader associated with file or stream. 81 (If stream constructor is used, no random access is possible.) 82 $(BR) 83 Optionally, task pool can be specified. 84 It will be used to unpack BGZF blocks in parallel. 85 86 Example: 87 ------------------------------------------- 88 import std.parallelism, bio.bam.reader; 89 void main() { 90 auto pool = new TaskPool(4); // use 4 threads 91 scope (exit) pool.finish(); // don't forget! 92 auto bam = new BamReader("file.bam", pool); 93 ... 94 } 95 ------------------------------------------- 96 */ 97 this(undead.stream.Stream stream, 98 TaskPool task_pool = taskPool) { 99 _source_stream = new EndianStream(stream, Endian.littleEndian); 100 _task_pool = task_pool; 101 102 if (stream.seekable) { 103 _stream_is_seekable = true; 104 } 105 106 initializeStreams(); 107 108 auto magic = _bam.readString(4); 109 110 enforce(magic == "BAM\1", "Invalid file format: expected BAM\\1"); 111 112 readSamHeader(); 113 readReferenceSequencesInfo(); 114 115 // right after construction, we are at the beginning 116 // of the list of reads 117 118 if (_stream_is_seekable) { 119 _reads_start_voffset = _decompressed_stream.virtualTell(); 120 } 121 } 122 123 /// ditto 124 this(string filename, std.parallelism.TaskPool task_pool) { 125 126 _filename = filename; 127 _source_stream = getNativeEndianSourceStream(); 128 this(_source_stream, task_pool); 129 } 130 131 /// ditto 132 this(string filename) { 133 this(filename, std.parallelism.taskPool); 134 } 135 136 /** 137 True if BAI file was found for this BAM file. 138 This is necessary for any random-access operations. 139 $(BR) 140 Looks for files in the same directory which filename 141 is either the file name of BAM file with '.bai' appended, 142 or with the last extension replaced with '.bai' 143 (that is, for $(I file.bam) paths $(I file.bai) and 144 $(I file.bam.bai) will be checked) 145 */ 146 bool has_index() @property { 147 return _random_access_manager.found_index_file; 148 } 149 150 /** 151 Creates BAI file. If $(I overwrite) is false, it won't touch 152 existing index if it is already found. 153 */ 154 void createIndex(bool overwrite = false) { 155 if (has_index && !overwrite) 156 return; 157 Stream stream = new BufferedFile(filename ~ ".bai", FileMode.OutNew); 158 scope(exit) stream.close(); 159 bio.bam.bai.indexing.createIndex(this, stream); 160 _bai_status = BaiStatus.notInitialized; 161 _rndaccssmgr = null; 162 } 163 164 /** Filename, if the object was created via file name constructor, 165 $(D null) otherwise. 166 */ 167 string filename() @property const { 168 return _filename; 169 } 170 171 /// If file ends with EOF block, returns virtual offset of the start of EOF block. 172 /// Otherwise, returns virtual offset of the physical end of file. 173 bio.core.bgzf.virtualoffset.VirtualOffset eofVirtualOffset() { 174 return _random_access_manager.eofVirtualOffset(); 175 } 176 177 /// Get BGZF block at a given file offset. 178 bio.core.bgzf.block.BgzfBlock getBgzfBlockAt(ulong offset) { 179 return _random_access_manager.getBgzfBlockAt(offset); 180 } 181 182 /** 183 Returns: SAM header of the BAM file 184 */ 185 bio.sam.header.SamHeader header() @property { 186 if (_header is null) { 187 synchronized { 188 if (_header is null) { 189 _header = new SamHeader(_headertext); 190 _headertext = null; 191 } 192 } 193 } 194 return _header; 195 } 196 197 /** 198 Returns: information about reference sequences 199 */ 200 const(bio.bam.referenceinfo.ReferenceSequenceInfo)[] reference_sequences() @property const nothrow { 201 return _reference_sequences; 202 } 203 204 /** 205 Range of all alignment records in the file. 206 $(BR) 207 Element type of the returned range depends on the policy. 208 Default one is $(DPREF2 bam, readrange, withoutOffsets), 209 in this case range element type is $(DPREF2 bam, read, BamRead). 210 $(BR) 211 The other option is $(DPREF2 bam, readrange, withOffsets), 212 which allows to track read virtual offsets in the file. 213 In this case range element type is $(DPREF2 bam, readrange, BamReadBlock). 214 215 Example: 216 ---------------------------------- 217 import bio.bam.readrange; 218 ... 219 auto bam = new BamReader("file.bam"); 220 auto reads = bam.reads!withOffsets(); 221 writeln(reads.front.start_virtual_offset); 222 ---------------------------------- 223 */ 224 auto reads(alias IteratePolicy=bio.bam.readrange.withoutOffsets)() @property { 225 auto _decompressed_stream = getDecompressedBamReadStream(); 226 return bamReadRange!IteratePolicy(_decompressed_stream, this); 227 } 228 229 static struct ReadsWithProgressResult(alias IteratePolicy, R, S) { 230 this(R range, S stream, 231 void delegate(lazy float p) progressBarFunc, 232 void delegate() finishFunc) 233 { 234 _range = range; 235 _stream = stream; 236 _progress_bar_func = progressBarFunc; 237 _finish_func = finishFunc; 238 } 239 240 static if (__traits(identifier, IteratePolicy) == "withOffsets") { 241 auto front() @property { 242 return _range.front; 243 } 244 } else static if (__traits(identifier, IteratePolicy) == "withoutOffsets") { 245 auto front() @property { 246 return _range.front.read; 247 } 248 } else static assert(0, __traits(identifier, IteratePolicy)); 249 250 bool empty() @property { 251 auto result = _range.empty; 252 if (_finish_func !is null && !_called_finish_func && result) { 253 _called_finish_func = true; 254 _finish_func(); 255 } 256 return result; 257 } 258 259 void popFront() { 260 _bytes_read += _range.front.read.size_in_bytes; 261 _range.popFront(); 262 if (_progress_bar_func !is null) { 263 _progress_bar_func(min(1.0, 264 cast(float)_bytes_read / (_stream.total_compressed_size * 265 _stream.average_compression_ratio))); 266 } 267 } 268 269 private R _range; 270 private S _stream; 271 private size_t _bytes_read; 272 private void delegate(lazy float p) _progress_bar_func; 273 private void delegate() _finish_func; 274 private bool _called_finish_func = false; 275 } 276 277 /** 278 Returns: range of all reads in the file, calling $(I progressBarFunc) 279 for each read. 280 $(BR) 281 $(I progressBarFunc) will be called 282 each time next alignment is read, with the argument being a number from [0.0, 1.0], 283 which is estimated progress percentage. 284 $(BR) 285 Notice that $(I progressBarFunc) takes $(D lazy) argument, 286 so that the number of relatively expensive float division operations 287 can be controlled by user. 288 289 Once the iteration is finished (call to $(D empty) returned true), 290 $(I finishFunc) will be called if provided. 291 292 Example: 293 ------------------------------------ 294 import std.functional, std.stdio, bio.bam.reader; 295 void progress(lazy float p) { 296 static uint n; 297 if (++n % 63 == 0) writeln(p); // prints progress after every 63 records 298 } 299 ... 300 foreach (read; bam.readsWithProgress(toDelegate(&progress))) { 301 ... 302 } 303 ------------------------------------ 304 */ 305 auto readsWithProgress(alias IteratePolicy=bio.bam.readrange.withoutOffsets) 306 (void delegate(lazy float p) progressBarFunc, 307 void delegate() finishFunc=null) 308 { 309 auto _decompressed_stream = getDecompressedBamReadStream(); 310 auto reads_with_offsets = bamReadRange!withOffsets(_decompressed_stream, this); 311 312 alias ReadsWithProgressResult!(IteratePolicy, 313 typeof(reads_with_offsets), BgzfInputStream) Result; 314 315 return Result(reads_with_offsets, _decompressed_stream, 316 progressBarFunc, finishFunc); 317 } 318 319 /// 320 void assumeSequentialProcessing() { 321 _seqprocmode = true; 322 } 323 324 /// Part of IBamSamReader interface 325 std.range.InputRange!(bio.bam.read.BamRead) allReads() @property { 326 return inputRangeObject(reads!withoutOffsets()); 327 } 328 329 /** 330 Returns: the read which starts at a given virtual offset. 331 */ 332 bio.bam.read.BamRead getReadAt(bio.core.bgzf.virtualoffset.VirtualOffset offset) { 333 enforce(_random_access_manager !is null); 334 return _random_access_manager.getReadAt(offset); 335 } 336 337 /** 338 Returns: all reads located between two virtual offsets in the BAM file. 339 340 $(BR) 341 First offset must point to the start of an alignment record, 342 and be strictly less than the second one. 343 $(BR) 344 For decompression, the task pool specified at the construction is used. 345 */ 346 auto getReadsBetween(bio.core.bgzf.virtualoffset.VirtualOffset from, 347 bio.core.bgzf.virtualoffset.VirtualOffset to) { 348 enforce(from <= to, "First offset must be less than second"); 349 enforce(_stream_is_seekable, "Stream is not seekable"); 350 351 return _random_access_manager.getReadsBetween(from, to); 352 } 353 354 /** 355 Returns: all reads overlapping any region from a set. 356 */ 357 auto getReadsOverlapping(BamRegion[] regions) { 358 return _random_access_manager.getReads(regions); 359 } 360 361 /** 362 Unmapped reads, i.e. reads at the end of file whose reference id is -1. 363 The file must be coordinate-sorted and indexed. 364 */ 365 auto unmappedReads() { 366 enforce(_random_access_manager !is null); 367 auto bai = _random_access_manager.getBai(); 368 369 VirtualOffset start; 370 start = eofVirtualOffset(); 371 372 auto all_reads = this.reads(); 373 if (!all_reads.empty && all_reads.front.ref_id == -1) 374 start = _reads_start_voffset; 375 376 auto ioffsets = bai.indices[0 .. reference_sequences.length].retro() 377 .map!(index => index.ioffsets.retro()).joiner(); 378 if (!ioffsets.empty) 379 start = ioffsets.front; 380 381 auto stream = _random_access_manager.createStreamStartingFrom(start); 382 auto r = bamReadRange!withOffsets(stream, this); 383 while (!r.empty && r.front.ref_id != -1) 384 r.popFront(); 385 return r; 386 } 387 388 /** 389 Get BAI chunks containing all reads that overlap specified region. 390 For $(I ref_id) = -1, use $(D unmappedReads) method. 391 */ 392 bio.core.bgzf.chunk.Chunk[] getChunks(uint ref_id, int beg, int end) { 393 enforce(_random_access_manager !is null); 394 enforce(beg < end); 395 396 return _random_access_manager.getChunks(BamRegion(ref_id, beg, end)); 397 } 398 399 /** 400 Returns reference sequence with id $(I ref_id). 401 */ 402 bio.bam.reference.ReferenceSequence reference(int ref_id) { 403 enforce(ref_id < _reference_sequences.length, "Invalid reference index"); 404 return ReferenceSequence(_random_access_manager, 405 ref_id, 406 _reference_sequences[ref_id]); 407 } 408 409 /** 410 Returns reference sequence named $(I ref_name). 411 412 Example: 413 --------------------------- 414 import std.stdio, bio.bam.reader; 415 ... 416 auto bam = new BamReader("file.bam"); 417 writeln(bam["chr2"].length); 418 --------------------------- 419 */ 420 bio.bam.reference.ReferenceSequence opIndex(string ref_name) { 421 enforce(hasReference(ref_name), "Reference with name " ~ ref_name ~ " does not exist"); 422 auto ref_id = _reference_sequence_dict[ref_name]; 423 return reference(ref_id); 424 } 425 426 /** 427 Check if reference named $(I ref_name) is presented in BAM header. 428 */ 429 bool hasReference(string ref_name) { 430 return null != (ref_name in _reference_sequence_dict); 431 } 432 433 /** 434 Set buffer size for I/O operations. Values less than 4096 are disallowed. 435 $(BR) 436 This can help in multithreaded applications when several files are read 437 simultaneously (e.g. merging). 438 */ 439 void setBufferSize(size_t buffer_size) { 440 enforce(buffer_size >= 4096, "Buffer size must be >= 4096 bytes"); 441 _buffer_size = buffer_size; 442 _random_access_manager.setBufferSize(buffer_size); 443 } 444 445 package bool _seqprocmode; // available for bio.bam.readrange; 446 447 private: 448 449 string _filename; // filename (if available) 450 Stream _source_stream; // compressed 451 BgzfInputStream _decompressed_stream; // decompressed 452 Stream _bam; // decompressed + endian conversion 453 bool _stream_is_seekable; 454 455 // Virtual offset at which alignment records start. 456 VirtualOffset _reads_start_voffset; 457 458 BaiFile _dont_access_me_directly_use_bai_file_for_that; 459 enum BaiStatus { 460 notInitialized, 461 initialized, 462 fileNotFound 463 } 464 BaiStatus _bai_status = BaiStatus.notInitialized; 465 466 void initBai() { 467 if (_bai_status == BaiStatus.notInitialized) { 468 synchronized { 469 try { 470 _dont_access_me_directly_use_bai_file_for_that = BaiFile(_filename); 471 _bai_status = BaiStatus.initialized; 472 } catch (Exception e) { 473 _bai_status = BaiStatus.fileNotFound; 474 } 475 } 476 } 477 } 478 479 // provides access to index file 480 @property ref BaiFile _bai_file() { // initialized lazily 481 initBai(); 482 return _dont_access_me_directly_use_bai_file_for_that; 483 }; 484 485 RandomAccessManager _rndaccssmgr; // unreadable for a purpose 486 @property RandomAccessManager _random_access_manager() { 487 if (_rndaccssmgr is null) { 488 synchronized { 489 initBai(); 490 491 if (_bai_status == BaiStatus.initialized) { 492 _rndaccssmgr = new RandomAccessManager(this, _bai_file); 493 } else { 494 _rndaccssmgr = new RandomAccessManager(this); 495 } 496 497 _rndaccssmgr.setTaskPool(_task_pool); 498 _rndaccssmgr.setBufferSize(_buffer_size); 499 } 500 } 501 return _rndaccssmgr; 502 } 503 504 SamHeader _header; 505 string _headertext; // for lazy SAM header parsing 506 ReferenceSequenceInfo[] _reference_sequences; 507 int[string] _reference_sequence_dict; /// name -> index mapping 508 509 TaskPool _task_pool; 510 size_t _buffer_size = 4096; // buffer size to be used for I/O 511 512 Stream getNativeEndianSourceStream() { 513 assert(_filename !is null); 514 Stream file = new bio.core.utils.stream.File(_filename); 515 return new BufferedStream(file, _buffer_size); 516 } 517 518 Stream getSeekableCompressedStream() { 519 if (_stream_is_seekable) { 520 if (_filename !is null) { 521 auto file = getNativeEndianSourceStream(); 522 version(development) 523 { 524 std.stdio.stderr.writeln("[info] file size: ", file.size); 525 } 526 return new EndianStream(file, Endian.littleEndian); 527 } else { 528 _source_stream.seekSet(0); 529 return _source_stream; 530 } 531 } else { 532 return null; 533 } 534 } 535 536 // get decompressed stream out of compressed BAM file 537 BgzfInputStream getDecompressedStream() { 538 auto compressed_stream = getSeekableCompressedStream(); 539 540 auto block_supplier = new StreamSupplier(compressed_stream is null ? 541 _source_stream : 542 compressed_stream); 543 544 return new BgzfInputStream(block_supplier, _task_pool, 545 null, _buffer_size); 546 } 547 548 // get decompressed stream starting from the first alignment record 549 BgzfInputStream getDecompressedBamReadStream() { 550 auto compressed_stream = getSeekableCompressedStream(); 551 552 if (compressed_stream !is null) { 553 enforce(_reads_start_voffset != 0UL); 554 555 compressed_stream.seekCur(_reads_start_voffset.coffset); 556 auto block_supplier = new StreamSupplier(compressed_stream); 557 auto stream = new BgzfInputStream(block_supplier, _task_pool, 558 null, _buffer_size); 559 stream.readString(_reads_start_voffset.uoffset); 560 return stream; 561 } else { 562 // must be initialized in initializeStreams() 563 return _decompressed_stream; 564 } 565 } 566 567 // sets up the streams and ranges 568 void initializeStreams() { 569 570 _decompressed_stream = getDecompressedStream(); 571 _bam = new EndianStream(_decompressed_stream, Endian.littleEndian); 572 } 573 574 // initializes _header 575 void readSamHeader() { 576 int l_text; 577 _bam.read(l_text); 578 579 _headertext = cast(string)(_bam.readString(l_text)); 580 } 581 582 // initialize _reference_sequences 583 void readReferenceSequencesInfo() { 584 int n_ref; 585 _bam.read(n_ref); 586 _reference_sequences = new ReferenceSequenceInfo[n_ref]; 587 foreach (i; 0..n_ref) { 588 _reference_sequences[i] = ReferenceSequenceInfo(_bam); 589 590 // provide mapping Name -> Index 591 _reference_sequence_dict[_reference_sequences[i].name] = i; 592 } 593 } 594 }