1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2013    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11     
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14     
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 /// Writing a script/tool for processing BAM data often starts this way:
25 /// 
26 /// ------------------------
27 /// import bio.bam.reader;
28 ///
29 /// void main(string[] args) {
30 ///     auto bam = new BamReader(args[1]); // open BAM file
31 ///     foreach (read; bam.reads) {        // iterate through its reads
32 ///         if (read.is_unmapped)
33 ///             continue;                  // maybe skip unmapped ones
34 ///         ...
35 ///     }
36 /// }
37 /// ------------------------
38 /// 
39 /// Or, if a specific interval on the reference sequence is to be explored:
40 /// ------------------------
41 /// import bio.bam.pileup;
42 /// ...
43 /// auto reads = bam["chr7"][50_000 .. 60_000]; // BAI index is required
44 /// foreach (column; makePileup(reads)) { ... } // see $(PMODULE pileup) docs
45 /// ------------------------
46 module bio.bam.reader;
47 
48 import bio.bam.abstractreader;
49 public import bio.sam.header;
50 public import bio.bam.reference;
51 public import bio.bam.region;
52 public import bio.bam.read;
53 public import bio.bam.tagvalue;
54 public import bio.bam.readrange;
55 import bio.bam.randomaccessmanager;
56 import bio.bam.baifile;
57 import bio.bam.bai.indexing;
58 import bio.core.utils.range;
59 import bio.core.utils.stream;
60 import bio.core.bgzf.inputstream;
61 public import bio.core.bgzf.virtualoffset;
62 
63 import std.system;
64 import std.stdio;
65 import std.algorithm;
66 import std.range;
67 import std.conv;
68 import std.exception;
69 import std.parallelism;
70 import std.array;
71 import core.stdc.stdio;
72 import std.string;
73 
74 /**
75   BAM file reader, featuring parallel decompression of BGZF blocks.
76  */
77 class BamReader : IBamSamReader {
78 
79     /**
80       Creates reader associated with file or stream.
81       (If stream constructor is used, no random access is possible.)
82       $(BR)
83       Optionally, task pool can be specified. 
84       It will be used to unpack BGZF blocks in parallel.
85 
86       Example:
87       -------------------------------------------
88       import std.parallelism, bio.bam.reader;
89       void main() {
90         auto pool = new TaskPool(4); // use 4 threads
91         scope (exit) pool.finish();  // don't forget!
92         auto bam = new BamReader("file.bam", pool);
93         ...
94       }
95       -------------------------------------------
96      */
97     this(undead.stream.Stream stream,
98          TaskPool task_pool = taskPool) {
99         _source_stream = new EndianStream(stream, Endian.littleEndian);
100         _task_pool = task_pool;
101 
102         if (stream.seekable) {
103             _stream_is_seekable = true;
104         }
105 
106         initializeStreams();
107 
108         auto magic = _bam.readString(4);
109         
110         enforce(magic == "BAM\1", "Invalid file format: expected BAM\\1");
111 
112         readSamHeader();
113         readReferenceSequencesInfo();
114 
115         // right after construction, we are at the beginning
116         //                           of the list of reads
117 
118         if (_stream_is_seekable) {
119             _reads_start_voffset = _decompressed_stream.virtualTell();
120         }
121     }
122 
123     /// ditto
124     this(string filename, std.parallelism.TaskPool task_pool) {
125 
126         _filename = filename;
127         _source_stream = getNativeEndianSourceStream();
128         this(_source_stream, task_pool);
129     }
130 
131     /// ditto
132     this(string filename) {
133         this(filename, std.parallelism.taskPool);
134     }
135  
136     /**
137       True if BAI file was found for this BAM file.
138       This is necessary for any random-access operations.
139       $(BR)
140       Looks for files in the same directory which filename
141       is either the file name of BAM file with '.bai' appended,
142       or with the last extension replaced with '.bai'
143       (that is, for $(I file.bam) paths $(I file.bai) and 
144       $(I file.bam.bai) will be checked)
145      */
146     bool has_index() @property {
147         return _random_access_manager.found_index_file;
148     }
149 
150     /**
151       Creates BAI file. If $(I overwrite) is false, it won't touch
152       existing index if it is already found.
153      */
154     void createIndex(bool overwrite = false) {
155         if (has_index && !overwrite)
156             return;
157         Stream stream = new BufferedFile(filename ~ ".bai", FileMode.OutNew);
158         scope(exit) stream.close();
159         bio.bam.bai.indexing.createIndex(this, stream);
160         _bai_status = BaiStatus.notInitialized;
161         _rndaccssmgr = null;
162     }
163 
164     /** Filename, if the object was created via file name constructor,
165         $(D null) otherwise.
166      */
167     string filename() @property const {
168         return _filename;
169     }
170 
171     /// If file ends with EOF block, returns virtual offset of the start of EOF block.
172     /// Otherwise, returns virtual offset of the physical end of file.
173     bio.core.bgzf.virtualoffset.VirtualOffset eofVirtualOffset() {
174         return _random_access_manager.eofVirtualOffset();
175     }
176 
177     /// Get BGZF block at a given file offset.
178     bio.core.bgzf.block.BgzfBlock getBgzfBlockAt(ulong offset) {
179         return _random_access_manager.getBgzfBlockAt(offset);
180     }
181 
182     /**
183       Returns: SAM header of the BAM file
184      */
185     bio.sam.header.SamHeader header() @property {
186         if (_header is null) {
187             synchronized {
188                 if (_header is null) {
189                     _header = new SamHeader(_headertext);
190                     _headertext = null;
191                 }
192             }
193         }
194         return _header;
195     }
196 
197     /**
198         Returns: information about reference sequences
199      */
200     const(bio.bam.referenceinfo.ReferenceSequenceInfo)[] reference_sequences() @property const nothrow {
201         return _reference_sequences;
202     }
203 
204     /**
205         Range of all alignment records in the file.
206         $(BR)
207         Element type of the returned range depends on the policy. 
208         Default one is $(DPREF2 bam, readrange, withoutOffsets),
209         in this case range element type is $(DPREF2 bam, read, BamRead).
210         $(BR)
211         The other option is $(DPREF2 bam, readrange, withOffsets),
212         which allows to track read virtual offsets in the file.
213         In this case range element type is $(DPREF2 bam, readrange, BamReadBlock).
214 
215         Example:
216         ----------------------------------
217         import bio.bam.readrange;
218         ...
219         auto bam = new BamReader("file.bam");
220         auto reads = bam.reads!withOffsets();
221         writeln(reads.front.start_virtual_offset);
222         ----------------------------------
223      */
224     auto reads(alias IteratePolicy=bio.bam.readrange.withoutOffsets)() @property {
225         auto _decompressed_stream = getDecompressedBamReadStream();
226         return bamReadRange!IteratePolicy(_decompressed_stream, this);
227     }
228 
229     static struct ReadsWithProgressResult(alias IteratePolicy, R, S) {
230         this(R range, S stream,
231              void delegate(lazy float p) progressBarFunc,
232              void delegate() finishFunc)
233         {
234             _range = range;
235             _stream = stream;
236             _progress_bar_func = progressBarFunc;
237             _finish_func = finishFunc;
238         }
239 
240         static if (__traits(identifier, IteratePolicy) == "withOffsets") {
241             auto front() @property {
242                 return _range.front;
243             } 
244         } else static if (__traits(identifier, IteratePolicy) == "withoutOffsets") {
245             auto front() @property {
246                 return _range.front.read;
247             }
248         } else static assert(0, __traits(identifier, IteratePolicy));
249 
250         bool empty() @property {
251             auto result = _range.empty;
252             if (_finish_func !is null && !_called_finish_func && result) {
253                 _called_finish_func = true;
254                 _finish_func();
255             }
256             return result;
257         }
258 
259         void popFront() {
260             _bytes_read += _range.front.read.size_in_bytes;
261             _range.popFront();
262             if (_progress_bar_func !is null) {
263                 _progress_bar_func(min(1.0, 
264                     cast(float)_bytes_read / (_stream.total_compressed_size * 
265                                               _stream.average_compression_ratio)));
266             }
267         }
268 
269         private R _range;
270         private S _stream;
271         private size_t _bytes_read;
272         private void delegate(lazy float p) _progress_bar_func;
273         private void delegate() _finish_func;
274         private bool _called_finish_func = false;
275     }
276 
277     /**
278         Returns: range of all reads in the file, calling $(I progressBarFunc)
279                  for each read. 
280         $(BR)
281         $(I progressBarFunc) will be called
282         each time next alignment is read, with the argument being a number from [0.0, 1.0],
283         which is estimated progress percentage.
284         $(BR)
285         Notice that $(I progressBarFunc) takes $(D lazy) argument, 
286         so that the number of relatively expensive float division operations
287         can be controlled by user.
288 
289         Once the iteration is finished (call to $(D empty) returned true),
290         $(I finishFunc) will be called if provided.
291 
292         Example:
293         ------------------------------------
294         import std.functional, std.stdio, bio.bam.reader;
295         void progress(lazy float p) {
296             static uint n;
297             if (++n % 63 == 0) writeln(p); // prints progress after every 63 records
298         }
299         ...
300         foreach (read; bam.readsWithProgress(toDelegate(&progress))) {
301             ...
302         }
303         ------------------------------------
304     */
305     auto readsWithProgress(alias IteratePolicy=bio.bam.readrange.withoutOffsets)
306         (void delegate(lazy float p) progressBarFunc,
307          void delegate() finishFunc=null) 
308     {
309         auto _decompressed_stream = getDecompressedBamReadStream();
310         auto reads_with_offsets = bamReadRange!withOffsets(_decompressed_stream, this);
311        
312         alias ReadsWithProgressResult!(IteratePolicy, 
313                        typeof(reads_with_offsets), BgzfInputStream) Result;
314         
315         return Result(reads_with_offsets, _decompressed_stream,
316                       progressBarFunc, finishFunc);
317     }
318 
319     ///
320     void assumeSequentialProcessing() {
321         _seqprocmode = true;
322     }
323 
324     /// Part of IBamSamReader interface
325     std.range.InputRange!(bio.bam.read.BamRead) allReads() @property {
326         return inputRangeObject(reads!withoutOffsets());
327     }
328 
329     /**
330       Returns: the read which starts at a given virtual offset.
331      */
332     bio.bam.read.BamRead getReadAt(bio.core.bgzf.virtualoffset.VirtualOffset offset) {
333         enforce(_random_access_manager !is null);
334         return _random_access_manager.getReadAt(offset);
335     }
336 
337     /**
338       Returns: all reads located between two virtual offsets in the BAM file.
339 
340       $(BR)
341       First offset must point to the start of an alignment record,
342       and be strictly less than the second one.
343       $(BR)
344       For decompression, the task pool specified at the construction is used.
345      */ 
346     auto getReadsBetween(bio.core.bgzf.virtualoffset.VirtualOffset from, 
347                          bio.core.bgzf.virtualoffset.VirtualOffset to) {
348         enforce(from <= to, "First offset must be less than second");
349         enforce(_stream_is_seekable, "Stream is not seekable");
350         
351         return _random_access_manager.getReadsBetween(from, to);
352     }
353 
354     /**
355       Returns: all reads overlapping any region from a set.
356     */
357     auto getReadsOverlapping(BamRegion[] regions) {
358 	return _random_access_manager.getReads(regions);
359     }
360 
361     /**
362       Unmapped reads, i.e. reads at the end of file whose reference id is -1.
363       The file must be coordinate-sorted and indexed.
364      */
365     auto unmappedReads() {
366         enforce(_random_access_manager !is null);
367         auto bai = _random_access_manager.getBai();
368 
369         VirtualOffset start;
370         start = eofVirtualOffset();
371 
372         auto all_reads = this.reads();
373         if (!all_reads.empty && all_reads.front.ref_id == -1)
374             start = _reads_start_voffset;
375 
376         auto ioffsets = bai.indices[0 .. reference_sequences.length].retro()
377                            .map!(index => index.ioffsets.retro()).joiner();
378         if (!ioffsets.empty)
379             start = ioffsets.front;
380 
381         auto stream = _random_access_manager.createStreamStartingFrom(start);
382         auto r = bamReadRange!withOffsets(stream, this);
383         while (!r.empty && r.front.ref_id != -1)
384             r.popFront();
385         return r;
386     }
387 
388     /**
389       Get BAI chunks containing all reads that overlap specified region.
390       For $(I ref_id) = -1, use $(D unmappedReads) method.
391      */
392     bio.core.bgzf.chunk.Chunk[] getChunks(uint ref_id, int beg, int end) {
393         enforce(_random_access_manager !is null);
394         enforce(beg < end);
395 
396         return _random_access_manager.getChunks(BamRegion(ref_id, beg, end));
397     }
398 
399     /**
400       Returns reference sequence with id $(I ref_id).
401      */
402     bio.bam.reference.ReferenceSequence reference(int ref_id) {
403         enforce(ref_id < _reference_sequences.length, "Invalid reference index");
404         return ReferenceSequence(_random_access_manager, 
405                                  ref_id,
406                                  _reference_sequences[ref_id]);
407     }
408 
409     /**
410       Returns reference sequence named $(I ref_name).
411 
412       Example:
413       ---------------------------
414       import std.stdio, bio.bam.reader;
415       ...
416       auto bam = new BamReader("file.bam");
417       writeln(bam["chr2"].length);
418       ---------------------------
419      */
420     bio.bam.reference.ReferenceSequence opIndex(string ref_name) {
421         enforce(hasReference(ref_name), "Reference with name " ~ ref_name ~ " does not exist");
422         auto ref_id = _reference_sequence_dict[ref_name];
423         return reference(ref_id);
424     }
425 
426     /**
427       Check if reference named $(I ref_name) is presented in BAM header.
428      */
429     bool hasReference(string ref_name) {
430         return null != (ref_name in _reference_sequence_dict);
431     }
432 
433     /**
434       Set buffer size for I/O operations. Values less than 4096 are disallowed.
435       $(BR)
436       This can help in multithreaded applications when several files are read 
437       simultaneously (e.g. merging).
438      */
439     void setBufferSize(size_t buffer_size) {
440         enforce(buffer_size >= 4096, "Buffer size must be >= 4096 bytes");
441         _buffer_size = buffer_size;
442         _random_access_manager.setBufferSize(buffer_size);
443     }
444 
445     package bool _seqprocmode; // available for bio.bam.readrange;
446 
447 private:
448     
449     string _filename;                       // filename (if available)
450     Stream _source_stream;                  // compressed
451     BgzfInputStream _decompressed_stream;   // decompressed
452     Stream _bam;                            // decompressed + endian conversion
453     bool _stream_is_seekable;
454 
455     // Virtual offset at which alignment records start.
456     VirtualOffset _reads_start_voffset;
457 
458     BaiFile _dont_access_me_directly_use_bai_file_for_that;
459     enum BaiStatus {
460         notInitialized,
461         initialized,
462         fileNotFound
463     }
464     BaiStatus _bai_status = BaiStatus.notInitialized;
465 
466     void initBai() {
467         if (_bai_status == BaiStatus.notInitialized) {
468             synchronized {
469                 try {
470                     _dont_access_me_directly_use_bai_file_for_that = BaiFile(_filename);
471                     _bai_status = BaiStatus.initialized;
472                 } catch (Exception e) {
473                     _bai_status = BaiStatus.fileNotFound;
474                 }
475             }
476         }
477     }
478 
479     // provides access to index file
480     @property ref BaiFile _bai_file() { // initialized lazily
481         initBai();
482         return _dont_access_me_directly_use_bai_file_for_that;
483     }; 
484 
485     RandomAccessManager _rndaccssmgr; // unreadable for a purpose
486     @property RandomAccessManager _random_access_manager() {
487         if (_rndaccssmgr is null) {
488             synchronized {
489                 initBai();
490 
491                 if (_bai_status == BaiStatus.initialized) {
492                     _rndaccssmgr = new RandomAccessManager(this, _bai_file);
493                 } else {
494                     _rndaccssmgr = new RandomAccessManager(this);
495                 }
496 
497                 _rndaccssmgr.setTaskPool(_task_pool);
498                 _rndaccssmgr.setBufferSize(_buffer_size);
499             }
500         }
501         return _rndaccssmgr;
502     }
503 
504     SamHeader _header;
505     string _headertext; // for lazy SAM header parsing
506     ReferenceSequenceInfo[] _reference_sequences;
507     int[string] _reference_sequence_dict; /// name -> index mapping
508 
509     TaskPool _task_pool;
510     size_t _buffer_size = 4096; // buffer size to be used for I/O
511 
512     Stream getNativeEndianSourceStream() {
513         assert(_filename !is null);
514         Stream file = new bio.core.utils.stream.File(_filename);
515         return new BufferedStream(file, _buffer_size);
516     }
517 
518     Stream getSeekableCompressedStream() {
519         if (_stream_is_seekable) {
520             if (_filename !is null) {
521                 auto file = getNativeEndianSourceStream();
522                 version(development)
523                 {
524                     std.stdio.stderr.writeln("[info] file size: ", file.size);
525                 }
526                 return new EndianStream(file, Endian.littleEndian);
527             } else {
528                 _source_stream.seekSet(0);
529                 return _source_stream;
530             } 
531         } else {
532             return null;
533         }
534     }
535 
536     // get decompressed stream out of compressed BAM file
537     BgzfInputStream getDecompressedStream() {
538         auto compressed_stream = getSeekableCompressedStream();
539 
540         auto block_supplier = new StreamSupplier(compressed_stream is null ?
541                                                  _source_stream :
542                                                  compressed_stream);
543 
544         return new BgzfInputStream(block_supplier, _task_pool,
545                                    null, _buffer_size);
546     }
547 
548     // get decompressed stream starting from the first alignment record
549     BgzfInputStream getDecompressedBamReadStream() {
550         auto compressed_stream = getSeekableCompressedStream();
551 
552         if (compressed_stream !is null) {
553             enforce(_reads_start_voffset != 0UL);
554 
555             compressed_stream.seekCur(_reads_start_voffset.coffset);
556             auto block_supplier = new StreamSupplier(compressed_stream);
557             auto stream = new BgzfInputStream(block_supplier, _task_pool, 
558                                               null, _buffer_size);
559             stream.readString(_reads_start_voffset.uoffset);
560             return stream;
561         } else {
562             // must be initialized in initializeStreams()
563             return _decompressed_stream;
564         }
565     }
566 
567     // sets up the streams and ranges
568     void initializeStreams() {
569         
570         _decompressed_stream = getDecompressedStream();
571         _bam = new EndianStream(_decompressed_stream, Endian.littleEndian); 
572     }
573 
574     // initializes _header
575     void readSamHeader() {
576         int l_text;
577         _bam.read(l_text);
578 
579         _headertext = cast(string)(_bam.readString(l_text));
580     }
581 
582     // initialize _reference_sequences
583     void readReferenceSequencesInfo() {
584         int n_ref;
585         _bam.read(n_ref);
586         _reference_sequences = new ReferenceSequenceInfo[n_ref];
587         foreach (i; 0..n_ref) {
588             _reference_sequences[i] = ReferenceSequenceInfo(_bam);
589 
590             // provide mapping Name -> Index
591             _reference_sequence_dict[_reference_sequences[i].name] = i;
592         }
593     }
594 }