1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2014    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11     
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14     
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 /**
25   Module for random access operations on BAM file.
26  */
27 module bio.bam.randomaccessmanager;
28 
29 import bio.bam.constants;
30 import bio.bam.reader;
31 import bio.bam.read;
32 import bio.bam.readrange;
33 import bio.bam.baifile;
34 import bio.bam.region;
35 import bio.core.utils.algo;
36 
37 import bio.core.bgzf.block;
38 import bio.core.bgzf.virtualoffset;
39 import bio.core.bgzf.inputstream;
40 import bio.core.bgzf.constants;
41 import bio.core.bgzf.chunk;
42 import bio.core.utils.range;
43 import bio.core.utils.stream;
44 
45 import std.system;
46 import std.algorithm;
47 import std.array;
48 import std.range;
49 import std.traits;
50 import std.exception;
51 import std.container;
52 import std.parallelism;
53 static import std.file;
54 
55 debug {
56     import std.stdio;
57 }
58 
59 private {
60     auto nonOverlappingChunks(R)(R chunks) {
61         static ref auto chunkB(ref Chunk chunk) { return chunk.beg; }
62         static ref auto chunkE(ref Chunk chunk) { return chunk.end; }
63         return nonOverlapping!(chunkB, chunkE)(chunks);
64     }
65 }
66 
67 /// Class which random access tasks are delegated to.
class RandomAccessManager {

    /// Sets cache of decompressed BGZF blocks; it is handed to every
    /// BgzfInputStream that this manager creates.
    void setCache(BgzfBlockCache cache) {
        _cache = cache;
    }

    /// Sets task pool that is passed to created BGZF input streams.
    void setTaskPool(TaskPool task_pool) {
        _task_pool = task_pool;
    }

    /// Sets buffer size.
    // NOTE(review): _buffer_size is assigned here but never read anywhere
    // in this class — confirm whether it is still used by callers/subclasses.
    void setBufferSize(size_t buffer_size) {
        _buffer_size = buffer_size;
    }

    /// Constructs new manager for BAM file
    this(string filename) {
        _filename = filename;
    }

    /// ditto
    this(BamReader reader) {
        _reader = reader;
        _filename = reader.filename;
    }

    /// Constructs new manager with given index file.
    /// This allows to do random-access interval queries.
    ///
    /// Params:
    ///     filename =  location of BAM file
    ///     bai  =  index file
    this(string filename, ref BaiFile bai) {
        _filename = filename;
        _bai = bai;
        _found_index_file = true;
    }

    /// ditto
    this(BamReader reader, ref BaiFile bai) {
        _reader = reader;
        _filename = reader.filename;
        _bai = bai;
        _found_index_file = true;
    }

    /// If file ends with EOF block, returns virtual offset of the start of EOF block.
    /// Otherwise, returns virtual offset of the physical end of file.
    VirtualOffset eofVirtualOffset() const {
        ulong file_offset = std.file.getSize(_filename);
        if (hasEofBlock()) {
            return VirtualOffset(file_offset - BAM_EOF.length, 0);
        } else {
            return VirtualOffset(file_offset, 0);
        }
    }

    /// Returns true if the file ends with EOF block, and false otherwise.
    bool hasEofBlock() const {
        auto _stream = new bio.core.utils.stream.File(_filename);
        // a file shorter than the marker cannot possibly end with it
        if (_stream.size < BAM_EOF.length) {
            return false;
        }

        // compare the trailing bytes of the file with the special EOF block
        ubyte[BAM_EOF.length] buf;
        _stream.seekEnd(-cast(int)BAM_EOF.length);

        _stream.readExact(&buf, BAM_EOF.length);
        if (buf != BAM_EOF) {
            return false;
        }

        return true;
    }

    /// Get new BgzfInputStream starting from specified virtual offset.
    BgzfInputStream createStreamStartingFrom(VirtualOffset offset)
    {
        auto _stream = new bio.core.utils.stream.File(_filename);
        auto _compressed_stream = new EndianStream(_stream, Endian.littleEndian);
        // coffset = file offset of the containing BGZF block;
        // uoffset = offset within the decompressed block contents,
        // handled by the supplier
        _compressed_stream.seekSet(cast(size_t)(offset.coffset));
        auto supplier = new StreamSupplier(_compressed_stream, offset.uoffset);
        auto bgzf_stream = new BgzfInputStream(supplier, _task_pool, _cache);
        return bgzf_stream;
    }

    /// Get single read at a given virtual offset.
    /// Every time new stream is used.
    BamRead getReadAt(VirtualOffset offset) {
        auto stream = createStreamStartingFrom(offset);

        // temporarily flip the reader into sequential-processing mode while
        // taking the first record; previous mode is restored afterwards
        // (semantics of _seqprocmode are defined in BamReader — not visible here)
        bool old_mode = _reader._seqprocmode;
        _reader._seqprocmode = true;
        auto read = bamReadRange(stream, _reader).front.dup;
        _reader._seqprocmode = old_mode;
        return read;
    }

