1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2013    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 /// Writing a script/tool for processing BAM data often starts this way:
25 ///
26 /// ------------------------
27 /// import bio.std.hts.bam.reader;
28 ///
29 /// void main(string[] args) {
30 ///     auto bam = new BamReader(args[1]); // open BAM file
31 ///     foreach (read; bam.reads) {        // iterate through its reads
32 ///         if (read.is_unmapped)
33 ///             continue;                  // maybe skip unmapped ones
34 ///         ...
35 ///     }
36 /// }
37 /// ------------------------
38 ///
39 /// Or, if a specific interval on the reference sequence is to be explored:
40 /// ------------------------
41 /// import bio.std.hts.bam.pileup;
42 /// ...
43 /// auto reads = bam["chr7"][50_000 .. 60_000]; // BAI index is required
44 /// foreach (column; makePileup(reads)) { ... } // see $(PMODULE pileup) docs
45 /// ------------------------
46 module bio.std.hts.bam.reader;
47 
48 import bio.std.hts.bam.abstractreader;
49 public import bio.std.hts.sam.header;
50 public import bio.std.hts.bam.reference;
51 public import bio.std.hts.bam.region;
52 public import bio.std.hts.bam.read;
53 public import bio.std.hts.bam.tagvalue;
54 public import bio.std.hts.bam.readrange;
55 
56 import bio.std.hts.bam.randomaccessmanager;
57 import bio.std.hts.bam.baifile;
58 import bio.std.hts.bam.bai.indexing;
59 
60 import bio.core.utils.range;
61 import bio.core.utils.stream;
62 import bio.core.bgzf.inputstream;
63 public import bio.core.bgzf.virtualoffset;
64 
65 import std.system;
66 import std.stdio;
67 import std.algorithm;
68 import std.range;
69 import std.conv;
70 import std.exception;
71 import std.parallelism;
72 import std.array;
73 import core.stdc.stdio;
74 import std..string;
75 import std.stdio;
76 
77 /**
78   BAM file reader, featuring parallel decompression of BGZF blocks.
79  */
80 class BamReader : IBamSamReader {
81 
82     /**
83       Creates reader associated with file or stream.
84       (If stream constructor is used, no random access is possible.)
85       $(BR)
86       Optionally, task pool can be specified.
87       It will be used to unpack BGZF blocks in parallel.
88 
89       Example:
90       -------------------------------------------
91       import std.parallelism, bio.std.hts.bam.reader;
92       void main() {
93         auto pool = new TaskPool(4); // use 4 threads
94         scope (exit) pool.finish();  // don't forget!
95         auto bam = new BamReader("file.bam", pool);
96         ...
97       }
98       -------------------------------------------
99      */
100     this(contrib.undead.stream.Stream stream,
101          TaskPool task_pool = taskPool) {
102         _source_stream = new EndianStream(stream, Endian.littleEndian);
103         _task_pool = task_pool;
104 
105         if (stream.seekable) {
106             _stream_is_seekable = true;
107         }
108 
109         initializeStreams();
110 
111         auto magic = _bam.readString(4);
112 
113         enforce(magic == "BAM\1", "Invalid file format: expected BAM\\1");
114 
115         readSamHeader();
116         readReferenceSequencesInfo();
117 
118         // right after construction, we are at the beginning
119         //                           of the list of reads
120 
121         if (_stream_is_seekable) {
122             _reads_start_voffset = _decompressed_stream.virtualTell();
123         }
124     }
125 
126     /// ditto
127     this(string filename, std.parallelism.TaskPool task_pool) {
128 
129         _filename = filename;
130         _source_stream = getNativeEndianSourceStream();
131         this(_source_stream, task_pool);
132     }
133 
134 
135     /// ditto
136     this(string filename) {
137       this(filename, std.parallelism.taskPool);
138     }
139 
140     /**
141       True if BAI file was found for this BAM file.
142       This is necessary for any random-access operations.
143       $(BR)
144       Looks for files in the same directory which filename
145       is either the file name of BAM file with '.bai' appended,
146       or with the last extension replaced with '.bai'
147       (that is, for $(I file.bam) paths $(I file.bai) and
148       $(I file.bam.bai) will be checked)
149      */
150     bool has_index() @property {
151         return _random_access_manager.found_index_file;
152     }
153 
154     /**
155       Creates BAI file. If $(I overwrite) is false, it won't touch
156       existing index if it is already found.
157      */
158     void createIndex(bool overwrite = false) {
159         if (has_index && !overwrite)
160             return;
161         Stream stream = new BufferedFile(filename ~ ".bai", FileMode.OutNew);
162         scope(exit) stream.close();
163         bio.std.hts.bam.bai.indexing.createIndex(this, stream);
164         _bai_status = BaiStatus.notInitialized;
165         _rndaccssmgr = null;
166     }
167 
168     /** Filename, if the object was created via file name constructor,
169         $(D null) otherwise.
170      */
171     string filename() @property const {
172         return _filename;
173     }
174 
175     /// If file ends with EOF block, returns virtual offset of the start of EOF block.
176     /// Otherwise, returns virtual offset of the physical end of file.
177     bio.core.bgzf.virtualoffset.VirtualOffset eofVirtualOffset() {
178         return _random_access_manager.eofVirtualOffset();
179     }
180 
181     /// Get BGZF block at a given file offset.
182     bio.core.bgzf.block.BgzfBlock getBgzfBlockAt(ulong offset) {
183         return _random_access_manager.getBgzfBlockAt(offset);
184     }
185 
186     /**
187       Returns: SAM header of the BAM file
188      */
189     bio.std.hts.sam.header.SamHeader header() @property {
190         if (_header is null) {
191             synchronized {
192                 if (_header is null) {
193                     _header = new SamHeader(_headertext);
194                     _headertext = null;
195                 }
196             }
197         }
198         return _header;
199     }
200 
201     /**
202         Returns: information about reference sequences
203      */
204     const(bio.std.hts.bam.referenceinfo.ReferenceSequenceInfo)[] reference_sequences() @property const nothrow {
205         return _reference_sequences;
206     }
207 
208     /**
209         Range of all alignment records in the file.
210         $(BR)
211         Element type of the returned range depends on the policy.
212         Default one is $(DPREF2 bam, readrange, withoutOffsets),
213         in this case range element type is $(DPREF2 bam, read, BamRead).
214         $(BR)
215         The other option is $(DPREF2 bam, readrange, withOffsets),
216         which allows to track read virtual offsets in the file.
217         In this case range element type is $(DPREF2 bam, readrange, BamReadBlock).
218 
219         Example:
220         ----------------------------------
221         import bio.std.hts.bam.readrange;
222         ...
223         auto bam = new BamReader("file.bam");
224         auto reads = bam.reads!withOffsets();
225         writeln(reads.front.start_virtual_offset);
226         ----------------------------------
227      */
228     auto reads(alias IteratePolicy=bio.std.hts.bam.readrange.withoutOffsets)() @property {
229         auto _decompressed_stream = getDecompressedBamReadStream();
230         return bamReadRange!IteratePolicy(_decompressed_stream, this);
231     }
232 
233     static struct ReadsWithProgressResult(alias IteratePolicy, R, S) {
234         this(R range, S stream,
235              void delegate(lazy float p) progressBarFunc,
236              void delegate() finishFunc)
237         {
238             _range = range;
239             _stream = stream;
240             _progress_bar_func = progressBarFunc;
241             _finish_func = finishFunc;
242         }
243 
244         static if (__traits(identifier, IteratePolicy) == "withOffsets") {
245             auto front() @property {
246                 return _range.front;
247             }
248         } else static if (__traits(identifier, IteratePolicy) == "withoutOffsets") {
249             auto front() @property {
250                 return _range.front.read;
251             }
252         } else static assert(0, __traits(identifier, IteratePolicy));
253 
254         bool empty() @property {
255             auto result = _range.empty;
256             if (_finish_func !is null && !_called_finish_func && result) {
257                 _called_finish_func = true;
258                 _finish_func();
259             }
260             return result;
261         }
262 
263         void popFront() {
264             _bytes_read += _range.front.read.size_in_bytes;
265             _range.popFront();
266             if (_progress_bar_func !is null) {
267                 _progress_bar_func(min(1.0,
268                     cast(float)_bytes_read / (_stream.total_compressed_size *
269                                               _stream.average_compression_ratio)));
270             }
271         }
272 
273         private R _range;
274         private S _stream;
275         private size_t _bytes_read;
276         private void delegate(lazy float p) _progress_bar_func;
277         private void delegate() _finish_func;
278         private bool _called_finish_func = false;
279     }
280 
281     /**
282         Returns: range of all reads in the file, calling $(I progressBarFunc)
283                  for each read.
284         $(BR)
285         $(I progressBarFunc) will be called
286         each time next alignment is read, with the argument being a number from [0.0, 1.0],
287         which is estimated progress percentage.
288         $(BR)
289         Notice that $(I progressBarFunc) takes $(D lazy) argument,
290         so that the number of relatively expensive float division operations
291         can be controlled by user.
292 
293         Once the iteration is finished (call to $(D empty) returned true),
294         $(I finishFunc) will be called if provided.
295 
296         Example:
297         ------------------------------------
298         import std.functional, std.stdio, bio.std.hts.bam.reader;
299         void progress(lazy float p) {
300             static uint n;
301             if (++n % 63 == 0) writeln(p); // prints progress after every 63 records
302         }
303         ...
304         foreach (read; bam.readsWithProgress(toDelegate(&progress))) {
305             ...
306         }
307         ------------------------------------
308     */
309     auto readsWithProgress(alias IteratePolicy=bio.std.hts.bam.readrange.withoutOffsets)
310         (void delegate(lazy float p) progressBarFunc,
311          void delegate() finishFunc=null)
312     {
313         auto _decompressed_stream = getDecompressedBamReadStream();
314         auto reads_with_offsets = bamReadRange!withOffsets(_decompressed_stream, this);
315 
316         alias ReadsWithProgressResult!(IteratePolicy,
317                        typeof(reads_with_offsets), BgzfInputStream) Result;
318 
319         return Result(reads_with_offsets, _decompressed_stream,
320                       progressBarFunc, finishFunc);
321     }
322 
323     ///
324     void assumeSequentialProcessing() {
325         _seqprocmode = true;
326     }
327 
328     /// Part of IBamSamReader interface
329     std.range.InputRange!(bio.std.hts.bam.read.BamRead) allReads() @property {
330         return inputRangeObject(reads!withoutOffsets());
331     }
332 
333     /**
334       Returns: the read which starts at a given virtual offset.
335      */
336     bio.std.hts.bam.read.BamRead getReadAt(bio.core.bgzf.virtualoffset.VirtualOffset offset) {
337         enforce(_random_access_manager !is null);
338         return _random_access_manager.getReadAt(offset);
339     }
340 
341     /**
342       Returns: all reads located between two virtual offsets in the BAM file.
343 
344       $(BR)
345       First offset must point to the start of an alignment record,
346       and be strictly less than the second one.
347       $(BR)
348       For decompression, the task pool specified at the construction is used.
349      */
350     auto getReadsBetween(bio.core.bgzf.virtualoffset.VirtualOffset from,
351                          bio.core.bgzf.virtualoffset.VirtualOffset to) {
352         enforce(from <= to, "First offset must be less than second");
353         enforce(_stream_is_seekable, "Stream is not seekable");
354 
355         return _random_access_manager.getReadsBetween(from, to);
356     }
357 
358     /**
359       Returns: all reads overlapping any region from a set.
360     */
361     auto getReadsOverlapping(BamRegion[] regions) {
362 	return _random_access_manager.getReads(regions);
363     }
364 
365     /**
366       Unmapped reads, i.e. reads at the end of file whose reference id is -1.
367       The file must be coordinate-sorted and indexed.
368      */
369     auto unmappedReads() {
370         enforce(_random_access_manager !is null);
371         auto bai = _random_access_manager.getBai();
372 
373         VirtualOffset start;
374         start = eofVirtualOffset();
375 
376         auto all_reads = this.reads();
377         if (!all_reads.empty && all_reads.front.ref_id == -1)
378             start = _reads_start_voffset;
379 
380         auto ioffsets = bai.indices[0 .. reference_sequences.length].retro()
381                            .map!(index => index.ioffsets.retro()).joiner();
382         if (!ioffsets.empty)
383             start = ioffsets.front;
384 
385         auto stream = _random_access_manager.createStreamStartingFrom(start);
386         auto r = bamReadRange!withOffsets(stream, this);
387         while (!r.empty && r.front.ref_id != -1)
388             r.popFront();
389         return r;
390     }
391 
392     /**
393       Get BAI chunks containing all reads that overlap specified region.
394       For $(I ref_id) = -1, use $(D unmappedReads) method.
395      */
396     bio.core.bgzf.chunk.Chunk[] getChunks(uint ref_id, int beg, int end) {
397         enforce(_random_access_manager !is null);
398         enforce(beg < end);
399 
400         return _random_access_manager.getChunks(BamRegion(ref_id, beg, end));
401     }
402 
403     /**
404       Returns reference sequence with id $(I ref_id).
405      */
406     bio.std.hts.bam.reference.ReferenceSequence reference(int ref_id) {
407         enforce(ref_id < _reference_sequences.length, "Invalid reference index");
408         return ReferenceSequence(_random_access_manager,
409                                  ref_id,
410                                  _reference_sequences[ref_id]);
411     }
412 
413     /**
414       Returns reference sequence named $(I ref_name).
415 
416       Example:
417       ---------------------------
418       import std.stdio, bio.std.hts.bam.reader;
419       ...
420       auto bam = new BamReader("file.bam");
421       writeln(bam["chr2"].length);
422       ---------------------------
423      */
424     bio.std.hts.bam.reference.ReferenceSequence opIndex(string ref_name) {
425         enforce(hasReference(ref_name), "Reference with name " ~ ref_name ~ " does not exist");
426         auto ref_id = _reference_sequence_dict[ref_name];
427         return reference(ref_id);
428     }
429 
430     /**
431       Check if reference named $(I ref_name) is presented in BAM header.
432      */
433     bool hasReference(string ref_name) {
434         return null != (ref_name in _reference_sequence_dict);
435     }
436 
437     /**
438       Set buffer size for I/O operations. Values less than 4096 are disallowed.
439       $(BR)
440       This can help in multithreaded applications when several files are read
441       simultaneously (e.g. merging).
442      */
443     void setBufferSize(size_t buffer_size) {
444         enforce(buffer_size >= 4096, "Buffer size must be >= 4096 bytes");
445         _buffer_size = buffer_size;
446         _random_access_manager.setBufferSize(buffer_size);
447     }
448 
449     package bool _seqprocmode; // available for bio.std.hts.bam.readrange;
450 
451 private:
452 
453     string _filename;                       // filename (if available)
454     Stream _source_stream;                  // compressed
455     BgzfInputStream _decompressed_stream;   // decompressed
456     Stream _bam;                            // decompressed + endian conversion
457     bool _stream_is_seekable;
458 
459     // Virtual offset at which alignment records start.
460     VirtualOffset _reads_start_voffset;
461 
462     BaiFile _dont_access_me_directly_use_bai_file_for_that;
463     enum BaiStatus {
464         notInitialized,
465         initialized,
466         fileNotFound
467     }
468     BaiStatus _bai_status = BaiStatus.notInitialized;
469 
470     void initBai() {
471         if (_bai_status == BaiStatus.notInitialized) {
472             synchronized {
473                 try {
474                     _dont_access_me_directly_use_bai_file_for_that = BaiFile(_filename);
475                     _bai_status = BaiStatus.initialized;
476                 } catch (Exception e) {
477                     _bai_status = BaiStatus.fileNotFound;
478                 }
479             }
480         }
481     }
482 
483     // provides access to index file
484     @property ref BaiFile _bai_file() { // initialized lazily
485         initBai();
486         return _dont_access_me_directly_use_bai_file_for_that;
487     };
488 
489     RandomAccessManager _rndaccssmgr; // unreadable for a purpose
490     @property RandomAccessManager _random_access_manager() {
491         if (_rndaccssmgr is null) {
492             synchronized {
493                 initBai();
494 
495                 if (_bai_status == BaiStatus.initialized) {
496                     _rndaccssmgr = new RandomAccessManager(this, _bai_file);
497                 } else {
498                     _rndaccssmgr = new RandomAccessManager(this);
499                 }
500 
501                 _rndaccssmgr.setTaskPool(_task_pool);
502                 _rndaccssmgr.setBufferSize(_buffer_size);
503             }
504         }
505         return _rndaccssmgr;
506     }
507 
508     SamHeader _header;
509     string _headertext; // for lazy SAM header parsing
510     ReferenceSequenceInfo[] _reference_sequences;
511     int[string] _reference_sequence_dict; /// name -> index mapping
512 
513     TaskPool _task_pool;
514     size_t _buffer_size = 4096; // buffer size to be used for I/O
515 
516     Stream getNativeEndianSourceStream() {
517         assert(_filename !is null);
518         Stream file = new bio.core.utils.stream.File(_filename);
519         return new BufferedStream(file, _buffer_size);
520     }
521 
522     Stream getSeekableCompressedStream() {
523         if (_stream_is_seekable) {
524             if (_filename !is null) {
525                 auto file = getNativeEndianSourceStream();
526                 version(development)
527                 {
528                     std.stdio.stderr.writeln("[info] file size: ", file.size);
529                 }
530                 return new EndianStream(file, Endian.littleEndian);
531             } else {
532                 _source_stream.seekSet(0);
533                 return _source_stream;
534             }
535         } else {
536             return null;
537         }
538     }
539 
540     // get decompressed stream out of compressed BAM file
541     BgzfInputStream getDecompressedStream() {
542         auto compressed_stream = getSeekableCompressedStream();
543 
544         auto block_supplier = new StreamSupplier(compressed_stream is null ?
545                                                  _source_stream :
546                                                  compressed_stream);
547 
548         return new BgzfInputStream(block_supplier, _task_pool,
549                                    _buffer_size);
550     }
551 
552     // get decompressed stream starting from the first alignment record
553     BgzfInputStream getDecompressedBamReadStream() {
554         auto compressed_stream = getSeekableCompressedStream();
555 
556         if (compressed_stream !is null) {
557             enforce(_reads_start_voffset != 0UL);
558 
559             compressed_stream.seekCur(_reads_start_voffset.coffset);
560             auto block_supplier = new StreamSupplier(compressed_stream);
561             auto stream = new BgzfInputStream(block_supplier, _task_pool,
562                                               _buffer_size);
563             stream.readString(_reads_start_voffset.uoffset);
564             return stream;
565         } else {
566             // must be initialized in initializeStreams()
567             return _decompressed_stream;
568         }
569     }
570 
571     // sets up the streams and ranges
572     void initializeStreams() {
573 
574         _decompressed_stream = getDecompressedStream();
575         _bam = new EndianStream(_decompressed_stream, Endian.littleEndian);
576     }
577 
578     // initializes _header
579     void readSamHeader() {
580         int l_text;
581         _bam.read(l_text);
582 
583         _headertext = cast(string)(_bam.readString(l_text));
584     }
585 
586     // initialize _reference_sequences
587     void readReferenceSequencesInfo() {
588         int n_ref;
589         _bam.read(n_ref);
590         _reference_sequences = new ReferenceSequenceInfo[n_ref];
591         foreach (i; 0..n_ref) {
592             _reference_sequences[i] = ReferenceSequenceInfo(_bam);
593 
594             // provide mapping Name -> Index
595             _reference_sequence_dict[_reference_sequences[i].name] = i;
596         }
597     }
598 }