1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2014    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 /**
25   Module for random access operations on BAM file.
26  */
27 module bio.std.hts.bam.randomaccessmanager;
28 
29 import bio.std.hts.bam.constants;
30 import bio.std.hts.bam.reader;
31 import bio.std.hts.bam.read;
32 import bio.std.hts.bam.readrange;
33 import bio.std.hts.bam.baifile;
34 import bio.std.hts.bam.region;
35 import bio.core.utils.algo;
36 
37 import bio.core.bgzf.block;
38 import bio.core.bgzf.virtualoffset;
39 import bio.core.bgzf.inputstream;
40 import bio.core.bgzf.constants;
41 import bio.core.bgzf.chunk;
42 import bio.core.utils.range;
43 import bio.core.utils.stream;
44 
45 import std.system;
46 import std.algorithm;
47 import std.array;
48 import std.range;
49 import std.traits;
50 import std.exception;
51 import std.container;
52 import std.parallelism;
53 static import std.file;
54 
55 debug {
56     import std.stdio;
57 }
58 
private {
    /// Merges a sorted range of BAI chunks into a range without overlaps,
    /// delegating to the generic nonOverlapping algorithm with accessors
    /// for the begin/end virtual offsets of each chunk.
    auto nonOverlappingChunks(R)(R stream_of_chunks) {
        static ref auto getBeg(ref Chunk c) { return c.beg; }
        static ref auto getEnd(ref Chunk c) { return c.end; }
        return nonOverlapping!(getBeg, getEnd)(stream_of_chunks);
    }
}
66 
/// Class which random access tasks are delegated to.
///
/// Wraps a BAM file (and optionally its BAI index) and provides:
///   $(UL
///     $(LI seeking to a virtual offset and reading single records or ranges)
///     $(LI indexed region queries returning only overlapping alignments))
class RandomAccessManager {

  // void setCache(BgzfBlockCache cache) {
      // _cache = cache;
  //}

    /// Sets the task pool used for parallel BGZF block decompression.
    /// If never set, the global std.parallelism taskPool is used lazily.
    void setTaskPool(TaskPool task_pool) {
        _task_pool = task_pool;
    }

    /// Stores the desired buffer size.
    /// NOTE(review): _buffer_size is never read in the visible code —
    /// confirm whether it is still used elsewhere or is dead state.
    void setBufferSize(size_t buffer_size) {
        _buffer_size = buffer_size;
    }

    /// Constructs new manager for BAM file
    this(string filename) {
        _filename = filename;
    }

    /// ditto
    this(BamReader reader) {
        _reader = reader;
        _filename = reader.filename;
    }

    /// Constructs new manager with given index file.
    /// This allows to do random-access interval queries.
    ///
    /// Params:
    ///     filename =  location of BAM file
    ///     bai  =  index file
    this(string filename, ref BaiFile bai) {
        _filename = filename;
        _bai = bai;
        _found_index_file = true;
    }

    /// ditto
    this(BamReader reader, ref BaiFile bai) {
        _reader = reader;
        _filename = reader.filename;
        _bai = bai;
        _found_index_file = true;
    }

    /// If file ends with EOF block, returns virtual offset of the start of EOF block.
    /// Otherwise, returns virtual offset of the physical end of file.
    VirtualOffset eofVirtualOffset() const {
        ulong file_offset = std.file.getSize(_filename);
        if (hasEofBlock()) {
            return VirtualOffset(file_offset - BAM_EOF.length, 0);
        } else {
            return VirtualOffset(file_offset, 0);
        }
    }

    /// Returns true if the file ends with EOF block, and false otherwise.
    /// Opens a fresh file stream on every call and compares the trailing
    /// bytes against the fixed BAM_EOF marker.
    bool hasEofBlock() const {
        auto _stream = new bio.core.utils.stream.File(_filename);
        if (_stream.size < BAM_EOF.length) {
            return false;
        }

        ubyte[BAM_EOF.length] buf;
        _stream.seekEnd(-cast(int)BAM_EOF.length);

        _stream.readExact(&buf, BAM_EOF.length);
        if (buf != BAM_EOF) {
            return false;
        }

        return true;
    }