    /// Get BGZF block at a given offset.
    BgzfBlock getBgzfBlockAt(ulong offset) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto stream = new EndianStream(fstream, Endian.littleEndian);
        stream.seekSet(offset);
        BgzfBlock block = void;
        ubyte[BGZF_MAX_BLOCK_SIZE] buf = void;
        fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
        // dup so the returned block owns its data instead of referencing
        // the stack buffer `buf`, which dies when this function returns
        block._buffer = block._buffer.dup;
        return block;
    }

    /// Get reads between two virtual offsets. First virtual offset must point
    /// to a start of an alignment record.
    auto getReadsBetween(VirtualOffset from, VirtualOffset to) {
        auto stream = createStreamStartingFrom(from);

        // stop iteration as soon as a record ends past the `to` offset
        static bool offsetTooBig(BamReadBlock record, VirtualOffset vo) {
            return record.end_virtual_offset > vo;
        }

        return until!offsetTooBig(bamReadRange!withOffsets(stream, _reader), to);
    }

    /// True iff a BAI index file was supplied at construction time.
    bool found_index_file() @property const {
        return _found_index_file;
    }
    private bool _found_index_file = false; // set to true only by the constructors that take a BaiFile

    /// BAI file
    ref const(BaiFile) getBai() const {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
        return _bai;
    }

    // Throws unless a BAI index was provided at construction time.
    private void checkIndexExistence() {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
    }

    // Throws unless ref_id is a valid index into the BAI per-reference indices.
    private void checkRefId(uint ref_id) {
        enforce(ref_id < _bai.indices.length, "Invalid reference sequence index");
    }

    // Appends to `chunks` every chunk of `bin` that ends after min_offset,
    // clamping chunk start offsets up to min_offset.
    private void appendChunks(ref Chunk[] chunks, Bin bin, VirtualOffset min_offset) {
        foreach (chunk; bin.chunks) {
            if (chunk.end > min_offset) {
                chunks ~= chunk;

                // optimization: never start reading before min_offset
                if (chunks[$-1].beg < min_offset)
                    chunks[$-1].beg = min_offset;
            }
        }
    }

    /// Get BAI chunks containing all alignment records overlapping the region
    Chunk[] getChunks(BamRegion region) {
        auto ref_id = region.ref_id;
        auto beg = region.start;
        auto end = region.end;
        checkIndexExistence();
        checkRefId(ref_id);

        // Select all bins that overlap with [beg, end).
        // Then from such bins select all chunks that end to the right of min_offset.
        // Sort these chunks by leftmost coordinate and remove all overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(beg);

        Chunk[] bai_chunks;
        foreach (b; _bai.indices[ref_id].bins) {
            if (!b.canOverlapWith(beg, end))
                continue;
            appendChunks(bai_chunks, b, min_offset);
        }

        sort(bai_chunks);
        return bai_chunks.nonOverlappingChunks().array();
    }

    // regions should be from the same reference sequence
    private Chunk[] getGroupChunks(BamRegion[] regions) {
        // bitset marks every BAI bin id that may contain reads
        // overlapping any of the given regions
        auto bitset = Array!bool();
        enforce(regions.length > 0);
        bitset.length = BAI_MAX_BIN_ID;
        bitset[0] = true; // bin 0 covers the whole reference sequence
        foreach (region; regions) {
            auto beg = region.start;
            auto end = region.end;
            int i = 0, k; // NOTE(review): `i` appears to be unused — candidate for removal
            enforce(beg < end);
            --end; // make the interval end inclusive for the bin computation
            // Offsets 1, 9, 73, 585, 4681 are the first bin ids of each level
            // of the BAM binning scheme (cf. reg2bins in the SAM specification).
            for (k =    1 + (beg>>26); k <=    1 + (end>>26); ++k) bitset[k] = true;
            for (k =    9 + (beg>>23); k <=    9 + (end>>23); ++k) bitset[k] = true;
            for (k =   73 + (beg>>20); k <=   73 + (end>>20); ++k) bitset[k] = true;
            for (k =  585 + (beg>>17); k <=  585 + (end>>17); ++k) bitset[k] = true;
            for (k = 4681 + (beg>>14); k <= 4681 + (end>>14); ++k) bitset[k] = true;
        }

        auto ref_id = regions.front.ref_id;
        checkIndexExistence();
        checkRefId(ref_id);

        // Select all bins that overlap with [beg, end).
        // Then from such bins select all chunks that end to the right of min_offset.
        // Sort these chunks by leftmost coordinate and remove all overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(regions.front.start);

        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("min offset = ", min_offset);
        }

        Chunk[] bai_chunks;
        auto bins = _bai.indices[ref_id].bins;
        foreach (bin; bins)
            if (bitset[bin.id])
                appendChunks(bai_chunks, bin, min_offset);
        sort(bai_chunks);

        version(extraVerbose) {
            stderr.writeln("[chunks before normalization]");
            foreach(chunk; bai_chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }

        return bai_chunks.nonOverlappingChunks().array();
    }

    // Reads from the chunks covering `regions`, filtered down to
    // the records actually overlapping one of the regions.
    private auto filteredReads(alias IteratePolicy)(BamRegion[] regions) {
        auto chunks = getGroupChunks(regions);
        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("[chunks]");
            foreach (chunk; chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, regions);
    }

    /// Fetch alignments with given reference sequence id, overlapping [beg..end)
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion region)
    {
        auto chunks = getChunks(region);
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, [region]);
    }

    /// Fetch alignments overlapping any of the given regions
    /// (regions may span several reference sequences).
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion[] regions) {
        // group sorted regions by reference sequence id
        auto sorted_regions = regions.sort();
        BamRegion[][] regions_by_ref;
        // TODO: replace with groupBy once it's included into Phobos
        uint last_ref_id = uint.max;
        foreach (region; sorted_regions) {
            if (region.ref_id == last_ref_id) {
                regions_by_ref.back ~= region;
            } else {
                regions_by_ref ~= [region];
                last_ref_id = region.ref_id;
            }
        }

        // within each group, merge overlapping regions
        static ref auto regB(ref BamRegion region) { return region.start; }
        static ref auto regE(ref BamRegion region) { return region.end; }
        foreach (ref group; regions_by_ref)
            group = nonOverlapping!(regB, regE)(group).array();

        // fetch each group independently and chain the results
        return regions_by_ref.zip(repeat(this))
                             .map!(gt => gt[1].filteredReads!IteratePolicy(gt[0]))()
                             .joiner();
    }

