1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2016    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 /// $(P This module is used for iterating over columns of alignment.)
25 /// $(P The function makePileup is called on
26 /// a range of coordinate-sorted reads mapped to the same reference.
27 /// It returns an input range of columns.)
28 /// $(P This returned range can then be iterated with $(D foreach).
29 /// First column is located at the same position on the reference,
30 /// as the first base of the first read.
31 /// $(BR)
32 /// Each $(D popFront) operation advances current position on the
33 /// reference. The default behaviour is to exclude sites with zero coverage
34 /// from the iteration.)
35 /// $(P Each column keeps set of reads that overlap corresponding position
36 /// on the reference.
37 /// If reads contain MD tags, and makePileup was asked
38 /// to use them, reference base at the column is also available.)
39 /// $(BR)
40 /// Each read preserves all standard read properties
41 /// but also keeps column-related information, namely
42 /// <ul>
43 ///     $(LI number of bases consumed from the read sequence so far)
44 ///     $(LI current CIGAR operation and offset in it)
45 ///     $(LI all CIGAR operations before and after current one)</ul>
46 /// $(BR)
47 /// It is clear from the above that current CIGAR operation cannot be an insertion.
48 /// The following are suggested ways to check for them:
49 /// <ul>
50 ///     $(LI $(D cigar_after.length > 0 &&
51 ///              cigar_operation_offset == cigar_operation.length - 1 &&
52 ///              cigar_after[0].type == 'I'))
53 ///     $(LI $(D cigar_before.length > 0 &&
54 ///              cigar_operation_offset == 0 &&
55 ///              cigar_before[$ - 1].type == 'I'))</ul>
56 /// $(BR)
57 /// Example:
58 /// ---------------------------------------------------------
59 /// import bio.std.hts.bam.reader, bio.std.hts.bam.pileup, std.stdio, std.algorithm : count;
60 /// void main() {
61 ///     auto bam = new BamReader("file.bam");       // assume single reference and MD tags
62 ///     auto pileup = bam.reads().makePileup(useMD);
63 ///     foreach (column; pileup) {
64 ///         auto matches = column.bases.count(column.reference_base);
65 ///         if (matches < column.coverage * 2 / 3)
66 ///             writeln(column.position);           // print positions of possible mismatches
67 ///     }
68 /// }
69 /// ---------------------------------------------------------
70 module bio.std.hts.bam.pileup;
71 
72 import bio.std.hts.bam.cigar;
73 import bio.std.hts.bam.read;
74 import bio.std.hts.bam.md.reconstruct;
75 import bio.std.hts.bam.splitter;
76 
77 import std.algorithm;
78 import std.range;
79 import std.random;
80 import std.traits;
81 import std.conv;
82 import std.array;
83 import std.exception;
84 
85 /// Represents a read aligned to a column
86 struct PileupRead(Read=bio.std.hts.bam.read.EagerBamRead) {
87 
88     Read read; ///
89     alias read this;
90     private alias read _read;
91 
92     /// Current CIGAR operation. One of 'M', '=', 'X', 'D', 'N.
93     /// Use $(D cigar_after)/$(D cigar_before) to determine insertions.
94     bio.std.hts.bam.read.CigarOperation cigar_operation() @property const {
95         return _cur_op;
96     }
97 
98     /// Number of bases consumed from the current CIGAR operation.
99     uint cigar_operation_offset() @property const {
100         return _cur_op_offset;
101     }
102 
103     /// CIGAR operations after the current operation
104     const(bio.std.hts.bam.read.CigarOperation)[] cigar_after() @property const {
105         return _read.cigar[_cur_op_index + 1 .. $];
106     }
107 
108     /// CIGAR operations before the current operation
109     const(bio.std.hts.bam.read.CigarOperation)[] cigar_before() @property const {
110         return _read.cigar[0 .. _cur_op_index];
111     }
112 
113     /// If current CIGAR operation is one of 'M', '=', or 'X', returns read base
114     /// at the current column. Otherwise, returns '-'.
115     char current_base() @property const {
116         assert(_query_offset <= _read.sequence_length);
117         if (_cur_op.is_query_consuming && _cur_op.is_reference_consuming) {
118             return _read.sequence[_query_offset];
119         } else {
120             return '-';
121         }
122     }
123 
124     /// If current CIGAR operation is one of 'M', '=', or 'X', returns
125     /// Phred-scaled read base quality at the current column.
126     /// Otherwise, returns 255.
127     ubyte current_base_quality() @property const {
128         assert(_query_offset <= _read.sequence_length);
129         if (_cur_op.is_query_consuming && _cur_op.is_reference_consuming) {
130             return _read.base_qualities[_query_offset];
131         } else {
132             return 255;
133         }
134     }
135 
136     /// Returns number of bases consumed from the read sequence.
137     /// $(BR)
138     /// More concisely,
139     /// $(UL
140     ///     $(LI if current CIGAR operation is 'M', '=', or 'X',
141     ///       index of current read base in the read sequence)
142     ///     $(LI if current CIGAR operation is 'D' or 'N',
143     ///       index of read base after the deletion)
144     /// )
145     /// (in both cases indices are 0-based)
146     int query_offset() @property const {
147         assert(_query_offset <= _read.sequence_length);
148         return _query_offset;
149     }
150 
151     /// Returns duplicate
152     PileupRead dup() @property {
153         PileupRead r = void;
154         r._read = _read; // logically const, thus no .dup here
155         r._cur_op_index = _cur_op_index;
156         r._cur_op = _cur_op;
157         r._cur_op_offset = _cur_op_offset;
158         r._query_offset = _query_offset;
159         return r;
160     }
161 
162     private {
163         // index of current CIGAR operation in _read.cigar
164         uint _cur_op_index;
165 
166         // current CIGAR operation
167         CigarOperation _cur_op;
168 
169         // number of bases consumed from the current CIGAR operation
170         uint _cur_op_offset;
171 
172         // number of bases consumed from the read sequence
173         uint _query_offset;
174 
175         this(Read read) {
176             _read = read;
177 
178             // find first M/=/X/D operation
179             auto cigar = _read.cigar;
180             for (_cur_op_index = 0; _cur_op_index < cigar.length; ++_cur_op_index) {
181                 _cur_op = cigar[_cur_op_index];
182                 if (_cur_op.is_reference_consuming) {
183                     if (_cur_op.type != 'N') {
184                         break;
185                     }
186                 } else if (_cur_op.is_query_consuming) {
187                     _query_offset += _cur_op.length; // skip S and I operations
188                 }
189             }
190 
191             assertCigarIndexIsValid();
192         }
193 
194         // move one base to the right on the reference
195         void incrementPosition() {
196             ++_cur_op_offset;
197 
198             // if current CIGAR operation is D or N, query offset is untouched
199             if (_cur_op.is_query_consuming) {
200                 ++_query_offset;
201             }
202 
203             if (_cur_op_offset >= _cur_op.length) {
204 
205                 _cur_op_offset = 0; // reset CIGAR operation offset
206 
207                 auto cigar = _read.cigar;
208                 // get next reference-consuming CIGAR operation (M/=/X/D/N)
209                 for (++_cur_op_index; _cur_op_index < cigar.length; ++_cur_op_index) {
210                     _cur_op = cigar[_cur_op_index];
211                     if (_cur_op.is_reference_consuming) {
212                         break;
213                     }
214 
215                     if (_cur_op.is_query_consuming) {
216                         _query_offset += _cur_op.length;
217                     }
218                 }
219 
220                 assertCigarIndexIsValid();
221             }
222         }
223 
224         void assertCigarIndexIsValid() {
225             assert(_cur_op_index < _read.cigar.length, "Invalid read " ~ _read.name
226                                                        ~ " - CIGAR " ~ _read.cigarString()
227                                                        ~ ", sequence " ~ to!string(_read.sequence));
228         }
229     }
230 }
231 
232 static assert(isBamRead!(PileupRead!BamRead));
233 //static assert(isBamRead!(PileupRead!(EagerBamRead!BamRead)));
234 
235 /// Represents a single pileup column
236 struct PileupColumn(R) {
237     private {
238         ulong _position;
239         int _ref_id = -1;
240         R _reads;
241         size_t _n_starting_here;
242     }
243 
244     /// Reference base. 'N' if not available.
245     char reference_base() @property const {
246         return _reference_base;
247     }
248 
249     private char _reference_base = 'N';
250 
251     /// Coverage at this position (equals to number of reads)
252     size_t coverage() const @property {
253         return _reads.length;
254     }
255 
256     /// Returns reference ID (-1 if unmapped)
257     int ref_id() const @property {
258         return _ref_id;
259     }
260 
261     /// Position on the reference
262     ulong position() const @property {
263         return _position;
264     }
265 
266     /// Reads overlapping the position, sorted by coordinate
267     auto reads() @property {
268         return assumeSorted!compareCoordinates(_reads[]);
269     }
270 
271     /// Reads that have leftmost mapped position at this column
272     auto reads_starting_here() @property {
273         return _reads[$ - _n_starting_here .. $];
274     }
275 
276     /// Shortcut for map!(read => read.current_base)(reads)
277     auto bases() @property {
278         return map!"a.current_base"(reads);
279     }
280 
281     /// Shortcut for map!(read => read.current_base_quality)(reads)
282     auto base_qualities() @property {
283         return map!"a.current_base_quality"(reads);
284     }
285 
286     /// Shortcut for map!(read => read.mapping_quality)(reads)
287     auto read_qualities() @property {
288         return map!"a.mapping_quality"(reads);
289     }
290 }
291 
292 /**
293  * The class for iterating reference bases together with reads overlapping them.
294  */
295 class PileupRange(R, alias TColumn=PileupColumn) {
296     alias Unqual!(ElementType!R) Raw;
297     alias EagerBamRead!Raw Eager;
298     alias PileupRead!Eager Read;
299     alias Read[] ReadArray;
300     alias TColumn!ReadArray Column;
301 
302     private {
303         R _reads;
304         Column _column;
305         Appender!ReadArray _read_buf;
306         bool _skip_zero_coverage;
307     }
308 
309     protected {
310         // This is extracted into a method not only to reduce duplication
311         // (not so much of it), but to allow to override it!
312         // For that reason it is not marked as final. Overhead of virtual
313         // function is negligible compared to computations in EagerBamRead
314         // constructor together with inserting new element into appender.
315         void add(ref Raw read) {
316             _read_buf.put(PileupRead!Eager(Eager(read)));
317         }
318     }
319 
320     /**
321      * Create new pileup iterator from a range of reads.
322      */
323     this(R reads, bool skip_zero_coverage) {
324         _reads = reads;
325         _read_buf = appender!ReadArray();
326         _skip_zero_coverage = skip_zero_coverage;
327 
328         if (!_reads.empty) {
329             initNewReference(); // C++ programmers, don't worry! Virtual tables in D
330                                 // are populated before constructor body is executed.
331         }
332     }
333 
334     /// Returns PileupColumn struct corresponding to the current position.
335     ref Column front() @property {
336         return _column;
337     }
338 
339     /// Whether all reads have been processed.
340     bool empty() @property {
341         return _reads.empty && _read_buf.data.empty;
342     }
343 
344     /// Move to next position on the reference.
345     void popFront() {
346         auto pos = ++_column._position;
347 
348         size_t survived = 0;
349         auto data = _read_buf.data;
350 
351         for (size_t i = 0; i < data.length; ++i) {
352             if (data[i].end_position > pos) {
353                 if (survived < i)
354                 {
355                     data[survived] = data[i];
356                 }
357                 ++survived;
358             }
359         }
360 
361         for (size_t i = 0; i < survived; ++i) {
362             data[i].incrementPosition();
363         }
364                                       // unless range is empty, this value is
365         _read_buf.shrinkTo(survived);
366 
367         _column._n_starting_here = 0; // updated either in initNewReference()
368                                       // or in the loop below
369 
370         if (!_reads.empty) {
371             if (_reads.front.ref_id != _column._ref_id &&
372                 survived == 0) // processed all reads aligned to the previous reference
373             {
374                 initNewReference();
375             } else {
376                 size_t n = 0;
377                 while (!_reads.empty &&
378                         _reads.front.position == pos &&
379                         _reads.front.ref_id == _column._ref_id)
380                 {
381                     auto read = _reads.front;
382                     add(read);
383                     _reads.popFront();
384                     ++n;
385                 }
386                 _column._n_starting_here = n;
387 
388                 // handle option of skipping sites with zero coverage
389                 if (survived == 0 && n == 0 && _skip_zero_coverage) {
390                     // the name might be misleading but it does the trick
391                     initNewReference();
392                 }
393             }
394         }
395 
396         _column._reads = _read_buf.data;
397     }
398 
399     protected void initNewReference() {
400         auto read = _reads.front;
401 
402         _column._position = read.position;
403         _column._ref_id = read.ref_id;
404         uint n = 1;
405         add(read);
406 
407         _reads.popFront();
408 
409         while (!_reads.empty) {
410             read = _reads.front;
411             if (read.ref_id == _column.ref_id &&
412                 read.position == _column._position)
413             {
414                 add(read);
415                 ++n;
416                 _reads.popFront();
417             } else {
418                 break;
419             }
420         }
421 
422         _column._n_starting_here = n;
423         _column._reads = _read_buf.data;
424     }
425 }
426 
427 /// Abstract pileup structure. S is type of column range.
428 struct AbstractPileup(R, S) {
429     private R reads_;
430     R reads() @property {
431         return reads_;
432     }
433 
434     S columns;
435     /// Pileup columns
436     alias columns this;
437 
438     private {
439         ulong _start_position;
440         ulong _end_position;
441         int _ref_id;
442     }
443 
444     /// $(D start_from) parameter provided to a pileup function
445     ulong start_position() @property const {
446         return _start_position;
447     }
448 
449     /// $(D end_at) parameter provided to a pileup function
450     ulong end_position() @property const {
451         return _end_position;
452     }
453 
454     /// Reference ID of all reads in this pileup.
455     int ref_id() @property const {
456         return _ref_id;
457     }
458 }
459 
460 struct TakeUntil(alias pred, Range, Sentinel) if (isInputRange!Range)
461 {
462     private Range _input;
463     private Sentinel _sentinel;
464     bool _done;
465 
466     this(Range input, Sentinel sentinel) {
467         _input = input; _sentinel = sentinel; _done = _input.empty || predSatisfied();
468     }
469 
470     @property bool empty() { return _done; }
471     @property auto ref front() { return _input.front; }
472     private bool predSatisfied() { return startsWith!pred(_input, _sentinel); }
473     void popFront() { _input.popFront(); _done = _input.empty || predSatisfied(); }
474 }
475 
476 auto takeUntil(alias pred, Range, Sentinel)(Range range, Sentinel sentinel) {
477     return TakeUntil!(pred, Range, Sentinel)(range, sentinel);
478 }
479 
480 auto pileupInstance(alias P, R)(R reads, ulong start_from, ulong end_at, bool skip_zero_coverage) {
481     auto rs = filter!"a.basesCovered() > 0"(reads);
482     while (!rs.empty) {
483         auto r = rs.front;
484         if (r.position + r.basesCovered() < start_from) {
485             rs.popFront();
486         } else {
487             break;
488         }
489     }
490     int ref_id = -1;
491     if (!rs.empty) {
492         ref_id = rs.front.ref_id;
493     }
494     auto sameref_rs = takeUntil!"a.ref_id != b"(rs, ref_id);
495     alias typeof(sameref_rs) ReadRange;
496     PileupRange!ReadRange columns = new P!ReadRange(sameref_rs, skip_zero_coverage);
497     while (!columns.empty) {
498         auto c = columns.front;
499         if (c.position < start_from) {
500             columns.popFront();
501         } else {
502             break;
503         }
504     }
505     auto chopped = takeUntil!"a.position >= b"(columns, end_at);
506     return AbstractPileup!(R, typeof(chopped))(reads, chopped, start_from, end_at, ref_id);
507 }
508 
509 auto pileupColumns(R)(R reads, bool use_md_tag=false, bool skip_zero_coverage=true) {
510     auto rs = filter!"a.basesCovered() > 0"(reads);
511     alias typeof(rs) ReadRange;
512     PileupRange!ReadRange columns;
513     if (use_md_tag) {
514         columns = new PileupRangeUsingMdTag!ReadRange(rs, skip_zero_coverage);
515     } else {
516         columns = new PileupRange!ReadRange(rs, skip_zero_coverage);
517     }
518     return columns;
519 }
520 
521 /// Tracks current reference base
522 final static class PileupRangeUsingMdTag(R) :
523     PileupRange!(R, PileupColumn)
524 {
525     // The code is similar to that in reconstruct.d but here we can't make
526     // an assumption about any particular read having non-zero length on reference.
527 
528     // current chunk of reference
529     private alias typeof(_column._reads[].front) Read;
530     private ReturnType!(dna!Read) _chunk;
531 
532     // end position of the current chunk on reference (assuming half-open interval)
533     private uint _chunk_end_position;
534 
535     // next read from which we will extract reference chunk
536     //
537     // More precisely,
538     // _next_chunk_provider = argmax (read => read.end_position)
539     //                 {reads overlapping current column}
540     private Read _next_chunk_provider;
541 
542     private bool _has_next_chunk_provider = false;
543 
544     // coverage at the previous location
545     private ulong _prev_coverage;
546 
547     // we also track current reference ID
548     private int _curr_ref_id = -1;
549 
550     ///
551     this(R reads, bool skip_zero_coverage) {
552         super(reads, skip_zero_coverage);
553     }
554 
555     alias Unqual!(ElementType!R) Raw;
556 
557     //  Checks length of the newly added read and tracks the read which
558     //  end position on the reference is the largest.
559     //
560     //  When reconstructed reference chunk will become empty, next one will be
561     //  constructed from that read. This algorithm allows to minimize the number
562     //  of reads for which MD tag will be decoded.
563     protected override void add(ref Raw read) {
564         // the behaviour depends on whether a new contig starts here or not
565         bool had_zero_coverage = _prev_coverage == 0;
566 
567         super.add(read);
568 
569         // get wrapped read
570         auto _read = _read_buf.data.back;
571 
572         // if we've just moved to another reference sequence, do the setup
573         if (_read.ref_id != _curr_ref_id) {
574             _curr_ref_id = _read.ref_id;
575 
576             _has_next_chunk_provider = true;
577             _next_chunk_provider = _read;
578             return;
579         }
580 
581         // two subsequent next_chunk_providers must overlap
582         // unless (!) there was a region with zero coverage in-between
583         if (_read.position > _chunk_end_position && !had_zero_coverage) {
584             return;
585         }
586 
587         // compare with previous candidate and replace if this one is better
588         if (_read.end_position > _chunk_end_position) {
589             if (!_has_next_chunk_provider) {
590                 _has_next_chunk_provider = true;
591                 _next_chunk_provider = _read;
592             } else if (_read.end_position > _next_chunk_provider.end_position) {
593                 _next_chunk_provider = _read;
594             }
595         }
596     }
597 
598     protected override void initNewReference() {
599         _prev_coverage = 0;
600         super.initNewReference();
601         if (_has_next_chunk_provider) {
602             // prepare first chunk
603             _chunk = dna(_next_chunk_provider);
604             _chunk_end_position = _next_chunk_provider.end_position;
605             _has_next_chunk_provider = false;
606             _column._reference_base = _chunk.front;
607             _chunk.popFront();
608         } else {
609             _column._reference_base = 'N';
610         }
611     }
612 
613     ///
614     override void popFront() {
615         if (!_chunk.empty) {
616             // update current reference base
617             _column._reference_base = _chunk.front;
618 
619             _chunk.popFront();
620         } else {
621             _column._reference_base = 'N';
622         }
623 
624         // update _prev_coverage
625         _prev_coverage = _column.coverage;
626 
627         // the order is important - maybe we will obtain new next_chunk_provider
628         // during this call to popFront()
629         super.popFront();
630 
631         // If we have consumed the whole current chunk,
632         // we need to obtain the next one if it's possible.
633         if (_chunk.empty && _has_next_chunk_provider) {
634             _chunk = dna(_next_chunk_provider);
635 
636             debug {
637             /*  import std.stdio;
638                 writeln();
639                 writeln("position: ", _next_chunk_provider.position);
640                 writeln("next chunk: ", to!string(_chunk));
641                 */
642             }
643 
644             _chunk_end_position = _next_chunk_provider.end_position;
645 
646             _has_next_chunk_provider = false;
647 
648             _chunk.popFrontN(cast(size_t)(_column.position - _next_chunk_provider.position));
649 
650             _column._reference_base = _chunk.front;
651             _chunk.popFront();
652         }
653     }
654 }
655 
656 /// Creates a pileup range from a range of reads.
657 /// Note that all reads must be aligned to the same reference.
658 ///
659 /// See $(D PileupColumn) documentation for description of range elements.
660 /// Note also that you can't use $(D std.array.array()) function on pileup
661 /// because it won't make deep copies of underlying data structure.
662 /// (One might argue that in this case it would be better to use opApply,
663 /// but typically one would use $(D std.algorithm.map) on pileup columns
664 /// to obtain some numeric characteristics.)
665 ///
666 /// Params:
667 ///     use_md_tag =  if true, use MD tag together with CIGAR
668 ///                   to recover reference bases
669 ///
670 ///     start_from =  position from which to start
671 ///
672 ///     end_at     =  position before which to stop
673 ///
674 /// $(BR)
675 /// That is, the range of positions is half-open interval
676 /// $(BR)
677 /// [max(start_from, first mapped read start position),
678 /// $(BR)
679 ///  min(end_at, last mapped end position among all reads))
680 ///
681 ///     skip_zero_coverage = if true, skip sites with zero coverage
682 ///
683 auto makePileup(R)(R reads,
684                    bool use_md_tag=false,
685                    ulong start_from=0,
686                    ulong end_at=ulong.max,
687                    bool skip_zero_coverage=true)
688 {
689     if (use_md_tag) {
690         return pileupInstance!PileupRangeUsingMdTag(reads, start_from, end_at, skip_zero_coverage);
691     } else {
692         return pileupInstance!PileupRange(reads, start_from, end_at, skip_zero_coverage);
693     }
694 }
695 
696 /// Allows to express the intention clearer.
697 enum useMD = true;
698 
699 unittest {
700     import std.algorithm;
701     import std.range;
702     import std.array;
703 
704     // the set of reads below was taken from 1000 Genomes BAM file
705     // NA20828.mapped.ILLUMINA.bwa.TSI.low_coverage.20101123.bam
706     // (region 20:1127810-1127819)
707     auto readnames = array(iota(10).map!(i => "r" ~ to!string(i))());
708 
709     auto sequences = ["ATTATGGACATTGTTTCCGTTATCATCATCATCATCATCATCATCATTATCATC",
710                       "GACATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATC",
711                       "ATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACC",
712                       "TGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACCAC",
713                       "TCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACCACCACC",
714                       "GTTATCATCATCATCATCATCATCATCATCATCATCATCATCATCGTCACCCTG",
715                       "TCATCATCATCATAATCATCATCATCATCATCATCATCGTCACCCTGTGTTGAG",
716                       "TCATCATCATCGTCACCCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCT",
717                       "TCATCATCATCATCACCACCACCACCCTGTGTTGAGGACAGAAGTAATATCCCT",
718                       "CACCACCACCCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCTGGTCACC"];
719 
720 // multiple sequence alignment:
721 //                                                            ***
722 // ATTATGGACATTGTTTCCGTTATCATCATCATCATCATCATCATCATTATCATC
723 //       GACATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCAT---C
724 //          ATTGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCATCACC
725 //            TGTTTCCGTTATCATCATCATCATCATCATCATCATCATCATCATCAT---CACCAC
726 //                TCCGTTATCATCATCATCATCATCATCATCATCATCATCATCAT---CACCACCACC
727 //                   GTTATCATCATCATCATCATCATCATCATCATCATCATCAT---CATCGTCACCCTG
728 //                            ATCATCATCATAATCATCATCATCATCATCAT---CATCGTCACCCTGTGTTGAG
729 //                                      TCATCATCATCGTCAC------------------CCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCT
730 //                                               TCATCATCATCATCACCACCACCACCCTGTGTTGAGGACAGAAGTAATATCCCT
731 //                                                            ---CACCACCACCCTGTGTTGAGGACAGAAGTAATTTCCCTTTCTTGGCTGGTCACC
732 //   *         *         *         *         *         *            *         *        *         *
733 //  760       770       780       790       800       810          820       830      840       850
734 
735     auto cigars = [[CigarOperation(54, 'M')],
736                    [CigarOperation(54, 'M')],
737                    [CigarOperation(50, 'M'), CigarOperation(3, 'I'), CigarOperation(1, 'M')],
738                    [CigarOperation(54, 'M')],
739                    [CigarOperation(54, 'M')],
740                    [CigarOperation(54, 'M')],
741                    [CigarOperation(2, 'S'), CigarOperation(52, 'M')],
742                    [CigarOperation(16, 'M'), CigarOperation(15, 'D'), CigarOperation(38, 'M')],
743                    [CigarOperation(13, 'M'), CigarOperation(3, 'I'), CigarOperation(38, 'M')],
744                    [CigarOperation(54, 'M')]];
745 
746     auto positions = [758, 764, 767, 769, 773, 776, 785, 795, 804, 817];
747 
748     auto md_tags = ["47C6", "54", "51", "50T3", "46T7", "45A0C7", "11C24A0C14",
749                     "11A3T0^CATCATCATCACCAC38", "15T29T5", "2T45T5"];
750 
751     BamRead[] reads = new BamRead[10];
752 
753     foreach (i; iota(10)) {
754         reads[i] = BamRead(readnames[i], sequences[i], cigars[i]);
755         reads[i].position = positions[i];
756         reads[i].ref_id = 0;
757         reads[i]["MD"] = md_tags[i];
758     }
759 
760     auto first_read_position = reads.front.position;
761     auto reference = to!string(dna(reads));
762 
763     import std.stdio;
764     // stderr.writeln("Testing pileup (low-level aspects)...");
765 
766     auto pileup = makePileup(reads, true, 796, 849, false);
767     auto pileup2 = makePileup(reads, true, 0, ulong.max, false);
768     assert(pileup.front.position == 796);
769     assert(pileup.start_position == 796);
770     assert(pileup.end_position == 849);
771 
772     while (pileup2.front.position != 796) {
773         pileup2.popFront();
774     }
775 
776     while (!pileup.empty) {
777         auto column = pileup.front;
778         auto column2 = pileup2.front;
779         assert(column.coverage == column2.coverage);
780         pileup2.popFront();
781 
782         // check that DNA is built correctly from MD tags and CIGAR
783         assert(column.reference_base == reference[cast(size_t)(column.position - first_read_position)]);
784 
785         switch (column.position) {
786             case 796:
787                 assert(equal(column.bases, "CCCCCCAC"));
788                 pileup.popFront();
789                 break;
790             case 805:
791                 assert(equal(column.bases, "TCCCCCCCC"));
792                 pileup.popFront();
793                 break;
794             case 806:
795                 assert(equal(column.bases, "AAAAAAAGA"));
796                 pileup.popFront();
797                 break;
798             case 810:
799                 // last read is not yet fetched by pileup engine
800                 assert(column.reads[column.coverage - 2].cigar_after.front.type == 'D');
801                 pileup.popFront();
802                 break;
803             case 817:
804                 assert(column.reads[column.coverage - 2].cigar_before.back.type == 'I');
805                 pileup.popFront();
806                 break;
807             case 821:
808                 assert(column.reads[column.coverage - 3].cigar_operation.type == 'D');
809                 assert(equal(column.bases, "AAGG-AA"));
810                 pileup.popFront();
811                 break;
812             case 826:
813                 assert(equal(column.bases, "CCCCCC"));
814                 pileup.popFront();
815                 break;
816             case 849:
817                 assert(equal(column.bases, "TAT"));
818                 pileup.popFront();
819                 assert(pileup.empty);
820                 break;
821             default:
822                 pileup.popFront();
823                 break;
824         }
825     }
826 
827     // another set of reads, the same file, region 20:12360032-12360050
828     // test the case when reference has some places with zero coverage
829 
830     reads = [BamRead("r1", "CCCACATAGAAAGCTTGCTGTTTCTCTGTGGGAAGTTTTAACTTAGGTCAGCTT",
831                        [CigarOperation(54, 'M')]),
832              BamRead("r2", "TAGAAAGCTTGCTGTTTCTCTGTGGGAAGTTTTAACTTAGGTTAGCTTCATCTA",
833                        [CigarOperation(54, 'M')]),
834              BamRead("r3", "TTTTTCTTTCTTTCTTTGAAGAAGGCAGATTCCTGGTCCTGCCACTCAAATTTT",
835                        [CigarOperation(54, 'M')]),
836              BamRead("r4", "TTTCTTTCTTTCTTTGAAGAAGGCAGATTCCTGGTCCTGCCACTCAAATTTTCA",
837                        [CigarOperation(54, 'M')])];
838 
839     reads[0].position = 979;
840     reads[0]["MD"] = "54";
841     reads[0].ref_id = 0;
842 
843     reads[1].position = 985;
844     reads[1]["MD"] = "42C7C3";
845     reads[1].ref_id = 0;
846 
847     reads[2].position = 1046;
848     reads[2]["MD"] = "54";
849     reads[2].ref_id = 0;
850 
851     reads[3].position = 1048;
852     reads[3]["MD"] = "54";
853     reads[3].ref_id = 0;
854 
855     assert(equal(dna(reads),
856                  map!(c => c.reference_base)(makePileup(reads, true, 0, ulong.max, false))));
857 }
858 
859 struct PileupChunkRange(C) {
860     private C _chunks;
861     private ElementType!C _prev_chunk;
862     private ElementType!C _current_chunk;
863     private bool _empty;
864     private ulong _beg = 0;
865     private bool _use_md_tag;
866     private ulong _start_from;
867     private ulong _end_at;
868     private int _chunk_right_end;
869 
870     private int computeRightEnd(ref ElementType!C chunk) {
871         return chunk.map!(r => r.position + r.basesCovered()).reduce!max;
872     }
873 
874     this(C chunks, bool use_md_tag, ulong start_from, ulong end_at) {
875         _chunks = chunks;
876         _use_md_tag = use_md_tag;
877         _start_from = start_from;
878         _end_at = end_at;
879         while (true) {
880             if (_chunks.empty) {
881                 _empty = true;
882             } else {
883                 _current_chunk = _chunks.front;
884                 _chunks.popFront();
885 
886                 if (_current_chunk[0].ref_id < 0) continue;
887 
888                 _beg = _current_chunk[0].position;
889                 if (_beg >= end_at) {
890                     _empty = true;
891                     break;
892                 }
893 
894                 _chunk_right_end = computeRightEnd(_current_chunk);
895                 if (_chunk_right_end > start_from)
896                     break;
897             }
898         }
899     }
900 
901     bool empty() @property {
902         return _empty;
903     }
904 
905     auto front() @property {
906         auto end_pos = _current_chunk[$-1].position;
907         if (_chunks.empty || _chunks.front[0].ref_id != _current_chunk[$-1].ref_id)
908             end_pos = _chunk_right_end;
909 
910         return makePileup(chain(_prev_chunk, _current_chunk),
911                           _use_md_tag,
912                           max(_beg, _start_from), min(end_pos, _end_at));
913     }
914 
915     void popFront() {
916         _prev_chunk = _current_chunk;
917 
918         while (true) {
919             if (_chunks.empty) {
920                 _empty = true;
921                 return;
922             }
923             _current_chunk = _chunks.front;
924             _chunks.popFront();
925 
926             if (_current_chunk[0].ref_id >= 0) break;
927         }
928 
929         _chunk_right_end = computeRightEnd(_current_chunk);
930 
931         // if we changed reference, nullify prev_chunk
932         if (_prev_chunk.length > 0 &&
933             _prev_chunk[$ - 1].ref_id == _current_chunk[0].ref_id)
934         {
935             _beg = _prev_chunk[$-1].position;
936         } else {
937             _beg = _current_chunk[0].position;
938             _prev_chunk.length = 0;
939         }
940 
941         // keep only those reads in _prev_chunk that have overlap with the last one
942 
943         // 1) estimate read length
944         enum sampleSize = 15;
945         int[sampleSize] buf = void;
946         int read_length = void;
947         if (_prev_chunk.length <= sampleSize) {
948             for (size_t k = 0; k < _prev_chunk.length; ++k) {
949                 buf[k] = _prev_chunk[k].sequence_length;
950             }
951             topN(buf[0.._prev_chunk.length], _prev_chunk.length / 2);
952             read_length = buf[_prev_chunk.length / 2];
953         } else {
954             size_t i = 0;
955             foreach (read; randomSample(_prev_chunk, sampleSize))
956                 buf[i++] = read.sequence_length;
957             topN(buf[], sampleSize / 2);
958             read_length = buf[sampleSize / 2];
959             debug {
960                 import std.stdio;
961                 stderr.writeln("[pileupChunks] read_length=", read_length);
962             }
963         }
964 
965         // 2) do binary search for those reads that start from (_beg - 2 * read_length)
966         //    (it's an experimental fact that almost none of reads consumes that much
967         //     on a reference sequence)
968         auto pos = _beg - 2 * read_length;
969         long i = 0;
970         long j = _prev_chunk.length - 1;
971         // positions of _prev_chunk[0 .. i] are less than pos,
972         // positions of _prev_chunk[j + 1 .. $] are more or equal to pos.
973 
974         while (i <= j) {
975             auto m = cast(size_t)(i + j) / 2;
976             assert(m < _prev_chunk.length);
977             auto p = _prev_chunk[m].position;
978             if (p >= pos) {
979                 j = m - 1;
980             } else {
981                 i = m + 1;
982             }
983         }
984 
985         _prev_chunk = _prev_chunk[cast(size_t)i .. $];
986     }
987 }
988 
989 /// This function constructs range of non-overlapping consecutive pileups from a range of reads
990 /// so that these pileups can be processed in parallel.
991 ///
992 /// It's allowed to pass ranges of sorted reads with different ref. IDs,
993 /// they won't get mixed in any chunk.
994 ///
995 /// Params:
996 ///   use_md_tag =   recover reference bases from MD tag and CIGAR
997 ///
998 ///   block_size =   approximate amount of memory that each pileup will consume,
999 ///                  given in bytes. (Usually consumption will be a bit higher.)
1000 ///
1001 ///   start_from =   position of the first column of the first chunk
1002 ///
1003 ///   end_at     =   position after the last column of the last chunk
1004 ///
1005 /// $(BR)
1006 /// WARNING:     block size should be big enough so that every block will share
1007 ///              some reads only with adjacent blocks.
1008 ///              $(BR)
1009 ///              As such, it is not recommended to reduce the $(I block_size).
1010 ///              But there might be a need to increase it in case of very high coverage.
1011 auto pileupChunks(R)(R reads, bool use_md_tag=false, size_t block_size=16_384_000,
1012                      ulong start_from=0, ulong end_at=ulong.max) {
1013     auto chunks = chunksConsumingLessThan(reads, block_size);
1014     return PileupChunkRange!(typeof(chunks))(chunks, use_md_tag, start_from, end_at);
1015 }