    /// Get new BgzfInputStream starting from specified virtual offset.
    /// The compressed offset selects the BGZF block; the uncompressed
    /// offset is forwarded to the supplier to skip within that block.
    BgzfInputStream createStreamStartingFrom(VirtualOffset offset)
    {
        auto _stream = new bio.core.utils.stream.File(_filename);
        auto _compressed_stream = new EndianStream(_stream, Endian.littleEndian);
        _compressed_stream.seekSet(cast(size_t)(offset.coffset));
        auto supplier = new StreamSupplier(_compressed_stream, offset.uoffset);
        // auto bgzf_stream = new BgzfInputStream(supplier, _task_pool, _cache);
        auto bgzf_stream = new BgzfInputStream(supplier, _task_pool);
        return bgzf_stream;
    }

    /// Get single read at a given virtual offset.
    /// Every time new stream is used.
    BamRead getReadAt(VirtualOffset offset) {
        auto stream = createStreamStartingFrom(offset);

        // Temporarily toggle the reader's sequential-processing mode and
        // dup the record so it remains valid after the stream goes away.
        // NOTE(review): exact semantics of _seqprocmode are defined in
        // BamReader — presumably it enables buffer reuse; confirm there.
        bool old_mode = _reader._seqprocmode;
        _reader._seqprocmode = true;
        auto read = bamReadRange(stream, _reader).front.dup;
        _reader._seqprocmode = old_mode;
        return read;
    }

