/*
    This file is part of BioD.
    Copyright (C) 2012-2014 Artem Tarasov <lomereiter@gmail.com>

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.

*/
/**
  Module for random access operations on a BAM file.
 */
module bio.std.hts.bam.randomaccessmanager;

import bio.std.hts.bam.constants;
import bio.std.hts.bam.reader;
import bio.std.hts.bam.read;
import bio.std.hts.bam.readrange;
import bio.std.hts.bam.baifile;
import bio.std.hts.bam.region;
import bio.core.utils.algo;

import bio.core.bgzf.block;
import bio.core.bgzf.virtualoffset;
import bio.core.bgzf.inputstream;
import bio.core.bgzf.constants;
import bio.core.bgzf.chunk;
import bio.core.utils.range;
import bio.core.utils.stream;

import std.system;
import std.algorithm;
import std.array;
import std.range;
import std.traits;
import std.exception;
import std.container;
import std.parallelism;
static import std.file;

debug {
    import std.stdio;
}

private {
    auto nonOverlappingChunks(R)(R chunks) {
        static ref auto chunkB(ref Chunk chunk) { return chunk.beg; }
        static ref auto chunkE(ref Chunk chunk) { return chunk.end; }
        return nonOverlapping!(chunkB, chunkE)(chunks);
    }
}

/// Class to which random access tasks are delegated.
class RandomAccessManager {

    // void setCache(BgzfBlockCache cache) {
    //     _cache = cache;
    // }

    void setTaskPool(TaskPool task_pool) {
        _task_pool = task_pool;
    }

    void setBufferSize(size_t buffer_size) {
        _buffer_size = buffer_size;
    }

    /// Constructs a new manager for a BAM file.
    this(string filename) {
        _filename = filename;
    }

    /// ditto
    this(BamReader reader) {
        _reader = reader;
        _filename = reader.filename;
    }

    /// Constructs a new manager with the given index file,
    /// which enables random-access interval queries.
    ///
    /// Params:
    ///     filename = location of BAM file
    ///     bai      = index file
    this(string filename, ref BaiFile bai) {
        _filename = filename;
        _bai = bai;
        _found_index_file = true;
    }

    /// ditto
    this(BamReader reader, ref BaiFile bai) {
        _reader = reader;
        _filename = reader.filename;
        _bai = bai;
        _found_index_file = true;
    }
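    // A minimal construction sketch (disabled; "example.bam" and
    // "example.bam.bai" are hypothetical file names, and BaiFile is assumed
    // to be constructible from a path as in bio.std.hts.bam.baifile):
    version(none) unittest {
        auto bai = BaiFile("example.bam.bai");
        auto bam = new BamReader("example.bam");
        auto manager = new RandomAccessManager(bam, bai);
        assert(manager.found_index_file);
    }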
    /// If the file ends with the EOF block, returns the virtual offset of
    /// the start of that block; otherwise, returns the virtual offset of
    /// the physical end of file.
    VirtualOffset eofVirtualOffset() const {
        ulong file_offset = std.file.getSize(_filename);
        if (hasEofBlock()) {
            return VirtualOffset(file_offset - BAM_EOF.length, 0);
        } else {
            return VirtualOffset(file_offset, 0);
        }
    }

    /// Returns true if the file ends with the EOF block, false otherwise.
    bool hasEofBlock() const {
        auto _stream = new bio.core.utils.stream.File(_filename);
        if (_stream.size < BAM_EOF.length) {
            return false;
        }

        ubyte[BAM_EOF.length] buf;
        _stream.seekEnd(-cast(int)BAM_EOF.length);

        _stream.readExact(&buf, BAM_EOF.length);
        if (buf != BAM_EOF) {
            return false;
        }

        return true;
    }

    /// Get a new BgzfInputStream starting from the specified virtual offset.
    BgzfInputStream createStreamStartingFrom(VirtualOffset offset)
    {
        auto _stream = new bio.core.utils.stream.File(_filename);
        auto _compressed_stream = new EndianStream(_stream, Endian.littleEndian);
        _compressed_stream.seekSet(cast(size_t)(offset.coffset));
        auto supplier = new StreamSupplier(_compressed_stream, offset.uoffset);
        // auto bgzf_stream = new BgzfInputStream(supplier, _task_pool, _cache);
        auto bgzf_stream = new BgzfInputStream(supplier, _task_pool);
        return bgzf_stream;
    }

    /// Get a single read at the given virtual offset.
    /// A new stream is created on each call.
    BamRead getReadAt(VirtualOffset offset) {
        auto stream = createStreamStartingFrom(offset);

        bool old_mode = _reader._seqprocmode;
        _reader._seqprocmode = true;
        auto read = bamReadRange(stream, _reader).front.dup;
        _reader._seqprocmode = old_mode;
        return read;
    }

    /// Get the BGZF block at the given file offset.
    BgzfBlock getBgzfBlockAt(ulong offset) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto stream = new EndianStream(fstream, Endian.littleEndian);
        stream.seekSet(offset);
        BgzfBlock block = void;
        ubyte[BGZF_MAX_BLOCK_SIZE] buf = void;
        fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
        block._buffer = block._buffer.dup;
        return block;
    }
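    // Usage sketch for single-read access (disabled; the virtual offset here
    // is a hypothetical value -- in practice it would come from a BAI index
    // chunk or from a previously recorded read position):
    version(none) unittest {
        auto bam = new BamReader("example.bam");
        auto manager = new RandomAccessManager(bam);
        auto vo = VirtualOffset(12345, 67); // coffset = 12345, uoffset = 67
        BamRead read = manager.getReadAt(vo); // independent copy (.dup)
    }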
    /// Get reads between two virtual offsets. The first virtual offset must
    /// point to the start of an alignment record.
    auto getReadsBetween(VirtualOffset from, VirtualOffset to) {
        auto stream = createStreamStartingFrom(from);

        static bool offsetTooBig(BamReadBlock record, VirtualOffset vo) {
            return record.end_virtual_offset > vo;
        }

        return until!offsetTooBig(bamReadRange!withOffsets(stream, _reader), to);
    }

    bool found_index_file() @property const {
        return _found_index_file;
    }
    private bool _found_index_file = false; // set to true in constructors that take an index file

    /// BAI file
    ref const(BaiFile) getBai() const {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
        return _bai;
    }

    private void checkIndexExistence() {
        enforce(found_index_file, "BAM index file (.bai) must be provided");
    }

    private void checkRefId(uint ref_id) {
        enforce(ref_id < _bai.indices.length, "Invalid reference sequence index");
    }

    private void appendChunks(ref Chunk[] chunks, Bin bin, VirtualOffset min_offset) {
        foreach (chunk; bin.chunks) {
            if (chunk.end > min_offset) {
                chunks ~= chunk;

                // optimization: clamp the chunk start to min_offset
                if (chunks[$-1].beg < min_offset)
                    chunks[$-1].beg = min_offset;
            }
        }
    }

    /// Get BAI chunks containing all alignment records overlapping the region.
    Chunk[] getChunks(BamRegion region) {
        auto ref_id = region.ref_id;
        auto beg = region.start;
        auto end = region.end;
        checkIndexExistence();
        checkRefId(ref_id);

        // Select all bins that overlap with [beg, end).
        // Then from such bins select all chunks that end to the right of min_offset.
        // Sort these chunks by leftmost coordinate and remove all overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(beg);

        Chunk[] bai_chunks;
        foreach (b; _bai.indices[ref_id].bins) {
            if (!b.canOverlapWith(beg, end))
                continue;
            appendChunks(bai_chunks, b, min_offset);
        }

        sort(bai_chunks);
        return bai_chunks.nonOverlappingChunks().array();
    }
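    // Sketch of driving the chunk lookup directly (disabled; file names and
    // region coordinates are hypothetical, and BamRegion is assumed to be
    // constructible from (ref_id, start, end)). The result is a sorted list
    // of non-overlapping virtual offset ranges covering all candidate records:
    version(none) unittest {
        auto bai = BaiFile("example.bam.bai");
        auto manager = new RandomAccessManager("example.bam", bai);
        auto chunks = manager.getChunks(BamRegion(0, 1_000, 20_000));
        foreach (i, chunk; chunks) {
            assert(chunk.beg < chunk.end);
            if (i > 0)
                assert(chunks[i - 1].end <= chunk.beg); // no overlaps
        }
    }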
    // regions should be from the same reference sequence
    private Chunk[] getGroupChunks(BamRegion[] regions) {
        auto bitset = Array!bool();
        enforce(regions.length > 0);
        bitset.length = BAI_MAX_BIN_ID;
        bitset[0] = true;
        foreach (region; regions) {
            auto beg = region.start;
            auto end = region.end;
            int k;
            enforce(beg < end);
            --end;
            // Mark every BAI bin overlapping the (now closed) interval
            // [beg, end], one binning level per loop: from 64 Mbp bins
            // (>>26) down to 16 Kbp bins (>>14). Bin 0, which spans the
            // whole 512 Mbp, is always set above.
            for (k = 1 + (beg>>26); k <= 1 + (end>>26); ++k) bitset[k] = true;
            for (k = 9 + (beg>>23); k <= 9 + (end>>23); ++k) bitset[k] = true;
            for (k = 73 + (beg>>20); k <= 73 + (end>>20); ++k) bitset[k] = true;
            for (k = 585 + (beg>>17); k <= 585 + (end>>17); ++k) bitset[k] = true;
            for (k = 4681 + (beg>>14); k <= 4681 + (end>>14); ++k) bitset[k] = true;
        }

        auto ref_id = regions.front.ref_id;
        checkIndexExistence();
        checkRefId(ref_id);

        // From the marked bins, select all chunks that end to the right of
        // min_offset, sort them by leftmost coordinate and remove all overlaps.

        auto min_offset = _bai.indices[ref_id].getMinimumOffset(regions.front.start);

        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("min offset = ", min_offset);
        }

        Chunk[] bai_chunks;
        auto bins = _bai.indices[ref_id].bins;
        foreach (bin; bins)
            if (bitset[bin.id])
                appendChunks(bai_chunks, bin, min_offset);
        sort(bai_chunks);

        version(extraVerbose) {
            stderr.writeln("[chunks before normalization]");
            foreach (chunk; bai_chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }

        return bai_chunks.nonOverlappingChunks().array();
    }

    private auto filteredReads(alias IteratePolicy)(BamRegion[] regions) {
        auto chunks = getGroupChunks(regions);
        version(extraVerbose) {
            import std.stdio;
            stderr.writeln("[chunks]");
            foreach (chunk; chunks)
                stderr.writeln(chunk.beg, " - ", chunk.end);
        }
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, regions);
    }

    /// Fetch alignments with the given reference sequence id, overlapping [beg .. end)
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion region)
    {
        auto chunks = getChunks(region);
        auto reads = readsFromChunks!IteratePolicy(chunks);
        return filterBamReads(reads, [region]);
    }

    /// Fetch alignments overlapping any of the given regions.
    auto getReads(alias IteratePolicy=withOffsets)(BamRegion[] regions) {
        auto sorted_regions = regions.sort();
        BamRegion[][] regions_by_ref;
        // TODO: replace with groupBy once it's included in Phobos
        uint last_ref_id = uint.max;
        foreach (region; sorted_regions) {
            if (region.ref_id == last_ref_id) {
                regions_by_ref.back ~= region;
            } else {
                regions_by_ref ~= [region];
                last_ref_id = region.ref_id;
            }
        }

        static ref auto regB(ref BamRegion region) { return region.start; }
        static ref auto regE(ref BamRegion region) { return region.end; }
        foreach (ref group; regions_by_ref)
            group = nonOverlapping!(regB, regE)(group).array();

        return regions_by_ref.zip(repeat(this))
                             .map!(gt => gt[1].filteredReads!IteratePolicy(gt[0]))()
                             .joiner();
    }
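    // Multi-region query sketch (disabled; reference IDs and coordinates are
    // hypothetical). Regions are sorted, grouped by reference and merged, so
    // overlapping requests do not yield duplicate reads:
    version(none) unittest {
        auto bai = BaiFile("example.bam.bai");
        auto bam = new BamReader("example.bam");
        auto manager = new RandomAccessManager(bam, bai);
        auto regions = [BamRegion(0, 100, 500),
                        BamRegion(0, 400, 900),   // overlaps the previous one
                        BamRegion(1, 2_000, 3_000)];
        size_t n;
        foreach (read; manager.getReads(regions))
            ++n; // reads arrive grouped by reference, sorted by position
    }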
private:
    auto readsFromChunks(alias IteratePolicy, R)(R chunks) {
        auto fstream = new bio.core.utils.stream.File(_filename);
        auto compressed_stream = new EndianStream(fstream, Endian.littleEndian);
        auto supplier = new StreamChunksSupplier(compressed_stream, chunks);
        // auto stream = new BgzfInputStream(supplier, _task_pool, _cache);
        auto stream = new BgzfInputStream(supplier, _task_pool);
        return bamReadRange!IteratePolicy(stream, _reader);
    }

    string _filename;
    BaiFile _bai;
    BamReader _reader;
    TaskPool _task_pool;
    size_t _buffer_size;

    // BgzfBlockCache _cache;

    TaskPool task_pool() @property {
        if (_task_pool is null)
            _task_pool = taskPool;
        return _task_pool;
    }

public:

    static struct BamReadFilter(R) {
        this(R r, BamRegion[] regions) {
            _range = r;
            _regions = regions;
            enforce(regions.length > 0);
            _region = _regions.front;
            _ref_id = _region.ref_id; // assumed to be constant
            findNext();
        }

        bool empty() @property {
            return _empty;
        }

        ElementType!R front() @property {
            return _current_read;
        }

        void popFront() {
            _range.popFront();
            findNext();
        }

    private:
        R _range;
        uint _ref_id;
        BamRegion _region;
        BamRegion[] _regions; // non-overlapping and sorted
        bool _empty;
        ElementType!R _current_read;

        void findNext() {
            if (_regions.empty || _range.empty) {
                _empty = true;
                return;
            }

            while (!_range.empty) {
                _current_read = _range.front;

                // BamReads are sorted first by reference ID.
                auto current_ref_id = cast(uint)_current_read.ref_id;
                // ref_id can't be -1 here unless the index is corrupted
                if (current_ref_id > _ref_id) {
                    // no more records for this _ref_id
                    _empty = true;
                    return;
                } else if (current_ref_id < _ref_id) {
                    // skip reads referring to sequences
                    // with ID less than ours
                    _range.popFront();
                    continue;
                }

                if (_current_read.position >= _region.end) {
                    // As reads are sorted by leftmost coordinate,
                    // none of the remaining alignments in _range
                    // overlap the current interval either.
                    //
                    //                 [-----)
                    //                 .   [-----------)
                    //                 .   [---)
                    //                 .  [-------)
                    //                 .        [-)
                    //  [beg ..... end)
                    _regions.popFront();
                    // TODO: binary search is potentially faster,
                    // but that needs to be benchmarked
                    if (_regions.empty) {
                        _empty = true;
                        return;
                    } else {
                        _region = _regions.front;
                        continue;
                    }
                }

                if (_current_read.position > _region.start) {
                    return; // definitely overlaps
                }

                if (_current_read.position +
                    _current_read.basesCovered() <= _region.start)
                {
                    // ends before the beginning of the region:
                    //  [-----------)
                    //                  [beg .......... end)
                    _range.popFront();
                    // Zero-length reads are also considered non-overlapping,
                    // which is why the comparison with _region.start above
                    // is strict.
                } else {
                    return; // _current_read overlaps the region
                }
            }
            _empty = true;
        }
    }

    // Take a range of alignments sorted by leftmost coordinate, together
    // with a set of sorted, non-overlapping intervals [beg, end), and
    // return a range of only those alignments that overlap the intervals.
    static auto filterBamReads(R)(R r, BamRegion[] regions)
    {
        return BamReadFilter!R(r, regions);
    }
}
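// Standalone restatement of the overlap predicate implemented by
// BamReadFilter.findNext (the helper below is illustrative only, not part
// of the API): a read overlaps [start, end) iff it begins before `end`
// and covers at least one base position at or past `start`.
unittest {
    static bool overlaps(uint pos, uint bases_covered, uint start, uint end) {
        return pos < end && pos + bases_covered > start;
    }
    assert( overlaps(5, 10, 0, 10)); // read [5, 15) vs region [0, 10)
    assert(!overlaps(10, 5, 0, 10)); // read starting exactly at region end
    assert(!overlaps(0, 0, 5, 10));  // zero-length read before the region
    assert(!overlaps(5, 0, 5, 10));  // zero-length read at region start
    assert( overlaps(7, 1, 5, 10));  // single-base read inside the region
}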