/*
    This file is part of BioD.
    Copyright (C) 2012-2014    Artem Tarasov <lomereiter@gmail.com>

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.

*/
/**
  Module for random access operations on a BAM file.
 */
module bio.bam.randomaccessmanager;

import bio.bam.constants;
import bio.bam.reader;
import bio.bam.read;
import bio.bam.readrange;
import bio.bam.baifile;
import bio.bam.region;
import bio.core.utils.algo;

import bio.core.bgzf.block;
import bio.core.bgzf.virtualoffset;
import bio.core.bgzf.inputstream;
import bio.core.bgzf.constants;
import bio.core.bgzf.chunk;
import bio.core.utils.range;
import bio.core.utils.stream;

import std.system;
import std.algorithm;
import std.array;
import std.range;
import std.traits;
import std.exception;
import std.container;
import std.parallelism;
static import std.file;

debug {
    import std.stdio;
}

private {
    auto nonOverlappingChunks(R)(R chunks) {
        static auto ref chunkB(ref Chunk chunk) { return chunk.beg; }
        static auto ref chunkE(ref Chunk chunk) { return chunk.end; }
        return nonOverlapping!(chunkB, chunkE)(chunks);
    }
}

/// Class to which random access tasks are delegated.
class RandomAccessManager {

    void setCache(BgzfBlockCache cache) {
        _cache = cache;
    }

    void setTaskPool(TaskPool task_pool) {
        _task_pool = task_pool;
    }

    void setBufferSize(size_t buffer_size) {
        _buffer_size = buffer_size;
    }

    /// Constructs a new manager for the given BAM file.
    this(string filename) {
        _filename = filename;
    }

    /// ditto
    this(BamReader reader) {
        _reader = reader;
        _filename = reader.filename;
    }

    /// Constructs a new manager with the given index file,
    /// enabling random-access interval queries.
    ///
    /// Params:
    ///     filename =  location of BAM file
    ///     bai      =  index file
    this(string filename, ref BaiFile bai) {
        _filename = filename;
        _bai = bai;
        _found_index_file = true;
    }

    /// ditto
    this(BamReader reader, ref BaiFile bai) {
        _reader = reader;
        _filename = reader.filename;
        _bai = bai;
        _found_index_file = true;
    }
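
    // Construction sketch (not part of the original file; the file names are
    // hypothetical, and BaiFile is assumed to be constructible from a path as
    // in bio.bam.baifile). With an index attached, interval queries work;
    // without one, only offset-based access is available:
    //
    //     auto bai = BaiFile("example.bam.bai");
    //     auto manager = new RandomAccessManager("example.bam", bai);
    //     assert(manager.found_index_file);
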
    /// If the file ends with an EOF block, returns the virtual offset of the
    /// start of that block; otherwise, returns the virtual offset of the
    /// physical end of file.
    VirtualOffset eofVirtualOffset() const {
        ulong file_offset = std.file.getSize(_filename);
        if (hasEofBlock()) {
            return VirtualOffset(file_offset - BAM_EOF.length, 0);
        } else {
            return VirtualOffset(file_offset, 0);
        }
    }

    /// Returns true if the file ends with an EOF block, false otherwise.
    bool hasEofBlock() const {
        auto _stream = new bio.core.utils.stream.File(_filename);
        if (_stream.size < BAM_EOF.length) {
            return false;
        }

        ubyte[BAM_EOF.length] buf;
        _stream.seekEnd(-cast(int)BAM_EOF.length);

        _stream.readExact(&buf, BAM_EOF.length);
        if (buf != BAM_EOF) {
            return false;
        }

        return true;
    }

    /// Get a new BgzfInputStream starting from the specified virtual offset.
    BgzfInputStream createStreamStartingFrom(VirtualOffset offset)
    {
        auto _stream = new bio.core.utils.stream.File(_filename);
        auto _compressed_stream = new EndianStream(_stream, Endian.littleEndian);
        _compressed_stream.seekSet(cast(size_t)(offset.coffset));
        auto supplier = new StreamSupplier(_compressed_stream, offset.uoffset);
        auto bgzf_stream = new BgzfInputStream(supplier, _task_pool, _cache);
        return bgzf_stream;
    }

    /// Get a single read at the given virtual offset.
    /// A new stream is created on every call.
    BamRead getReadAt(VirtualOffset offset) {
        auto stream = createStreamStartingFrom(offset);

        bool old_mode = _reader._seqprocmode;
        _reader._seqprocmode = true;
        auto read = bamReadRange(stream, _reader).front.dup;
        _reader._seqprocmode = old_mode;
        return read;
    }

    /// Get the BGZF block at the given file offset.
    BgzfBlock getBgzfBlockAt(ulong offset) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto stream = new EndianStream(fstream, Endian.littleEndian);
        stream.seekSet(offset);
        BgzfBlock block = void;
        ubyte[BGZF_MAX_BLOCK_SIZE] buf = void;
        fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
        block._buffer = block._buffer.dup;
        return block;
    }
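
    // Offset-based access sketch (illustrative; the numbers are made up).
    // A VirtualOffset packs the compressed file offset of a BGZF block start
    // (coffset) together with the position inside the decompressed block
    // (uoffset), so a single read can be fetched directly:
    //
    //     auto vo = VirtualOffset(120_437, 517); // coffset, uoffset
    //     BamRead read = manager.getReadAt(vo);  // opens a fresh stream
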
    /// Get reads between two virtual offsets. The first virtual offset must
    /// point to the start of an alignment record.
    auto getReadsBetween(VirtualOffset from, VirtualOffset to) {
        auto stream = createStreamStartingFrom(from);

        static bool offsetTooBig(BamReadBlock record, VirtualOffset vo) {
            return record.end_virtual_offset > vo;
        }

        return until!offsetTooBig(bamReadRange!withOffsets(stream, _reader), to);
    }

    bool found_index_file() @property const {
        return _found_index_file;
    }
    private bool _found_index_file = false; // set to true in constructors that receive an index file

    /// Returns the BAI file.
    ref const(BaiFile) getBai() const {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
        return _bai;
    }

    private void checkIndexExistence() {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
    }

    private void checkRefId(uint ref_id) {
        enforce(ref_id < _bai.indices.length, "Invalid reference sequence index");
    }

    private void appendChunks(ref Chunk[] chunks, Bin bin, VirtualOffset min_offset) {
        foreach (chunk; bin.chunks) {
            if (chunk.end > min_offset) {
                chunks ~= chunk;

                // optimization: clamp the chunk start to min_offset
                if (chunks[$-1].beg < min_offset)
                    chunks[$-1].beg = min_offset;
            }
        }
    }

    /// Get BAI chunks containing all alignment records overlapping the region
    Chunk[] getChunks(BamRegion region) {
        auto ref_id = region.ref_id;
        auto beg = region.start;
        auto end = region.end;
        checkIndexExistence();
        checkRefId(ref_id);

        // Select all bins that overlap with [beg, end).
        // From those bins, select all chunks that end to the right of min_offset.
        // Sort these chunks by leftmost coordinate and remove overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(beg);

        Chunk[] bai_chunks;
        foreach (b; _bai.indices[ref_id].bins) {
            if (!b.canOverlapWith(beg, end))
                continue;
            appendChunks(bai_chunks, b, min_offset);
        }

        sort(bai_chunks);
        return bai_chunks.nonOverlappingChunks().array();
    }
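
    // Query sketch (hypothetical coordinates). The returned chunks are
    // sorted and non-overlapping, so they can be handed directly to
    // readsFromChunks:
    //
    //     auto region = BamRegion(0, 1_000_000, 2_000_000); // ref id, beg, end
    //     Chunk[] chunks = manager.getChunks(region);
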
    // regions must belong to the same reference sequence
    private Chunk[] getGroupChunks(BamRegion[] regions) {
        auto bitset = Array!bool();
        enforce(regions.length > 0);
        bitset.length = BAI_MAX_BIN_ID;
        bitset[0] = true;
        foreach (region; regions) {
            auto beg = region.start;
            auto end = region.end;
            int k;
            enforce(beg < end);
            --end;
            // Mark every bin that may overlap [beg, end] at each level of the
            // UCSC binning scheme (bin widths from 2^26 bp at level 1 down to
            // 2^14 bp at level 5); bin 0 spans the whole sequence.
            for (k =    1 + (beg>>26); k <=    1 + (end>>26); ++k) bitset[k] = true;
            for (k =    9 + (beg>>23); k <=    9 + (end>>23); ++k) bitset[k] = true;
            for (k =   73 + (beg>>20); k <=   73 + (end>>20); ++k) bitset[k] = true;
            for (k =  585 + (beg>>17); k <=  585 + (end>>17); ++k) bitset[k] = true;
            for (k = 4681 + (beg>>14); k <= 4681 + (end>>14); ++k) bitset[k] = true;
        }

        auto ref_id = regions.front.ref_id;
        checkIndexExistence();
        checkRefId(ref_id);

        // From the bins marked above, select all chunks that end to the right
        // of min_offset, sort them by leftmost coordinate and remove overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(regions.front.start);

        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("min offset = ", min_offset);
        }

        Chunk[] bai_chunks;
        auto bins = _bai.indices[ref_id].bins;
        foreach (bin; bins)
            if (bitset[bin.id])
                appendChunks(bai_chunks, bin, min_offset);
        sort(bai_chunks);

        version(extraVerbose) {
            stderr.writeln("[chunks before normalization]");
            foreach (chunk; bai_chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }

        return bai_chunks.nonOverlappingChunks().array();
    }

    private auto filteredReads(alias IteratePolicy)(BamRegion[] regions) {
        auto chunks = getGroupChunks(regions);
        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("[chunks]");
            foreach (chunk; chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, regions);
    }

    /// Fetch alignments with the given reference sequence id, overlapping [beg .. end)
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion region)
    {
        auto chunks = getChunks(region);
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, [region]);
    }

    /// Fetch alignments overlapping any of the given regions.
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion[] regions) {
        auto sorted_regions = regions.sort();
        BamRegion[][] regions_by_ref;
        // TODO: replace with groupBy once it's included into Phobos
        uint last_ref_id = uint.max;
        foreach (region; sorted_regions) {
            if (region.ref_id == last_ref_id) {
                regions_by_ref.back ~= region;
            } else {
                regions_by_ref ~= [region];
                last_ref_id = region.ref_id;
            }
        }

        static auto ref regB(ref BamRegion region) { return region.start; }
        static auto ref regE(ref BamRegion region) { return region.end; }
        foreach (ref group; regions_by_ref)
            group = nonOverlapping!(regB, regE)(group).array();

        return regions_by_ref.zip(repeat(this))
                             .map!(gt => gt[1].filteredReads!IteratePolicy(gt[0]))()
                             .joiner();
    }
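
    // Multi-region sketch (hypothetical coordinates). The input regions may
    // be unsorted and overlapping; they are sorted, grouped by reference
    // sequence, and merged before any reads are fetched:
    //
    //     auto regions = [BamRegion(1, 0, 300), BamRegion(0, 100, 500)];
    //     foreach (read; manager.getReads(regions)) { /* process read */ }
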
private:
    auto readsFromChunks(alias IteratePolicy, R)(R chunks) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto compressed_stream = new EndianStream(fstream, Endian.littleEndian);
        auto supplier = new StreamChunksSupplier(compressed_stream, chunks);
        auto stream = new BgzfInputStream(supplier, _task_pool, _cache);
        return bamReadRange!IteratePolicy(stream, _reader);
    }

    string _filename;
    BaiFile _bai;
    BamReader _reader;
    TaskPool _task_pool;
    size_t _buffer_size;

    BgzfBlockCache _cache;

    TaskPool task_pool() @property {
        if (_task_pool is null)
            _task_pool = taskPool;
        return _task_pool;
    }

public:

    static struct BamReadFilter(R) {
        this(R r, BamRegion[] regions) {
            _range = r;
            _regions = regions;
            enforce(regions.length > 0);
            _region = _regions.front;
            _ref_id = _region.ref_id; // assumed to be constant
            findNext();
        }

        bool empty() @property {
            return _empty;
        }

        ElementType!R front() @property {
            return _current_read;
        }

        void popFront() {
            _range.popFront();
            findNext();
        }

    private:
        R _range;
        uint _ref_id;
        BamRegion _region;
        BamRegion[] _regions; // non-overlapping and sorted
        bool _empty;
        ElementType!R _current_read;

        void findNext() {
            if (_regions.empty || _range.empty) {
                _empty = true;
                return;
            }

            while (!_range.empty) {
                _current_read = _range.front;

                // BamReads are sorted first by reference ID.
                auto current_ref_id = cast(uint)_current_read.ref_id;
                // ref_id can't be -1 unless the index is corrupted
                if (current_ref_id > _ref_id) {
                    // no more records for this _ref_id
                    _empty = true;
                    return;
                } else if (current_ref_id < _ref_id) {
                    // skip reads referring to sequences
                    // with ID less than ours
                    _range.popFront();
                    continue;
                }

                if (_current_read.position >= _region.end) {
                    // As reads are sorted by leftmost coordinate,
                    // none of the remaining alignments in _range
                    // overlap the current interval either.
                    //
                    //                  [-----)
                    //                  . [-----------)
                    //                  .    [---)
                    //                  .        [-------)
                    //                  .         [-)
                    // [beg ........... end)
                    _regions.popFront();
                    // TODO: binary search is potentially faster,
                    // but that needs to be benchmarked
                    if (_regions.empty) {
                        _empty = true;
                        return;
                    } else {
                        _region = _regions.front;
                        continue;
                    }
                }

                if (_current_read.position > _region.start) {
                    return; // definitely overlaps
                }

                if (_current_read.position +
                    _current_read.basesCovered() <= _region.start)
                {
                    // ends before the beginning of the region
                    //  [-----------)
                    //                [beg .......... end)
                    _range.popFront();
                    // Zero-length reads are also considered non-overlapping,
                    // which is why the inequality just above is not strict.
                } else {
                    return; // _current_read overlaps the region
                }
            }
            _empty = true;
        }
    }

    // Takes a range of alignments sorted by leftmost coordinate,
    // together with a sorted list of non-overlapping intervals [beg, end),
    // and returns a range of those alignments that overlap the intervals.
    static auto filterBamReads(R)(R r, BamRegion[] regions)
    {
        return BamReadFilter!R(r, regions);
    }
}
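
// The overlap rule implemented by BamReadFilter, restated as a standalone
// sketch (not part of the original module; names are illustrative): a read at
// leftmost position `pos` covering `covered` reference bases overlaps
// [beg, end) iff it starts before `end` and ends strictly after `beg`, so a
// zero-length read sitting exactly at `beg` never overlaps.
//
//     bool overlaps(long pos, long covered, long beg, long end) {
//         return pos < end && pos + covered > beg;
//     }
//     assert( overlaps(5, 3, 0, 6)); // [5, 8) overlaps [0, 6)
//     assert(!overlaps(6, 2, 0, 6)); // starts exactly at end
//     assert(!overlaps(0, 0, 0, 6)); // zero-length read at beg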