    /// Get BGZF block at a given offset.
    BgzfBlock getBgzfBlockAt(ulong offset) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto stream = new EndianStream(fstream, Endian.littleEndian);
        stream.seekSet(offset);
        BgzfBlock block = void;
        ubyte[BGZF_MAX_BLOCK_SIZE] buf = void;
        fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
        // buf is stack-local; dup so the returned block owns its data
        block._buffer = block._buffer.dup;
        return block;
    }

    /// Get reads between two virtual offsets. First virtual offset must point
    /// to a start of an alignment record.
    auto getReadsBetween(VirtualOffset from, VirtualOffset to) {
        auto stream = createStreamStartingFrom(from);

        // Stop as soon as a record ends past the requested upper bound.
        static bool offsetTooBig(BamReadBlock record, VirtualOffset vo) {
            return record.end_virtual_offset > vo;
        }

        return until!offsetTooBig(bamReadRange!withOffsets(stream, _reader), to);
    }

    /// True if a BAI index file was supplied at construction time.
    bool found_index_file() @property const {
        return _found_index_file;
    }
    private bool _found_index_file = false; // overwritten in constructor if filename is provided

    /// BAI file
    ref const(BaiFile) getBai() const {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
        return _bai;
    }

    // Throws unless a BAI index is available; used by all indexed queries.
    private void checkIndexExistence() {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
    }

    // Throws unless ref_id addresses a valid per-reference index.
    private void checkRefId(uint ref_id) {
        enforce(ref_id < _bai.indices.length, "Invalid reference sequence index");
    }

    // Collects the chunks of `bin` that end after min_offset (the linear-index
    // lower bound), clamping each chunk's start up to min_offset.
    private void appendChunks(ref Chunk[] chunks, Bin bin, VirtualOffset min_offset) {
        foreach (chunk; bin.chunks) {
            if (chunk.end > min_offset) {
                chunks ~= chunk;

                // optimization
                if (chunks[$-1].beg < min_offset)
                    chunks[$-1].beg = min_offset;
            }
        }
    }

    /// Get BAI chunks containing all alignment records overlapping the region
    Chunk[] getChunks(BamRegion region) {
        auto ref_id = region.ref_id;
        auto beg = region.start;
        auto end = region.end;
        checkIndexExistence();
        checkRefId(ref_id);

        // Select all bins that overlap with [beg, end).
        // Then from such bins select all chunks that end to the right of min_offset.
        // Sort these chunks by leftmost coordinate and remove all overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(beg);

        Chunk[] bai_chunks;
        foreach (b; _bai.indices[ref_id].bins) {
            if (!b.canOverlapWith(beg, end))
                continue;
            appendChunks(bai_chunks, b, min_offset);
        }

        sort(bai_chunks);
        return bai_chunks.nonOverlappingChunks().array();
    }

    // regions should be from the same reference sequence
    //
    // Computes, for a whole group of regions at once, the set of BAI bin IDs
    // that can overlap any region, then gathers and normalizes their chunks.
    private Chunk[] getGroupChunks(BamRegion[] regions) {
        auto bitset = Array!bool();
        enforce(regions.length > 0);
        bitset.length = BAI_MAX_BIN_ID;
        bitset[0] = true;
        foreach (region; regions) {
            auto beg = region.start;
            auto end = region.end;
            int i = 0, k; // NOTE(review): `i` is unused
            enforce(beg < end);
            --end; // switch to inclusive end for the bin computation
            // Mark every bin overlapping [beg, end] at each binning level;
            // the offsets 1/9/73/585/4681 and shifts 26..14 match the
            // reg2bins scheme of the BAI index (see SAM format spec).
            for (k =    1 + (beg>>26); k <=    1 + (end>>26); ++k) bitset[k] = true;
            for (k =    9 + (beg>>23); k <=    9 + (end>>23); ++k) bitset[k] = true;
            for (k =   73 + (beg>>20); k <=   73 + (end>>20); ++k) bitset[k] = true;
            for (k =  585 + (beg>>17); k <=  585 + (end>>17); ++k) bitset[k] = true;
            for (k = 4681 + (beg>>14); k <= 4681 + (end>>14); ++k) bitset[k] = true;
        }

        auto ref_id = regions.front.ref_id;
        checkIndexExistence();
        checkRefId(ref_id);

        // Select all bins that overlap with [beg, end).
        // Then from such bins select all chunks that end to the right of min_offset.
        // Sort these chunks by leftmost coordinate and remove all overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(regions.front.start);

        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("min offset = ", min_offset);
        }

        Chunk[] bai_chunks;
        auto bins = _bai.indices[ref_id].bins;
        foreach (bin; bins)
            if (bitset[bin.id])
                appendChunks(bai_chunks, bin, min_offset);
        sort(bai_chunks);

        version(extraVerbose) {
            stderr.writeln("[chunks before normalization]");
            foreach(chunk; bai_chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }

        return bai_chunks.nonOverlappingChunks().array();
    }

    // Reads covering a group of same-reference regions, post-filtered so
    // that only records actually overlapping one of the regions remain.
    private auto filteredReads(alias IteratePolicy)(BamRegion[] regions) {
        auto chunks = getGroupChunks(regions);
        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("[chunks]");
            foreach (chunk; chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, regions);
    }

    /// Fetch alignments with given reference sequence id, overlapping [beg..end)
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion region)
    {
        auto chunks = getChunks(region);
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, [region]);
    }

    /// Fetch alignments overlapping any of the given regions.
    /// Regions are sorted, grouped by reference sequence, and each group
    /// is reduced to non-overlapping intervals before querying the index.
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion[] regions) {
        auto sorted_regions = regions.sort();
        BamRegion[][] regions_by_ref;
        // TODO: replace with groupBy once it's included into Phobos
        uint last_ref_id = uint.max;
        foreach (region; sorted_regions) {
            if (region.ref_id == last_ref_id) {
                regions_by_ref.back ~= region;
            } else {
                regions_by_ref ~= [region];
                last_ref_id = region.ref_id;
            }
        }

        static ref auto regB(ref BamRegion region) { return region.start; }
        static ref auto regE(ref BamRegion region) { return region.end; }
        foreach (ref group; regions_by_ref)
            group = nonOverlapping!(regB, regE)(group).array();

        // One lazy filtered range per reference group, concatenated.
        return regions_by_ref.zip(repeat(this))
                             .map!(gt => gt[1].filteredReads!IteratePolicy(gt[0]))()
                             .joiner();
    }

