/*
    This file is part of BioD.
    Copyright (C) 2012-2016    Artem Tarasov <lomereiter@gmail.com>

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.

*/
/// $(P This module is used for iterating over columns of an alignment.)
/// $(P The function makePileup is called on
/// a range of coordinate-sorted reads mapped to the same reference.
/// It returns an input range of columns.)
/// $(P This returned range can then be iterated with $(D foreach).
/// The first column is located at the same position on the reference
/// as the first base of the first read.
/// $(BR)
/// Each $(D popFront) operation advances the current position on the
/// reference. The default behaviour is to exclude sites with zero coverage
/// from the iteration.)
/// $(P Each column keeps the set of reads that overlap the corresponding
/// position on the reference.
/// If reads contain MD tags, and makePileup was asked
/// to use them, the reference base at the column is also available.)
/// $(BR)
/// Each read preserves all standard read properties
/// but also keeps column-related information, namely
/// <ul>
///     $(LI number of bases consumed from the read sequence so far)
///     $(LI current CIGAR operation and offset in it)
///     $(LI all CIGAR operations before and after the current one)</ul>
/// $(BR)
/// It follows from the above that the current CIGAR operation can never be
/// an insertion.
/// The following checks are suggested for detecting insertions adjacent to
/// the current position:
/// <ul>
///     $(LI $(D cigar_after.length > 0 &&
///              cigar_operation_offset == cigar_operation.length - 1 &&
///              cigar_after[0].type == 'I'))
///     $(LI $(D cigar_before.length > 0 &&
///              cigar_operation_offset == 0 &&
///              cigar_before[$ - 1].type == 'I'))</ul>
/// $(BR)
/// Example:
/// ---------------------------------------------------------
/// import bio.std.hts.bam.reader, bio.std.hts.bam.pileup, std.stdio;
/// import std.algorithm : count;
/// void main() {
///     auto bam = new BamReader("file.bam"); // assume single reference and MD tags
///     auto pileup = bam.reads().makePileup(useMD);
///     foreach (column; pileup) {
///         auto matches = column.bases.count(column.reference_base);
///         if (matches < column.coverage * 2 / 3)
///             writeln(column.position); // print positions of possible mismatches
///     }
/// }
/// ---------------------------------------------------------
module bio.std.hts.bam.pileup;

import bio.std.hts.bam.cigar;
import bio.std.hts.bam.read;
import bio.std.hts.bam.md.reconstruct;
import bio.std.hts.bam.splitter;

import std.algorithm;
import std.range;
import std.random;
import std.traits;
import std.conv;
import std.array;
import std.exception;

/// Represents a read aligned to a column
struct PileupRead(Read=bio.std.hts.bam.read.EagerBamRead) {

    Read read; ///
    alias read this;
    private alias read _read;

    /// Current CIGAR operation. One of 'M', '=', 'X', 'D', 'N'.
    /// Use $(D cigar_after)/$(D cigar_before) to determine insertions.
    bio.std.hts.bam.read.CigarOperation cigar_operation() @property const {
        return _cur_op;
    }

    /// Number of bases consumed from the current CIGAR operation.
    uint cigar_operation_offset() @property const {
        return _cur_op_offset;
    }

    /// CIGAR operations after the current operation
    const(bio.std.hts.bam.read.CigarOperation)[] cigar_after() @property const {
        return _read.cigar[_cur_op_index + 1 .. $];
    }

    /// CIGAR operations before the current operation
    const(bio.std.hts.bam.read.CigarOperation)[] cigar_before() @property const {
        return _read.cigar[0 .. _cur_op_index];
    }

    /// If the current CIGAR operation is one of 'M', '=', or 'X', returns the
    /// read base at the current column. Otherwise, returns '-'.
    char current_base() @property const {
        assert(_query_offset <= _read.sequence_length);
        if (_cur_op.is_query_consuming && _cur_op.is_reference_consuming) {
            return _read.sequence[_query_offset];
        } else {
            return '-';
        }
    }

    /// If the current CIGAR operation is one of 'M', '=', or 'X', returns the
    /// Phred-scaled read base quality at the current column.
    /// Otherwise, returns 255.
    ubyte current_base_quality() @property const {
        assert(_query_offset <= _read.sequence_length);
        if (_cur_op.is_query_consuming && _cur_op.is_reference_consuming) {
            return _read.base_qualities[_query_offset];
        } else {
            return 255;
        }
    }
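    // Illustrative commentary (added; not part of the original API): following
    // the checks suggested in the module documentation, an insertion that
    // starts right after the current column can be detected for a
    // PileupRead `r` as
    //
    //     bool insertion_follows = r.cigar_after.length > 0 &&
    //         r.cigar_operation_offset == r.cigar_operation.length - 1 &&
    //         r.cigar_after[0].type == 'I';
    //
    // and, symmetrically, cigar_before[$ - 1] detects an insertion that
    // ended just before the current column.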
    /// Returns the number of bases consumed from the read sequence.
    /// $(BR)
    /// More precisely,
    /// $(UL
    ///     $(LI if the current CIGAR operation is 'M', '=', or 'X',
    ///       the index of the current read base in the read sequence)
    ///     $(LI if the current CIGAR operation is 'D' or 'N',
    ///       the index of the read base after the deletion)
    /// )
    /// (in both cases indices are 0-based)
    int query_offset() @property const {
        assert(_query_offset <= _read.sequence_length);
        return _query_offset;
    }

    /// Returns a duplicate of this pileup read
    PileupRead dup() @property {
        PileupRead r = void;
        r._read = _read; // logically const, thus no .dup here
        r._cur_op_index = _cur_op_index;
        r._cur_op = _cur_op;
        r._cur_op_offset = _cur_op_offset;
        r._query_offset = _query_offset;
        return r;
    }

    private {
        // index of the current CIGAR operation in _read.cigar
        uint _cur_op_index;

        // current CIGAR operation
        CigarOperation _cur_op;

        // number of bases consumed from the current CIGAR operation
        uint _cur_op_offset;

        // number of bases consumed from the read sequence
        uint _query_offset;

        this(Read read) {
            _read = read;

            // find the first M/=/X/D operation
            auto cigar = _read.cigar;
            for (_cur_op_index = 0; _cur_op_index < cigar.length; ++_cur_op_index) {
                _cur_op = cigar[_cur_op_index];
                if (_cur_op.is_reference_consuming) {
                    if (_cur_op.type != 'N') {
                        break;
                    }
                } else if (_cur_op.is_query_consuming) {
                    _query_offset += _cur_op.length; // skip S and I operations
                }
            }

            assertCigarIndexIsValid();
        }

        // move one base to the right on the reference
        void incrementPosition() {
            ++_cur_op_offset;

            // if the current CIGAR operation is D or N, the query offset is untouched
            if (_cur_op.is_query_consuming) {
                ++_query_offset;
            }

            if (_cur_op_offset >= _cur_op.length) {

                _cur_op_offset = 0; // reset CIGAR operation offset

                auto cigar = _read.cigar;
                // get the next reference-consuming CIGAR operation (M/=/X/D/N)
                for (++_cur_op_index; _cur_op_index < cigar.length; ++_cur_op_index) {
                    _cur_op = cigar[_cur_op_index];
                    if (_cur_op.is_reference_consuming) {
                        break;
                    }

                    if (_cur_op.is_query_consuming) {
                        _query_offset += _cur_op.length;
                    }
                }

                assertCigarIndexIsValid();
            }
        }

        void assertCigarIndexIsValid() {
            assert(_cur_op_index < _read.cigar.length,
                   "Invalid read " ~ _read.name
                   ~ " - CIGAR " ~ _read.cigarString()
                   ~ ", sequence " ~ to!string(_read.sequence));
        }
    }
}

static assert(isBamRead!(PileupRead!BamRead));
//static assert(isBamRead!(PileupRead!(EagerBamRead!BamRead)));
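// A minimal sketch added for illustration (synthetic read in the same style
// as the unittest further below): a read with CIGAR 2M1D2M produces five
// pileup columns, and at the deletion column current_base is '-'.
unittest {
    auto read = BamRead("r0", "ACGT",
                        [CigarOperation(2, 'M'),
                         CigarOperation(1, 'D'),
                         CigarOperation(2, 'M')]);
    read.position = 100;
    read.ref_id = 0;

    char[] bases;
    foreach (column; makePileup([read]))
        bases ~= column.reads.front.current_base;
    assert(bases == "AC-GT");
}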
/// Represents a single pileup column
struct PileupColumn(R) {
    private {
        ulong _position;
        int _ref_id = -1;
        R _reads;
        size_t _n_starting_here;
    }

    /// Reference base. 'N' if not available.
    char reference_base() @property const {
        return _reference_base;
    }

    private char _reference_base = 'N';

    /// Coverage at this position (equal to the number of reads)
    size_t coverage() const @property {
        return _reads.length;
    }

    /// Returns reference ID (-1 if unmapped)
    int ref_id() const @property {
        return _ref_id;
    }

    /// Position on the reference
    ulong position() const @property {
        return _position;
    }

    /// Reads overlapping the position, sorted by coordinate
    auto reads() @property {
        return assumeSorted!compareCoordinates(_reads[]);
    }

    /// Reads that have their leftmost mapped position at this column
    auto reads_starting_here() @property {
        return _reads[$ - _n_starting_here .. $];
    }

    /// Shortcut for map!(read => read.current_base)(reads)
    auto bases() @property {
        return map!"a.current_base"(reads);
    }

    /// Shortcut for map!(read => read.current_base_quality)(reads)
    auto base_qualities() @property {
        return map!"a.current_base_quality"(reads);
    }

    /// Shortcut for map!(read => read.mapping_quality)(reads)
    auto read_qualities() @property {
        return map!"a.mapping_quality"(reads);
    }
}
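// Another added sketch: with two overlapping 4M reads, the column at the
// second read's start has coverage 2, and exactly one read starts there.
unittest {
    auto r1 = BamRead("q1", "ACGT", [CigarOperation(4, 'M')]);
    r1.position = 10;
    r1.ref_id = 0;
    auto r2 = BamRead("q2", "CGTA", [CigarOperation(4, 'M')]);
    r2.position = 11;
    r2.ref_id = 0;

    auto pileup = makePileup([r1, r2]);
    assert(pileup.front.position == 10);
    assert(pileup.front.coverage == 1);
    pileup.popFront();
    assert(pileup.front.coverage == 2);
    assert(pileup.front.reads_starting_here.length == 1);
}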
/**
 * The class for iterating reference bases together with reads overlapping them.
 */
class PileupRange(R, alias TColumn=PileupColumn) {
    alias Unqual!(ElementType!R) Raw;
    alias EagerBamRead!Raw Eager;
    alias PileupRead!Eager Read;
    alias Read[] ReadArray;
    alias TColumn!ReadArray Column;

    private {
        R _reads;
        Column _column;
        Appender!ReadArray _read_buf;
        bool _skip_zero_coverage;
    }

    protected {
        // This is extracted into a method not only to reduce duplication
        // (not so much of it), but to allow overriding it!
        // For that reason it is not marked as final. The overhead of a
        // virtual call is negligible compared to the computations in the
        // EagerBamRead constructor and to inserting a new element into
        // the appender.
        void add(ref Raw read) {
            _read_buf.put(PileupRead!Eager(Eager(read)));
        }
    }

    /**
     * Create a new pileup iterator from a range of reads.
     */
    this(R reads, bool skip_zero_coverage) {
        _reads = reads;
        _read_buf = appender!ReadArray();
        _skip_zero_coverage = skip_zero_coverage;

        if (!_reads.empty) {
            initNewReference(); // C++ programmers, don't worry! Virtual tables in D
                                // are populated before the constructor body is executed.
        }
    }

    /// Returns PileupColumn struct corresponding to the current position.
    ref Column front() @property {
        return _column;
    }

    /// Whether all reads have been processed.
    bool empty() @property {
        return _reads.empty && _read_buf.data.empty;
    }

    /// Move to the next position on the reference.
    void popFront() {
        auto pos = ++_column._position;

        // compact the buffer, keeping only reads that still overlap `pos`
        size_t survived = 0;
        auto data = _read_buf.data;

        for (size_t i = 0; i < data.length; ++i) {
            if (data[i].end_position > pos) {
                if (survived < i)
                {
                    data[survived] = data[i];
                }
                ++survived;
            }
        }

        for (size_t i = 0; i < survived; ++i) {
            data[i].incrementPosition();
        }

        _read_buf.shrinkTo(survived);

        _column._n_starting_here = 0; // updated either in initNewReference()
                                      // or in the loop below

        if (!_reads.empty) {
            if (_reads.front.ref_id != _column._ref_id &&
                survived == 0) // processed all reads aligned to the previous reference
            {
                initNewReference();
            } else {
                size_t n = 0;
                while (!_reads.empty &&
                       _reads.front.position == pos &&
                       _reads.front.ref_id == _column._ref_id)
                {
                    auto read = _reads.front;
                    add(read);
                    _reads.popFront();
                    ++n;
                }
                _column._n_starting_here = n;

                // handle the option of skipping sites with zero coverage
                if (survived == 0 && n == 0 && _skip_zero_coverage) {
                    // the name might be misleading but it does the trick
                    initNewReference();
                }
            }
        }

        _column._reads = _read_buf.data;
    }

    protected void initNewReference() {
        auto read = _reads.front;

        _column._position = read.position;
        _column._ref_id = read.ref_id;
        uint n = 1;
        add(read);

        _reads.popFront();

        while (!_reads.empty) {
            read = _reads.front;
            if (read.ref_id == _column.ref_id &&
                read.position == _column._position)
            {
                add(read);
                ++n;
                _reads.popFront();
            } else {
                break;
            }
        }

        _column._n_starting_here = n;
        _column._reads = _read_buf.data;
    }
}
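// A hedged sketch (hypothetical subclass, not part of this module): because
// add() is virtual, subclasses can observe every read entering the pileup,
// as PileupRangeUsingMdTag below does. For instance, tracking the longest
// read seen so far:
//
//     final class MaxLengthPileupRange(R) : PileupRange!R {
//         size_t max_read_length;
//         this(R reads, bool skip_zero_coverage) {
//             super(reads, skip_zero_coverage);
//         }
//         protected override void add(ref Raw read) {
//             max_read_length = max(max_read_length,
//                                   cast(size_t)read.sequence_length);
//             super.add(read);
//         }
//     }
//
// Note that add() must not silently drop reads: popFront() counts the reads
// it fetches regardless, so filtering here would desynchronize
// reads_starting_here.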
/// Abstract pileup structure. S is the type of the column range.
struct AbstractPileup(R, S) {
    private R reads_;
    R reads() @property {
        return reads_;
    }

    S columns;
    /// Pileup columns
    alias columns this;

    private {
        ulong _start_position;
        ulong _end_position;
        int _ref_id;
    }

    /// $(D start_from) parameter provided to a pileup function
    ulong start_position() @property const {
        return _start_position;
    }

    /// $(D end_at) parameter provided to a pileup function
    ulong end_position() @property const {
        return _end_position;
    }

    /// Reference ID of all reads in this pileup.
    int ref_id() @property const {
        return _ref_id;
    }
}

struct TakeUntil(alias pred, Range, Sentinel) if (isInputRange!Range)
{
    private Range _input;
    private Sentinel _sentinel;
    bool _done;

    this(Range input, Sentinel sentinel) {
        _input = input;
        _sentinel = sentinel;
        _done = _input.empty || predSatisfied();
    }

    @property bool empty() { return _done; }
    @property auto ref front() { return _input.front; }
    private bool predSatisfied() { return startsWith!pred(_input, _sentinel); }
    void popFront() { _input.popFront(); _done = _input.empty || predSatisfied(); }
}

auto takeUntil(alias pred, Range, Sentinel)(Range range, Sentinel sentinel) {
    return TakeUntil!(pred, Range, Sentinel)(range, sentinel);
}
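// A small sanity check added for illustration: takeUntil yields leading
// elements up to, but excluding, the first one that satisfies the predicate
// against the sentinel.
unittest {
    assert(equal(takeUntil!"a >= b"([1, 2, 5, 3], 4), [1, 2]));
}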
auto pileupInstance(alias P, R)(R reads, ulong start_from, ulong end_at, bool skip_zero_coverage) {
    auto rs = filter!"a.basesCovered() > 0"(reads);
    while (!rs.empty) {
        auto r = rs.front;
        if (r.position + r.basesCovered() < start_from) {
            rs.popFront();
        } else {
            break;
        }
    }
    int ref_id = -1;
    if (!rs.empty) {
        ref_id = rs.front.ref_id;
    }
    auto sameref_rs = takeUntil!"a.ref_id != b"(rs, ref_id);
    alias typeof(sameref_rs) ReadRange;
    PileupRange!ReadRange columns = new P!ReadRange(sameref_rs, skip_zero_coverage);
    while (!columns.empty) {
        auto c = columns.front;
        if (c.position < start_from) {
            columns.popFront();
        } else {
            break;
        }
    }
    auto chopped = takeUntil!"a.position >= b"(columns, end_at);
    return AbstractPileup!(R, typeof(chopped))(reads, chopped, start_from, end_at, ref_id);
}

auto pileupColumns(R)(R reads, bool use_md_tag=false, bool skip_zero_coverage=true) {
    auto rs = filter!"a.basesCovered() > 0"(reads);
    alias typeof(rs) ReadRange;
    PileupRange!ReadRange columns;
    if (use_md_tag) {
        columns = new PileupRangeUsingMdTag!ReadRange(rs, skip_zero_coverage);
    } else {
        columns = new PileupRange!ReadRange(rs, skip_zero_coverage);
    }
    return columns;
}

/// Pileup range that also tracks the current reference base,
/// recovered from MD tags and CIGAR
final static class PileupRangeUsingMdTag(R) :
    PileupRange!(R, PileupColumn)
{
    // The code is similar to that in reconstruct.d, but here we can't make
    // an assumption about any particular read having non-zero length on
    // the reference.

    // current chunk of reference
    private alias typeof(_column._reads[].front) Read;
    private ReturnType!(dna!Read) _chunk;

    // end position of the current chunk on the reference
    // (assuming a half-open interval)
    private uint _chunk_end_position;

    // next read from which we will extract a reference chunk
    //
    // More precisely,
    // _next_chunk_provider = argmax (read => read.end_position)
    //                        {reads overlapping current column}
    private Read _next_chunk_provider;

    private bool _has_next_chunk_provider = false;

    // coverage at the previous location
    private ulong _prev_coverage;

    // we also track the current reference ID
    private int _curr_ref_id = -1;

    ///
    this(R reads, bool skip_zero_coverage) {
        super(reads, skip_zero_coverage);
    }

    alias Unqual!(ElementType!R) Raw;

    // Checks the length of the newly added read and tracks the read whose
    // end position on the reference is the largest.
    //
    // When the reconstructed reference chunk becomes empty, the next one is
    // built from that read. This minimizes the number of reads for which
    // the MD tag has to be decoded.
    protected override void add(ref Raw read) {
        // the behaviour depends on whether a new contig starts here or not
        bool had_zero_coverage = _prev_coverage == 0;

        super.add(read);

        // get the wrapped read
        auto _read = _read_buf.data.back;

        // if we've just moved to another reference sequence, do the setup
        if (_read.ref_id != _curr_ref_id) {
            _curr_ref_id = _read.ref_id;

            _has_next_chunk_provider = true;
            _next_chunk_provider = _read;
            return;
        }

        // two subsequent next_chunk_providers must overlap
        // unless (!) there was a region with zero coverage in-between
        if (_read.position > _chunk_end_position && !had_zero_coverage) {
            return;
        }

        // compare with the previous candidate and replace it if this one is better
        if (_read.end_position > _chunk_end_position) {
            if (!_has_next_chunk_provider) {
                _has_next_chunk_provider = true;
                _next_chunk_provider = _read;
            } else if (_read.end_position > _next_chunk_provider.end_position) {
                _next_chunk_provider = _read;
            }
        }
    }

    protected override void initNewReference() {
        _prev_coverage = 0;
        super.initNewReference();
        if (_has_next_chunk_provider) {
            // prepare the first chunk
            _chunk = dna(_next_chunk_provider);
            _chunk_end_position = _next_chunk_provider.end_position;
            _has_next_chunk_provider = false;
            _column._reference_base = _chunk.front;
            _chunk.popFront();
        } else {
            _column._reference_base = 'N';
        }
    }

    ///
    override void popFront() {
        if (!_chunk.empty) {
            // update the current reference base
            _column._reference_base = _chunk.front;

            _chunk.popFront();
        } else {
            _column._reference_base = 'N';
        }

        // update _prev_coverage
        _prev_coverage = _column.coverage;

        // the order is important - maybe we will obtain a new
        // next_chunk_provider during this call to popFront()
        super.popFront();

        // If we have consumed the whole current chunk,
        // we need to obtain the next one if that's possible.
        if (_chunk.empty && _has_next_chunk_provider) {
            _chunk = dna(_next_chunk_provider);

            debug {
                /* import std.stdio;
                   writeln();
                   writeln("position: ", _next_chunk_provider.position);
                   writeln("next chunk: ", to!string(_chunk));
                */
            }

            _chunk_end_position = _next_chunk_provider.end_position;

            _has_next_chunk_provider = false;

            _chunk.popFrontN(cast(size_t)(_column.position - _next_chunk_provider.position));

            _column._reference_base = _chunk.front;
            _chunk.popFront();
        }
    }
}
/// Creates a pileup range from a range of reads.
/// Note that all reads must be aligned to the same reference.
///
/// See $(D PileupColumn) documentation for the description of range elements.
/// Note also that you can't use the $(D std.array.array()) function on a
/// pileup because it doesn't make deep copies of the underlying data structure.
/// (One might argue that in this case it would be better to use opApply,
/// but typically one would use $(D std.algorithm.map) on pileup columns
/// to obtain some numeric characteristics.)
///
/// Params:
///     use_md_tag =  if true, use MD tag together with CIGAR
///                   to recover reference bases
///
///     start_from =  position from which to start
///
///     end_at =      position before which to stop
///
/// $(BR)
/// That is, the range of positions is the half-open interval
/// $(BR)
/// [max(start_from, first mapped read start position),
/// $(BR)
///  min(end_at, last mapped end position among all reads))
///
///     skip_zero_coverage =  if true, skip sites with zero coverage
///
auto makePileup(R)(R reads,
                   bool use_md_tag=false,
                   ulong start_from=0,
                   ulong end_at=ulong.max,
                   bool skip_zero_coverage=true)
{
    if (use_md_tag) {
        return pileupInstance!PileupRangeUsingMdTag(reads, start_from, end_at, skip_zero_coverage);
    } else {
        return pileupInstance!PileupRange(reads, start_from, end_at, skip_zero_coverage);
    }
}

/// Allows the intention to be expressed more clearly.
enum useMD = true;
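// An added sketch of the windowing semantics (synthetic read, same style as
// the unittest below): start_from/end_at bound the columns as a half-open
// interval.
unittest {
    auto read = BamRead("w1", "ACGTACGT", [CigarOperation(8, 'M')]);
    read.position = 50;
    read.ref_id = 0;

    auto pileup = makePileup([read], false, 52, 55);
    assert(pileup.start_position == 52);
    assert(pileup.end_position == 55);
    assert(equal(pileup.map!(c => c.position), [52UL, 53UL, 54UL]));
}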
unittest {
    import std.algorithm;
    import std.range;
    import std.array;

    // the set of reads below was taken from 1000 Genomes BAM file
    // NA20828.mapped.ILLUMINA.bwa.TSI.low_coverage.20101123.bam
    // (region 20:1127810-1127819)
    auto readnames = array(iota(10).map!(i => "r" ~ to!string(i))());

    auto sequences = ["ATTATGGACATTGTTTCCGTTATCATCATCATCATCATCATCATCATTATCATC",
                      "GACATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATC",
                      "ATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACC",
                      "TGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACCAC",
                      "TCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACCACCACC",
                      "GTTATCATCATCATCATCATCATCATCATCATCATCATCATCATCGTCACCCTG",
                      "TCATCATCATCATAATCATCATCATCATCATCATCATCGTCACCCTGTGTTGAG",
                      "TCATCATCATCGTCACCCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCT",
                      "TCATCATCATCATCACCACCACCACCCTGTGTTGAGGACAGAAGTAATATCCCT",
                      "CACCACCACCCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCTGGTCACC"];

    // multiple sequence alignment:
    //                                                            ***
    // ATTATGGACATTGTTTCCGTTATCATCATCATCATCATCATCATCATTATCATC
    //       GACATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCAT---C
    //          ATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACC
    //            TGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCAT---CACCAC
    //                TCCGTTATCATCATCATCATCATCATCATCATCATCATCATCAT---CACCACCACC
    //                   GTTATCATCATCATCATCATCATCATCATCATCATCATCAT---CATCGTCACCCTG
    //                            ATCATCATCATAATCATCATCATCATCATCAT---CATCGTCACCCTGTGTTGAG
    //                                      TCATCATCATCGTCAC------------------CCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCT
    //                                               TCATCATCATCATCACCACCACCACCCTGTGTTGAGGACAGAAGTAATATCCCT
    //                                                            ---CACCACCACCCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCTGGTCACC
    //   *         *         *         *         *         *            *         *         *         *
    //  760       770       780       790       800       810          820       830       840       850

    auto cigars = [[CigarOperation(54, 'M')],
                   [CigarOperation(54, 'M')],
                   [CigarOperation(50, 'M'), CigarOperation(3, 'I'), CigarOperation(1, 'M')],
                   [CigarOperation(54, 'M')],
                   [CigarOperation(54, 'M')],
                   [CigarOperation(54, 'M')],
                   [CigarOperation(2, 'S'), CigarOperation(52, 'M')],
                   [CigarOperation(16, 'M'), CigarOperation(15, 'D'), CigarOperation(38, 'M')],
                   [CigarOperation(13, 'M'), CigarOperation(3, 'I'), CigarOperation(38, 'M')],
                   [CigarOperation(54, 'M')]];

    auto positions = [758, 764, 767, 769, 773, 776, 785, 795, 804, 817];

    auto md_tags = ["47C6", "54", "51", "50T3", "46T7", "45A0C7", "11C24A0C14",
                    "11A3T0^CATCATCATCACCAC38", "15T29T5", "2T45T5"];

    BamRead[] reads = new BamRead[10];

    foreach (i; iota(10)) {
        reads[i] = BamRead(readnames[i], sequences[i], cigars[i]);
        reads[i].position = positions[i];
        reads[i].ref_id = 0;
        reads[i]["MD"] = md_tags[i];
    }

    auto first_read_position = reads.front.position;
    auto reference = to!string(dna(reads));

    import std.stdio;
    // stderr.writeln("Testing pileup (low-level aspects)...");

    auto pileup = makePileup(reads, true, 796, 849, false);
    auto pileup2 = makePileup(reads, true, 0, ulong.max, false);
    assert(pileup.front.position == 796);
    assert(pileup.start_position == 796);
    assert(pileup.end_position == 849);

    while (pileup2.front.position != 796) {
        pileup2.popFront();
    }

    while (!pileup.empty) {
        auto column = pileup.front;
        auto column2 = pileup2.front;
        assert(column.coverage == column2.coverage);
        pileup2.popFront();

        // check that DNA is built correctly from MD tags and CIGAR
        assert(column.reference_base ==
               reference[cast(size_t)(column.position - first_read_position)]);

        switch (column.position) {
            case 796:
                assert(equal(column.bases, "CCCCCCAC"));
                pileup.popFront();
                break;
            case 805:
                assert(equal(column.bases, "TCCCCCCCC"));
                pileup.popFront();
                break;
            case 806:
                assert(equal(column.bases, "AAAAAAAGA"));
                pileup.popFront();
                break;
            case 810:
                // the last read is not yet fetched by the pileup engine
                assert(column.reads[column.coverage - 2].cigar_after.front.type == 'D');
                pileup.popFront();
                break;
            case 817:
                assert(column.reads[column.coverage - 2].cigar_before.back.type == 'I');
                pileup.popFront();
                break;
            case 821:
                assert(column.reads[column.coverage - 3].cigar_operation.type == 'D');
                assert(equal(column.bases, "AAGG-AA"));
                pileup.popFront();
                break;
            case 826:
                assert(equal(column.bases, "CCCCCC"));
                pileup.popFront();
                break;
            case 849:
                assert(equal(column.bases, "TAT"));
                pileup.popFront();
                assert(pileup.empty);
                break;
            default:
                pileup.popFront();
                break;
        }
    }

    // another set of reads, the same file, region 20:12360032-12360050
    // test the case when the reference has some places with zero coverage

    reads = [BamRead("r1", "CCCACATAGAAAGCTTGCTGTTTCTCTGTGGGAAGTTTTAACTTAGGTCAGCTT",
                     [CigarOperation(54, 'M')]),
             BamRead("r2", "TAGAAAGCTTGCTGTTTCTCTGTGGGAAGTTTTAACTTAGGTTAGCTTCATCTA",
                     [CigarOperation(54, 'M')]),
             BamRead("r3", "TTTTTCTTTCTTTCTTTGAAGAAGGCAGATTCCTGGTCCTGCCACTCAAATTTT",
                     [CigarOperation(54, 'M')]),
             BamRead("r4", "TTTCTTTCTTTCTTTGAAGAAGGCAGATTCCTGGTCCTGCCACTCAAATTTTCA",
                     [CigarOperation(54, 'M')])];

    reads[0].position = 979;
    reads[0]["MD"] = "54";
    reads[0].ref_id = 0;

    reads[1].position = 985;
    reads[1]["MD"] = "42C7C3";
    reads[1].ref_id = 0;

    reads[2].position = 1046;
    reads[2]["MD"] = "54";
    reads[2].ref_id = 0;

    reads[3].position = 1048;
    reads[3]["MD"] = "54";
    reads[3].ref_id = 0;

    assert(equal(dna(reads),
                 map!(c => c.reference_base)(makePileup(reads, true, 0, ulong.max, false))));
}
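/// Range of pileups over consecutive chunks of reads, as returned by
/// $(D pileupChunks). Adjacent chunks share the reads that overlap the
/// chunk boundary, so that columns are correct at the seams.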
struct PileupChunkRange(C) {
    private C _chunks;
    private ElementType!C _prev_chunk;
    private ElementType!C _current_chunk;
    private bool _empty;
    private ulong _beg = 0;
    private bool _use_md_tag;
    private ulong _start_from;
    private ulong _end_at;
    private int _chunk_right_end;

    private int computeRightEnd(ref ElementType!C chunk) {
        return chunk.map!(r => r.position + r.basesCovered()).reduce!max;
    }

    this(C chunks, bool use_md_tag, ulong start_from, ulong end_at) {
        _chunks = chunks;
        _use_md_tag = use_md_tag;
        _start_from = start_from;
        _end_at = end_at;
        while (true) {
            if (_chunks.empty) {
                _empty = true;
                break; // nothing left; without this break the loop would never terminate
            } else {
                _current_chunk = _chunks.front;
                _chunks.popFront();

                if (_current_chunk[0].ref_id < 0) continue;

                _beg = _current_chunk[0].position;
                if (_beg >= end_at) {
                    _empty = true;
                    break;
                }

                _chunk_right_end = computeRightEnd(_current_chunk);
                if (_chunk_right_end > start_from)
                    break;
            }
        }
    }

    bool empty() @property {
        return _empty;
    }

    auto front() @property {
        auto end_pos = _current_chunk[$ - 1].position;
        if (_chunks.empty || _chunks.front[0].ref_id != _current_chunk[$ - 1].ref_id)
            end_pos = _chunk_right_end;

        return makePileup(chain(_prev_chunk, _current_chunk),
                          _use_md_tag,
                          max(_beg, _start_from), min(end_pos, _end_at));
    }

    void popFront() {
        _prev_chunk = _current_chunk;

        while (true) {
            if (_chunks.empty) {
                _empty = true;
                return;
            }
            _current_chunk = _chunks.front;
            _chunks.popFront();

            if (_current_chunk[0].ref_id >= 0) break;
        }

        _chunk_right_end = computeRightEnd(_current_chunk);

        // if the reference changed, nullify _prev_chunk
        if (_prev_chunk.length > 0 &&
            _prev_chunk[$ - 1].ref_id == _current_chunk[0].ref_id)
        {
            _beg = _prev_chunk[$ - 1].position;
        } else {
            _beg = _current_chunk[0].position;
            _prev_chunk.length = 0;
        }

        // keep only those reads in _prev_chunk that overlap the current one

        // 1) estimate the median read length from a sample
        enum sampleSize = 15;
        int[sampleSize] buf = void;
        int read_length = void;
        if (_prev_chunk.length <= sampleSize) {
            for (size_t k = 0; k < _prev_chunk.length; ++k) {
                buf[k] = _prev_chunk[k].sequence_length;
            }
            topN(buf[0 .. _prev_chunk.length], _prev_chunk.length / 2);
            read_length = buf[_prev_chunk.length / 2];
        } else {
            size_t i = 0;
            foreach (read; randomSample(_prev_chunk, sampleSize))
                buf[i++] = read.sequence_length;
            topN(buf[], sampleSize / 2);
            read_length = buf[sampleSize / 2];
            debug {
                import std.stdio;
                stderr.writeln("[pileupChunks] read_length=", read_length);
            }
        }

        // 2) do a binary search for reads that start from (_beg - 2 * read_length)
        //    (it's an experimental fact that almost no read covers that much
        //    of the reference)
        auto pos = _beg - 2 * read_length;
        long i = 0;
        long j = _prev_chunk.length - 1;
        // invariant:
        // positions of _prev_chunk[0 .. i] are less than pos,
        // positions of _prev_chunk[j + 1 .. $] are greater than or equal to pos
        while (i <= j) {
            auto m = cast(size_t)(i + j) / 2;
            assert(m < _prev_chunk.length);
            auto p = _prev_chunk[m].position;
            if (p >= pos) {
                j = m - 1;
            } else {
                i = m + 1;
            }
        }

        _prev_chunk = _prev_chunk[cast(size_t)i .. $];
    }
}
/// This function constructs a range of non-overlapping consecutive pileups
/// from a range of reads, so that these pileups can be processed in parallel.
///
/// It's allowed to pass ranges of sorted reads with different reference IDs;
/// they won't get mixed in any chunk.
///
/// Params:
///     use_md_tag =  recover reference bases from MD tag and CIGAR
///
///     block_size =  approximate amount of memory that each pileup will consume,
///                   given in bytes. (Usually consumption will be a bit higher.)
///
///     start_from =  position of the first column of the first chunk
///
///     end_at =      position after the last column of the last chunk
///
/// $(BR)
/// WARNING: the block size should be big enough that every block shares
/// reads only with adjacent blocks.
/// $(BR)
/// Therefore, reducing $(I block_size) is not recommended; however,
/// increasing it may be needed in the case of very high coverage.
auto pileupChunks(R)(R reads, bool use_md_tag=false, size_t block_size=16_384_000,
                     ulong start_from=0, ulong end_at=ulong.max) {
    auto chunks = chunksConsumingLessThan(reads, block_size);
    return PileupChunkRange!(typeof(chunks))(chunks, use_md_tag, start_from, end_at);
}
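// A hedged usage sketch (commented out; "file.bam" and the per-column work
// are placeholders): the pileups produced by pileupChunks cover disjoint
// column ranges, so they can be dispatched to worker threads, e.g. with
// std.parallelism:
//
//     import bio.std.hts.bam.reader, std.parallelism;
//
//     auto bam = new BamReader("file.bam");
//     foreach (chunk; taskPool.parallel(bam.reads().pileupChunks())) {
//         foreach (column; chunk) {
//             // inspect column.coverage, column.bases, ...
//         }
//     }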