1 /* 2 This file is part of BioD. 3 Copyright (C) 2012-2016 Artem Tarasov <lomereiter@gmail.com> 4 5 Permission is hereby granted, free of charge, to any person obtaining a 6 copy of this software and associated documentation files (the "Software"), 7 to deal in the Software without restriction, including without limitation 8 the rights to use, copy, modify, merge, publish, distribute, sublicense, 9 and/or sell copies of the Software, and to permit persons to whom the 10 Software is furnished to do so, subject to the following conditions: 11 12 The above copyright notice and this permission notice shall be included in 13 all copies or substantial portions of the Software. 14 15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 21 DEALINGS IN THE SOFTWARE. 22 23 */ 24 module bio.std.hts.bam.multireader; 25 26 import bio.std.hts.sam.header; 27 import bio.std.hts.bam.reader; 28 import bio.std.hts.bam.read; 29 import bio.std.hts.bam.referenceinfo; 30 import bio.std.hts.utils.samheadermerger; 31 32 import std.algorithm; 33 import std.range; 34 import std.conv; 35 import std.parallelism; 36 import std.array; 37 import std.numeric : normalize, dotProduct; 38 import std.exception; 39 import std.typecons; 40 import std.file : getSize; 41 42 alias size_t FileId; 43 44 /// Read from one of multiple BAM files 45 struct MultiBamRead(R=BamRead) { 46 R read; 47 alias read this; 48 49 /// from which file it came 50 FileId file_id; 51 52 /// 53 MultiBamRead dup() @property const { 54 return MultiBamRead(read.dup, file_id); 55 } 56 } 57 58 // ((MultiBamRead, SamHeaderMerger), (MultiBamRead, SamHeaderMerger)) -> bool 59 bool compare(T)(auto ref T r1, auto ref T r2) { 60 assert(r1[1] == r2[1]); 61 62 SortingOrder sorting_order; 63 if (r1[1] is null) 64 sorting_order = r1[0].read.reader.header.sorting_order; 65 else 66 sorting_order = r1[1].merged_header.sorting_order; 67 68 if (sorting_order == SortingOrder.coordinate) 69 return compareCoordinatesAndStrand(r1[0], r2[0]); 70 else if (sorting_order == SortingOrder.queryname) 71 return compareReadNames(r1[0], r2[0]); 72 else 73 assert(0); 74 } 75 76 // ([BamRead/BamReadBlock], FileId) -> [MultiBamRead] 77 auto multiBamReads(R)(R reads, FileId index) { 78 static if (is(ElementType!R == BamRead)) 79 return reads.zip(repeat(index)).map!(x => MultiBamRead!BamRead(x[0], x[1])); 80 else 81 return reads.zip(repeat(index)).map!(x => MultiBamRead!BamRead(x[0].read, x[1])); 82 } 83 84 // (BamReader, SamHeaderMerger, FileId) -> [(MultiBamRead, SamHeaderMerger, FileId)] 85 auto readRange(BamReader reader, SamHeaderMerger merger, FileId index) { 86 return zip(reader.reads.multiBamReads(index), repeat(merger), repeat(index)); 87 } 88 89 // (BamReader, SamHeaderMerger, FileId, int, uint, uint) -> 90 // [(MultiBamRead, SamHeaderMerger, FileId)] 91 auto readRange(BamReader reader, SamHeaderMerger merger, FileId index, 92 int ref_id, uint start, uint end) 93 { 94 int old_ref_id = ref_id; 95 if (merger !is null) 96 old_ref_id = cast(int)merger.ref_id_reverse_map[index][ref_id]; 97 auto reads = reader.reference(old_ref_id)[start .. end]; 98 return zip(reads.multiBamReads(index), repeat(merger), repeat(index)); 99 } 100 101 // (BamReader, SamHeaderMerger, FileId, [BamRegion]) -> 102 // [(MultiBamRead, SamHeaderMerger, FileId)] 103 auto readRange(BamReader reader, SamHeaderMerger merger, FileId index, 104 BamRegion[] regions) { 105 if (merger is null) // single reader => no fiddling with ref_id 106 return zip(reader.getReadsOverlapping(regions).multiBamReads(index), 107 repeat(merger), repeat(index)); 108 109 auto old_regions = merger is null ? regions : regions.dup; 110 foreach (j; 0 .. regions.length) { 111 auto new_ref_id = regions[j].ref_id; 112 if (new_ref_id != -1) { 113 auto old_ref_id = cast(uint)merger.ref_id_reverse_map[index][new_ref_id]; 114 old_regions[j].ref_id = old_ref_id; 115 } 116 } 117 return zip(reader.getReadsOverlapping(old_regions).multiBamReads(index), 118 repeat(merger), repeat(index)); 119 } 120 121 // ([BamReader], SamHeaderMerger) -> [[(MultiBamRead, SamHeaderMerger, FileId)]] 122 auto readRanges(BamReader[] readers, SamHeaderMerger merger) { 123 return readers.zip(repeat(merger), iota(readers.length)) 124 .map!(t => readRange(t[0], t[1], t[2]))(); 125 } 126 127 auto readRangeWithProgress 128 (BamReader reader, SamHeaderMerger merger, FileId index, 129 void delegate() f, void delegate(lazy float) u) { 130 return zip(reader.readsWithProgress(u, f).multiBamReads(index), 131 repeat(merger), repeat(index)); 132 } 133 134 auto readRangesWithProgress 135 (BamReader[] readers, SamHeaderMerger merger, 136 void delegate() f, void delegate(lazy float) delegate(size_t) u) 137 { 138 return readers.zip(repeat(merger), iota(readers.length)) 139 .map!(t => readRangeWithProgress(t[0], t[1], t[2], f, u(t[2])))(); 140 } 141 142 // ([BamReader], SamHeaderMerger, int, uint, uint) -> 143 // [[(MultiBamRead, SamHeaderMerger, FileId)]] 144 auto readRanges(BamReader[] readers, SamHeaderMerger merger, 145 int ref_id, uint start, uint end) 146 { 147 return readers.zip(repeat(merger), iota(readers.length), 148 repeat(ref_id), repeat(start), repeat(end)) 149 .map!(t => readRange(t[0], t[1], t[2], t[3], t[4], t[5]))(); 150 } 151 152 // ([BamReader], SamHeaderMerger, [BamRegion]) -> 153 // [[(MultiBamRead, SamHeaderMerger, FileId)]) 154 auto readRanges(BamReader[] readers, SamHeaderMerger merger, BamRegion[] regions) 155 { 156 return readers.zip(repeat(merger), iota(readers.length), 157 repeat(regions)) 158 .map!(t => readRange(t[0], t[1], t[2], t[3]))(); 159 } 160 161 // tweaks RG and PG tags, and reference sequence ID 162 // [[(BamRead, SamHeaderMerger, size_t)]] -> [[MultiBamRead]] 163 auto adjustTags(R)(R reads_with_aux_info, TaskPool pool, size_t bufsize) 164 if (isInputRange!R) 165 { 166 alias R2 = typeof(pool.map!adjustTagsInRange(reads_with_aux_info.front, 1)); 167 R2[] result; 168 foreach (read_range; reads_with_aux_info) 169 result ~= pool.map!adjustTagsInRange(read_range, bufsize); 170 return result; 171 } 172 173 // (BamRead, SamHeaderMerger, size_t) -> (MultiBamRead, SamHeaderMerger) 174 auto adjustTagsInRange(R)(R read_with_aux_info) if (!isInputRange!R) { 175 auto read = read_with_aux_info[0]; 176 auto merger = read_with_aux_info[1]; 177 auto file_id = read_with_aux_info[2]; 178 179 if (merger is null) { 180 assert(file_id == 0); 181 return tuple(read, merger); 182 } 183 184 with (merger) { 185 assert(file_id < ref_id_map.length); 186 187 auto old_ref_id = read.ref_id; 188 if (old_ref_id != -1 && old_ref_id in ref_id_map[file_id]) { 189 auto new_ref_id = to!int(ref_id_map[file_id][old_ref_id]); 190 if (new_ref_id != old_ref_id) 191 read.ref_id = new_ref_id; 192 } 193 194 auto program = read["PG"]; 195 if (!program.is_nothing) { 196 auto pg_str = *(cast(string*)(&program)); 197 if (pg_str in program_id_map[file_id]) { 198 auto new_pg = program_id_map[file_id][pg_str]; 199 if (new_pg != pg_str) 200 read["PG"] = new_pg; 201 } 202 } 203 204 auto read_group = read["RG"]; 205 if (!read_group.is_nothing) { 206 auto rg_str = *(cast(string*)(&read_group)); 207 if (rg_str in readgroup_id_map[file_id]) { 208 auto new_rg = readgroup_id_map[file_id][rg_str]; 209 if (new_rg != rg_str) 210 read["RG"] = new_rg; 211 } 212 } 213 } 214 return tuple(read, merger); 215 } 216 217 /// 218 class MultiBamReader { 219 220 /// 221 this(BamReader[] readers) { 222 _readers = readers; 223 224 enforce(_readers.length >= 1, "MultiBamReader requires at least one BAM file"); 225 226 if (_readers.length > 1) { 227 _merger = new SamHeaderMerger(readers.map!(r => r.header)().array()); 228 enforce(_merger.strategy == SamHeaderMerger.Strategy.simple, "NYI"); // TODO 229 230 auto n_references = _merger.merged_header.sequences.length; 231 _reference_sequences = new ReferenceSequenceInfo[n_references]; 232 size_t i; 233 foreach (line; _merger.merged_header.sequences) { 234 _reference_sequences[i] = ReferenceSequenceInfo(line.name, line.length); 235 _reference_sequence_dict[line.name] = i++; 236 } 237 } 238 239 // TODO: maybe try to guess optimal size, based on the number of files? 240 setBufferSize(1_048_576); 241 } 242 243 /// 244 this(string[] filenames) { 245 this(filenames.map!(fn => new BamReader(fn))().array()); 246 } 247 248 /// 249 this(string[] filenames, std.parallelism.TaskPool task_pool = taskPool) { 250 this(filenames.zip(repeat(task_pool)) 251 .map!(fn => new BamReader(fn[0], fn[1]))().array()); 252 } 253 254 /// 255 BamReader[] readers() @property { 256 return _readers; 257 } 258 259 /// 260 SamHeader header() @property { 261 return _readers.length > 1 ? _merger.merged_header : _readers[0].header; 262 } 263 264 /// Input range of MultiBamRead instances 265 auto reads() @property { 266 return readRanges(_readers, _merger).adjustTags(task_pool, _adj_bufsz) 267 .nWayUnion!compare().map!"a[0]"(); 268 } 269 270 /// 271 auto readsWithProgress(void delegate(lazy float p) progressBarFunc, 272 void delegate() finishFunc=null) 273 { 274 size_t _running = _readers.length; 275 void innerFinishFunc() { 276 if (--_running == 0 && finishFunc) 277 finishFunc(); 278 } 279 280 auto _progress = new float[_readers.length]; 281 _progress[] = 0.0; 282 auto _weights = _readers.map!(r => r.filename.getSize.to!float).array; 283 normalize(_weights); 284 285 auto innerProgressBarFunc(size_t idx) { 286 return (lazy float p) { 287 _progress[idx] = p; 288 progressBarFunc(dotProduct(_progress, _weights)); 289 }; 290 } 291 292 return readRangesWithProgress(_readers, _merger, 293 &innerFinishFunc, &innerProgressBarFunc) 294 .adjustTags(task_pool, _adj_bufsz) 295 .nWayUnion!compare().map!"a[0]"(); 296 } 297 298 /// 299 const(ReferenceSequenceInfo)[] reference_sequences() @property const nothrow { 300 if (_readers.length > 1) 301 return _reference_sequences; 302 else 303 return _readers[0].reference_sequences; 304 } 305 306 /** 307 Check if reference named $(I ref_name) is presented in BAM header. 308 */ 309 bool hasReference(string ref_name) { 310 if (_readers.length > 1) 311 return null != (ref_name in _reference_sequence_dict); 312 else 313 return _readers[0].hasReference(ref_name); 314 } 315 316 /** 317 Check if all BAM files have indices. 318 */ 319 bool has_index() @property { 320 return readers.all!(b => b.has_index); 321 } 322 323 /** 324 Returns reference sequence with id $(I ref_id). 325 */ 326 MultiBamReference reference(int ref_id) { 327 enforce(ref_id >= 0, "Invalid reference index"); 328 enforce(ref_id < reference_sequences.length, "Invalid reference index"); 329 return MultiBamReference(_readers, _merger, task_pool, _adj_bufsz, 330 reference_sequences[ref_id], ref_id); 331 } 332 333 /** 334 Returns reference sequence named $(I ref_name). 335 */ 336 MultiBamReference opIndex(string ref_name) { 337 enforce(hasReference(ref_name), 338 "Reference with name " ~ ref_name ~ " does not exist"); 339 if (_readers.length > 1) { 340 auto ref_id = cast(int)_reference_sequence_dict[ref_name]; 341 return reference(ref_id); 342 } else { 343 auto ref_id = _readers[0][ref_name].id; 344 return reference(ref_id); 345 } 346 } 347 348 /// Sets buffer size for all readers (default is 1MB) 349 void setBufferSize(size_t bytes) { 350 foreach (reader; _readers) 351 reader.setBufferSize(bytes); 352 } 353 354 /** 355 Requires coordinate sorting and presence of indices. 356 */ 357 auto getReadsOverlapping(BamRegion[] regions) { 358 enforce(header.sorting_order == SortingOrder.coordinate, 359 "Not all files are coordinate-sorted"); 360 enforce(has_index, "Not all files are indexed"); 361 362 auto ranges = readRanges(_readers, _merger, regions); 363 return ranges.adjustTags(_task_pool, _adj_bufsz) 364 .nWayUnion!compare().map!"a[0]"(); 365 } 366 367 private { 368 BamReader[] _readers; 369 SamHeaderMerger _merger; 370 ReferenceSequenceInfo[] _reference_sequences; 371 size_t[string] _reference_sequence_dict; 372 TaskPool _task_pool; 373 TaskPool task_pool() @property { 374 if (_task_pool is null) 375 _task_pool = taskPool; 376 return _task_pool; 377 } 378 379 size_t _adj_bufsz = 512; 380 } 381 } 382 383 /// 384 struct MultiBamReference { 385 private { 386 BamReader[] _readers; 387 SamHeaderMerger _merger; 388 int _ref_id; 389 ReferenceSequenceInfo _info; 390 TaskPool _pool; 391 size_t _adj_bufsz; 392 } 393 394 this(BamReader[] readers, SamHeaderMerger merger, 395 TaskPool task_pool, size_t adj_bufsize, 396 ReferenceSequenceInfo info, int ref_id) 397 { 398 _readers = readers; 399 _merger = merger; 400 _pool = task_pool; 401 _adj_bufsz = adj_bufsize; 402 _ref_id = ref_id; 403 _info = info; 404 } 405 406 /// 407 string name() @property const { return _info.name; } 408 409 /// 410 int length() @property const { return _info.length; } 411 412 /// 413 int id() @property const { return _ref_id; } 414 415 /// Get alignments overlapping [start, end) region. 416 /// $(BR) 417 /// Coordinates are 0-based. 418 auto opSlice(uint start, uint end) { 419 enforce(start < end, "start must be less than end"); 420 auto ranges = readRanges(_readers, _merger, id, start, end); 421 return ranges.adjustTags(_pool, _adj_bufsz) 422 .nWayUnion!compare().map!"a[0]"(); 423 } 424 425 /// 426 auto opSlice() { 427 return opSlice(0, length); 428 } 429 }