private:
    // Builds a BGZF stream restricted to the given chunks and
    // returns a range of reads over it.
    auto readsFromChunks(alias IteratePolicy, R)(R chunks) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto compressed_stream = new EndianStream(fstream, Endian.littleEndian);
        auto supplier = new StreamChunksSupplier(compressed_stream, chunks);
        auto stream = new BgzfInputStream(supplier, _task_pool, _cache);
        return bamReadRange!IteratePolicy(stream, _reader);
    }

    string _filename;       // location of the BAM file
    BaiFile _bai;           // index; valid only if _found_index_file
    BamReader _reader;      // may be null if constructed from filename only
    TaskPool _task_pool;
    size_t _buffer_size;

    BgzfBlockCache _cache;

    // Lazily falls back to the global std.parallelism taskPool.
    TaskPool task_pool() @property {
        if (_task_pool is null)
            _task_pool = taskPool;
        return _task_pool;
    }

public:

    /// Lazy filter over a range of reads sorted by coordinate, keeping only
    /// those that overlap one of the given non-overlapping, sorted regions.
    /// All regions are assumed to refer to the same reference sequence.
    static struct BamReadFilter(R) {
        this(R r, BamRegion[] regions) {
            _range = r;
            _regions = regions;
            enforce(regions.length > 0);
            _region = _regions.front;
            _ref_id = _region.ref_id; // assumed to be constant
            findNext();
        }

        bool empty() @property {
            return _empty;
        }

        ElementType!R front() @property {
            return _current_read;
        }

        void popFront() {
            _range.popFront();
            findNext();
        }

    private:
        R _range;
        uint _ref_id;
        BamRegion _region;
        BamRegion[] _regions; // non-overlapping and sorted
        bool _empty;
        ElementType!R _current_read;

        // Advances _range until _current_read overlaps the current region,
        // popping exhausted regions along the way; sets _empty when done.
        void findNext() {
            if (_regions.empty || _range.empty) {
                _empty = true;
                return;
            }

            while (!_range.empty) {
                _current_read = _range.front;

                // BamReads are sorted first by ref. ID.
                auto current_ref_id = cast(uint)_current_read.ref_id;
                // ref_id can't be -1 unless the index is corrupted
                if (current_ref_id > _ref_id) {
                    // no more records for this _ref_id
                    _empty = true;
                    return;
                } else if (current_ref_id < _ref_id) {
                    // skip reads referring to sequences
                    // with ID less than ours
                    _range.popFront();
                    continue;
                }

                if (_current_read.position >= _region.end) {
                    // As reads are sorted by leftmost coordinate,
                    // all remaining alignments in _range
                    // will not overlap the current interval as well.
                    //
                    //                  [-----)
                    //                  . [-----------)
                    //                  .  [---)
                    //                  .    [-------)
                    //                  .         [-)
                    //    [beg .....  end)
                    _regions.popFront();
                    // TODO: potentially binary search may be faster,
                    // but it needs to be checked
                    if (_regions.empty) {
                        _empty = true;
                        return;
                    } else {
                        _region = _regions.front;
                        continue;
                    }
                }

                if (_current_read.position > _region.start) {
                    return; // definitely overlaps
                }

                if (_current_read.position +
                    _current_read.basesCovered() <= _region.start)
                    {
                        /// ends before beginning of the region
                        ///  [-----------)
                        ///               [beg .......... end)
                        _range.popFront();
                        /// Zero-length reads are also considered non-overlapping,
                        /// so for consistency the inequality 12 lines above is strict.
                    } else {
                    return; /// _current_read overlaps the region
                }
            }
            _empty = true;
        }
    }

    // Get range of alignments sorted by leftmost coordinate,
    // together with an interval [beg, end),
    // and return another range of alignments which overlap the region.
    static auto filterBamReads(R)(R r, BamRegion[] regions)
    {
        return BamReadFilter!R(r, regions);
    }
}