private:
    // Opens a fresh file stream and yields records from the given BAI chunks.
    auto readsFromChunks(alias IteratePolicy, R)(R chunks) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto compressed_stream = new EndianStream(fstream, Endian.littleEndian);
        auto supplier = new StreamChunksSupplier(compressed_stream, chunks);
        // auto stream = new BgzfInputStream(supplier, _task_pool, _cache);
        auto stream = new BgzfInputStream(supplier, _task_pool);
        return bamReadRange!IteratePolicy(stream, _reader);
    }

    string _filename;       // location of the BAM file
    BaiFile _bai;           // index; valid only when _found_index_file
    BamReader _reader;      // may be null when constructed from a filename
    TaskPool _task_pool;    // lazily defaults to the global taskPool
    size_t _buffer_size;    // NOTE(review): set but never read in visible code

  // BgzfBlockCache _cache;

    // Lazy accessor: falls back to std.parallelism's global taskPool.
    TaskPool task_pool() @property {
        if (_task_pool is null)
            _task_pool = taskPool;
        return _task_pool;
    }

public:

    /// Lazy range adaptor that keeps only records overlapping one of the
    /// given regions. Requires: input records sorted by (ref_id, position);
    /// regions sorted, non-overlapping, and all on the same reference.
    static struct BamReadFilter(R) {
        this(R r, BamRegion[] regions) {
            _range = r;
            _regions = regions;
            enforce(regions.length > 0);
            _region = _regions.front;
            _ref_id = _region.ref_id; // assumed to be constant
            findNext();
        }

        bool empty() @property {
            return _empty;
        }

        ElementType!R front() @property {
            return _current_read;
        }

        void popFront() {
            _range.popFront();
            findNext();
        }

    private:
        R _range;
        uint _ref_id;
        BamRegion _region;
        BamRegion[] _regions; // non-overlapping and sorted
        bool _empty;
        ElementType!R _current_read;

        // Advances until _current_read overlaps _region, moving to the next
        // region when the current one is exhausted; sets _empty at the end.
        void findNext() {
            if (_regions.empty || _range.empty) {
                _empty = true;
                return;
            }

            while (!_range.empty) {
                _current_read = _range.front;

                // BamReads are sorted first by ref. ID.
                auto current_ref_id = cast(uint)_current_read.ref_id;
                // ref_id can't be -1 unless the index is corrupted
                if (current_ref_id > _ref_id) {
                    // no more records for this _ref_id
                    _empty = true;
                    return;
                } else if (current_ref_id < _ref_id) {
                    // skip reads referring to sequences
                    // with ID less than ours
                    _range.popFront();
                    continue;
                }

                if (_current_read.position >= _region.end) {
                    // As reads are sorted by leftmost coordinate,
                    // all remaining alignments in _range
                    // will not overlap the current interval as well.
                    //
                    //                  [-----)
                    //                  . [-----------)
                    //                  .  [---)
                    //                  .    [-------)
                    //                  .         [-)
                    //    [beg .....  end)
                    _regions.popFront();
                    // TODO: potentially binary search may be faster,
                    // but it needs to be checked
                    if (_regions.empty) {
                        _empty = true;
                        return;
                    } else {
                        _region = _regions.front;
                        continue;
                    }
                }

                if (_current_read.position > _region.start) {
                    return; // definitely overlaps
                }

                if (_current_read.position +
                    _current_read.basesCovered() <= _region.start)
                    {
                        /// ends before beginning of the region
                        ///  [-----------)
                        ///               [beg .......... end)
                        _range.popFront();
                        /// Zero-length reads are also considered non-overlapping,
                        /// so for consistency the inequality 12 lines above is strict.
                    } else {
                    return; /// _current_read overlaps the region
                }
            }
            _empty = true;
        }
    }

    // Get range of alignments sorted by leftmost coordinate,
    // together with an interval [beg, end),
    // and return another range of alignments which overlap the region.
    static auto filterBamReads(R)(R r, BamRegion[] regions)
    {
        return BamReadFilter!R(r, regions);
    }
}