1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2016    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 module bio.std.hts.bam.multireader;
25 
26 import bio.std.hts.sam.header;
27 import bio.std.hts.bam.reader;
28 import bio.std.hts.bam.read;
29 import bio.std.hts.bam.referenceinfo;
30 import bio.std.hts.utils.samheadermerger;
31 
32 import std.algorithm;
33 import std.range;
34 import std.conv;
35 import std.parallelism;
36 import std.array;
37 import std.numeric : normalize, dotProduct;
38 import std.exception;
39 import std.typecons;
40 import std.file : getSize;
41 
42 alias size_t FileId;
43 
44 /// Read from one of multiple BAM files
45 struct MultiBamRead(R=BamRead) {
46     R read;
47     alias read this;
48 
49     /// from which file it came
50     FileId file_id;
51 
52     ///
53     MultiBamRead dup() @property const {
54         return MultiBamRead(read.dup, file_id);
55     }
56 }
57 
58 // ((MultiBamRead, SamHeaderMerger), (MultiBamRead, SamHeaderMerger)) -> bool
59 bool compare(T)(auto ref T r1, auto ref T r2) {
60     assert(r1[1] == r2[1]);
61 
62     SortingOrder sorting_order;
63     if (r1[1] is null)
64         sorting_order = r1[0].read.reader.header.sorting_order;
65     else
66         sorting_order = r1[1].merged_header.sorting_order;
67 
68     if (sorting_order == SortingOrder.coordinate)
69         return compareCoordinatesAndStrand(r1[0], r2[0]);
70     else if (sorting_order == SortingOrder.queryname)
71         return compareReadNames(r1[0], r2[0]);
72     else
73         assert(0);
74 }
75 
76 // ([BamRead/BamReadBlock], FileId) -> [MultiBamRead]
77 auto multiBamReads(R)(R reads, FileId index) {
78   static if (is(ElementType!R == BamRead))
79     return reads.zip(repeat(index)).map!(x => MultiBamRead!BamRead(x[0], x[1]));
80   else
81     return reads.zip(repeat(index)).map!(x => MultiBamRead!BamRead(x[0].read, x[1]));
82 }
83 
84 // (BamReader, SamHeaderMerger, FileId) -> [(MultiBamRead, SamHeaderMerger, FileId)]
85 auto readRange(BamReader reader, SamHeaderMerger merger, FileId index) {
86     return zip(reader.reads.multiBamReads(index), repeat(merger), repeat(index));
87 }
88 
89 // (BamReader, SamHeaderMerger, FileId, int, uint, uint) ->
90 //                                    [(MultiBamRead, SamHeaderMerger, FileId)]
91 auto readRange(BamReader reader, SamHeaderMerger merger, FileId index,
92                int ref_id, uint start, uint end)
93 {
94     int old_ref_id = ref_id;
95     if (merger !is null)
96         old_ref_id = cast(int)merger.ref_id_reverse_map[index][ref_id];
97     auto reads = reader.reference(old_ref_id)[start .. end];
98     return zip(reads.multiBamReads(index), repeat(merger), repeat(index));
99 }
100 
101 // (BamReader, SamHeaderMerger, FileId, [BamRegion]) ->
102 //                                    [(MultiBamRead, SamHeaderMerger, FileId)]
103 auto readRange(BamReader reader, SamHeaderMerger merger, FileId index,
104                BamRegion[] regions) {
105     if (merger is null) // single reader => no fiddling with ref_id
106         return zip(reader.getReadsOverlapping(regions).multiBamReads(index),
107                    repeat(merger), repeat(index));
108 
109     auto old_regions = merger is null ? regions : regions.dup;
110     foreach (j; 0 .. regions.length) {
111         auto new_ref_id = regions[j].ref_id;
112         if (new_ref_id != -1) {
113             auto old_ref_id = cast(uint)merger.ref_id_reverse_map[index][new_ref_id];
114             old_regions[j].ref_id = old_ref_id;
115         }
116     }
117     return zip(reader.getReadsOverlapping(old_regions).multiBamReads(index),
118                repeat(merger), repeat(index));
119 }
120 
121 // ([BamReader], SamHeaderMerger) -> [[(MultiBamRead, SamHeaderMerger, FileId)]]
122 auto readRanges(BamReader[] readers, SamHeaderMerger merger) {
123     return readers.zip(repeat(merger), iota(readers.length))
124                   .map!(t => readRange(t[0], t[1], t[2]))();
125 }
126 
127 auto readRangeWithProgress
128 (BamReader reader, SamHeaderMerger merger, FileId index,
129  void delegate() f, void delegate(lazy float) u) {
130     return zip(reader.readsWithProgress(u, f).multiBamReads(index),
131                repeat(merger), repeat(index));
132 }
133 
134 auto readRangesWithProgress
135 (BamReader[] readers, SamHeaderMerger merger,
136  void delegate() f, void delegate(lazy float) delegate(size_t) u)
137 {
138     return readers.zip(repeat(merger), iota(readers.length))
139                   .map!(t => readRangeWithProgress(t[0], t[1], t[2], f, u(t[2])))();
140 }
141 
142 // ([BamReader], SamHeaderMerger, int, uint, uint) ->
143 //                                    [[(MultiBamRead, SamHeaderMerger, FileId)]]
144 auto readRanges(BamReader[] readers, SamHeaderMerger merger,
145                 int ref_id, uint start, uint end)
146 {
147     return readers.zip(repeat(merger), iota(readers.length),
148                        repeat(ref_id), repeat(start), repeat(end))
149                   .map!(t => readRange(t[0], t[1], t[2], t[3], t[4], t[5]))();
150 }
151 
152 // ([BamReader], SamHeaderMerger, [BamRegion]) ->
153 //        [[(MultiBamRead, SamHeaderMerger, FileId)])
154 auto readRanges(BamReader[] readers, SamHeaderMerger merger, BamRegion[] regions)
155 {
156     return readers.zip(repeat(merger), iota(readers.length),
157                        repeat(regions))
158         .map!(t => readRange(t[0], t[1], t[2], t[3]))();
159 }
160 
161 // tweaks RG and PG tags, and reference sequence ID
162 // [[(BamRead, SamHeaderMerger, size_t)]] -> [[MultiBamRead]]
163 auto adjustTags(R)(R reads_with_aux_info, TaskPool pool, size_t bufsize)
164     if (isInputRange!R)
165 {
166   alias R2 = typeof(pool.map!adjustTagsInRange(reads_with_aux_info.front, 1));
167   R2[] result;
168   foreach (read_range; reads_with_aux_info)
169     result ~= pool.map!adjustTagsInRange(read_range, bufsize);
170   return result;
171 }
172 
173 // (BamRead, SamHeaderMerger, size_t) -> (MultiBamRead, SamHeaderMerger)
174 auto adjustTagsInRange(R)(R read_with_aux_info) if (!isInputRange!R) {
175     auto read = read_with_aux_info[0];
176     auto merger = read_with_aux_info[1];
177     auto file_id = read_with_aux_info[2];
178 
179     if (merger is null) {
180         assert(file_id == 0);
181         return tuple(read, merger);
182     }
183 
184     with (merger) {
185         assert(file_id < ref_id_map.length);
186 
187         auto old_ref_id = read.ref_id;
188         if (old_ref_id != -1 && old_ref_id in ref_id_map[file_id]) {
189             auto new_ref_id = to!int(ref_id_map[file_id][old_ref_id]);
190             if (new_ref_id != old_ref_id)
191                 read.ref_id = new_ref_id;
192         }
193 
194         auto program = read["PG"];
195         if (!program.is_nothing) {
196             auto pg_str = *(cast(string*)(&program));
197             if (pg_str in program_id_map[file_id]) {
198                 auto new_pg = program_id_map[file_id][pg_str];
199                 if (new_pg != pg_str)
200                     read["PG"] = new_pg;
201             }
202         }
203 
204         auto read_group = read["RG"];
205         if (!read_group.is_nothing) {
206             auto rg_str = *(cast(string*)(&read_group));
207             if (rg_str in readgroup_id_map[file_id]) {
208                 auto new_rg = readgroup_id_map[file_id][rg_str];
209                 if (new_rg != rg_str)
210                     read["RG"] = new_rg;
211             }
212         }
213     }
214     return tuple(read, merger);
215 }
216 
217 ///
218 class MultiBamReader {
219 
220     ///
221     this(BamReader[] readers) {
222         _readers = readers;
223 
224         enforce(_readers.length >= 1, "MultiBamReader requires at least one BAM file");
225 
226         if (_readers.length > 1) {
227             _merger = new SamHeaderMerger(readers.map!(r => r.header)().array());
228             enforce(_merger.strategy == SamHeaderMerger.Strategy.simple, "NYI"); // TODO
229 
230             auto n_references = _merger.merged_header.sequences.length;
231             _reference_sequences = new ReferenceSequenceInfo[n_references];
232             size_t i;
233             foreach (line; _merger.merged_header.sequences) {
234                 _reference_sequences[i] = ReferenceSequenceInfo(line.name, line.length);
235                 _reference_sequence_dict[line.name] = i++;
236             }
237         }
238 
239         // TODO: maybe try to guess optimal size, based on the number of files?
240         setBufferSize(1_048_576);
241     }
242 
243     ///
244     this(string[] filenames) {
245         this(filenames.map!(fn => new BamReader(fn))().array());
246     }
247 
248     ///
249     this(string[] filenames, std.parallelism.TaskPool task_pool = taskPool) {
250         this(filenames.zip(repeat(task_pool))
251                       .map!(fn => new BamReader(fn[0], fn[1]))().array());
252     }
253 
254     ///
255     BamReader[] readers() @property {
256         return _readers;
257     }
258 
259     ///
260     SamHeader header() @property {
261         return _readers.length > 1 ? _merger.merged_header : _readers[0].header;
262     }
263 
264     /// Input range of MultiBamRead instances
265     auto reads() @property {
266         return readRanges(_readers, _merger).adjustTags(task_pool, _adj_bufsz)
267                                             .nWayUnion!compare().map!"a[0]"();
268     }
269 
270     ///
271     auto readsWithProgress(void delegate(lazy float p) progressBarFunc,
272                            void delegate() finishFunc=null)
273     {
274         size_t _running = _readers.length;
275         void innerFinishFunc() {
276             if (--_running == 0 && finishFunc)
277                 finishFunc();
278         }
279 
280         auto _progress = new float[_readers.length];
281         _progress[] = 0.0;
282         auto _weights = _readers.map!(r => r.filename.getSize.to!float).array;
283         normalize(_weights);
284 
285         auto innerProgressBarFunc(size_t idx) {
286             return (lazy float p) {
287                 _progress[idx] = p;
288                 progressBarFunc(dotProduct(_progress, _weights));
289             };
290         }
291 
292         return readRangesWithProgress(_readers, _merger,
293                                       &innerFinishFunc, &innerProgressBarFunc)
294                          .adjustTags(task_pool, _adj_bufsz)
295                          .nWayUnion!compare().map!"a[0]"();
296     }
297 
298     ///
299     const(ReferenceSequenceInfo)[] reference_sequences() @property const nothrow {
300         if (_readers.length > 1)
301             return _reference_sequences;
302         else
303             return _readers[0].reference_sequences;
304     }
305 
306     /**
307       Check if reference named $(I ref_name) is presented in BAM header.
308      */
309     bool hasReference(string ref_name) {
310         if (_readers.length > 1)
311             return null != (ref_name in _reference_sequence_dict);
312         else
313             return _readers[0].hasReference(ref_name);
314     }
315 
316     /**
317        Check if all BAM files have indices.
318     */
319     bool has_index() @property {
320         return readers.all!(b => b.has_index);
321     }
322 
323     /**
324       Returns reference sequence with id $(I ref_id).
325      */
326     MultiBamReference reference(int ref_id) {
327         enforce(ref_id >= 0, "Invalid reference index");
328         enforce(ref_id < reference_sequences.length, "Invalid reference index");
329         return MultiBamReference(_readers, _merger, task_pool, _adj_bufsz,
330                                  reference_sequences[ref_id], ref_id);
331     }
332 
333     /**
334       Returns reference sequence named $(I ref_name).
335      */
336     MultiBamReference opIndex(string ref_name) {
337         enforce(hasReference(ref_name),
338                 "Reference with name " ~ ref_name ~ " does not exist");
339         if (_readers.length > 1) {
340             auto ref_id = cast(int)_reference_sequence_dict[ref_name];
341             return reference(ref_id);
342         } else {
343             auto ref_id = _readers[0][ref_name].id;
344             return reference(ref_id);
345         }
346     }
347 
348     /// Sets buffer size for all readers (default is 1MB)
349     void setBufferSize(size_t bytes) {
350         foreach (reader; _readers)
351             reader.setBufferSize(bytes);
352     }
353 
354     /**
355        Requires coordinate sorting and presence of indices.
356     */
357     auto getReadsOverlapping(BamRegion[] regions) {
358         enforce(header.sorting_order == SortingOrder.coordinate,
359                 "Not all files are coordinate-sorted");
360         enforce(has_index, "Not all files are indexed");
361 
362         auto ranges = readRanges(_readers, _merger, regions);
363         return ranges.adjustTags(_task_pool, _adj_bufsz)
364                      .nWayUnion!compare().map!"a[0]"();
365     }
366 
367     private {
368         BamReader[] _readers;
369         SamHeaderMerger _merger;
370         ReferenceSequenceInfo[] _reference_sequences;
371         size_t[string] _reference_sequence_dict;
372         TaskPool _task_pool;
373         TaskPool task_pool() @property {
374             if (_task_pool is null)
375                 _task_pool = taskPool;
376             return _task_pool;
377         }
378 
379         size_t _adj_bufsz = 512;
380     }
381 }
382 
383 ///
384 struct MultiBamReference {
385     private {
386         BamReader[] _readers;
387         SamHeaderMerger _merger;
388         int _ref_id;
389         ReferenceSequenceInfo _info;
390         TaskPool _pool;
391         size_t _adj_bufsz;
392     }
393 
394     this(BamReader[] readers, SamHeaderMerger merger,
395          TaskPool task_pool, size_t adj_bufsize,
396          ReferenceSequenceInfo info, int ref_id)
397     {
398         _readers = readers;
399         _merger = merger;
400         _pool = task_pool;
401         _adj_bufsz = adj_bufsize;
402         _ref_id = ref_id;
403         _info = info;
404     }
405 
406     ///
407     string name() @property const { return _info.name; }
408 
409     ///
410     int length() @property const { return _info.length; }
411 
412     ///
413     int id() @property const { return _ref_id; }
414 
415     /// Get alignments overlapping [start, end) region.
416     /// $(BR)
417     /// Coordinates are 0-based.
418     auto opSlice(uint start, uint end) {
419         enforce(start < end, "start must be less than end");
420         auto ranges = readRanges(_readers, _merger, id, start, end);
421         return ranges.adjustTags(_pool, _adj_bufsz)
422                      .nWayUnion!compare().map!"a[0]"();
423     }
424 
425     ///
426     auto opSlice() {
427         return opSlice(0, length);
428     }
429 }