1 /* 2 This file is part of Sambamba. 3 Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl> 4 5 Sambamba is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License as published 7 by the Free Software Foundation; either version 2 of the License, 8 or (at your option) any later version. 9 10 Sambamba is distributed in the hope that it will be useful, but 11 WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with this program; if not, write to the Free Software 17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 18 02111-1307 USA 19 20 */ 21 22 module bio2.bgzf; 23 24 /** 25 26 This is a new version of sambamba bgzf (under development). 27 28 The new version is a prototype for new sambamba architecture using 29 canonical D language features, including immutable and improved 30 laziness and a more functional programming style. It should provide 31 improved performance and minimize RAM use, as well as better 32 composability. 33 34 Authors: Pjotr Prins 35 36 */ 37 38 import core.stdc.string : memcpy; 39 40 import std.bitmanip; 41 import std.conv; 42 import std.exception; 43 import std.file; 44 import std.stdio; 45 import std.typecons; 46 import std.zlib : calc_crc32 = crc32, ZlibException; 47 48 import bio.bam.constants; 49 import bio.core.bgzf.block; 50 import bio.core.bgzf.constants; 51 import bio.core.utils.zlib : inflateInit2, inflate, inflateEnd, Z_OK, Z_FINISH, Z_STREAM_END; 52 53 import bio2.constants; 54 55 class BgzfException : Exception { 56 this(string msg) { super(msg); } 57 } 58 59 alias Nullable!ulong FilePos; 60 alias immutable(uint) CRC32; 61 62 alias BGZF_MAX_BLOCK_SIZE BLOCK_SIZE; 63 64 alias ubyte[BLOCK_SIZE] BlockBuffer; 65 66 @property ubyte read_ubyte(File f) { 67 ubyte[1] ubyte1; // read buffer 68 immutable ubyte[1] buf = f.rawRead(ubyte1); 69 return buf[0]; 70 } 71 72 @property ushort read_ushort(File f) { 73 ubyte[2] ubyte2; // read buffer 74 immutable ubyte[2] buf = f.rawRead(ubyte2); 75 return littleEndianToNative!ushort(buf); 76 } 77 @property uint read_uint(File f) { 78 ubyte[4] ubyte4; // read buffer 79 immutable ubyte[4] buf = f.rawRead(ubyte4); 80 return littleEndianToNative!uint(buf); 81 } 82 83 /** 84 Uncompress a zlib buffer (without header) 85 */ 86 ubyte[] deflate(ubyte[] uncompressed_buf, const ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) { 87 assert(uncompressed_buf.length == BLOCK_SIZE); 88 bio.core.utils.zlib.z_stream zs; 89 zs.next_in = cast(typeof(zs.next_in))compressed_buf; 90 zs.avail_in = to!uint(compressed_buf.length); 91 92 auto err = inflateInit2(&zs, /* winbits = */-15); 93 if (err != Z_OK) throw new ZlibException(err); 94 95 zs.next_out = cast(typeof(zs.next_out))uncompressed_buf.ptr; 96 zs.avail_out = cast(int)uncompressed_buf.length; 97 98 scope(exit) { inflateEnd(&zs); } 99 err = inflate(&zs, Z_FINISH); 100 if (err != Z_STREAM_END) throw new ZlibException(err); 101 102 assert(zs.total_out == uncompressed_size); 103 uncompressed_buf.length = uncompressed_size; 104 assert(crc32 == calc_crc32(0, uncompressed_buf[])); 105 106 return uncompressed_buf; 107 } 108 109 /** 110 BgzfReader is designed to run on a single thread. All it does is 111 fetch block headers and data, so the thread should easily keep up 112 with IO. All data processing is happening lazily in other threads. 113 */ 114 struct BgzfReader { 115 File f; 116 FilePos report_fpos; // for error handler - assumes one thread! 117 118 this(string fn) { 119 enforce(fn.isFile); 120 f = File(fn,"r"); 121 } 122 123 @disable this(this); // BgzfReader does not have copy semantics; 124 125 void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) { 126 throw new BgzfException("Error reading BGZF block starting in "~f.name ~" @ " ~ 127 to!string(report_fpos) ~ " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg); 128 } 129 130 void enforce1(bool check, lazy string msg, string file = __FILE__, int line = __LINE__) { 131 if (!check) 132 throwBgzfException(msg,file,line); 133 } 134 135 /** 136 Reads the block header and returns the contained compressed data 137 size with the file pointer positioned at the associated 138 compressed data. 139 */ 140 size_t read_block_header() { 141 ubyte[4] ubyte4; 142 auto magic = f.rawRead(ubyte4); 143 enforce1(magic.length == 4, "Premature end of file"); 144 enforce1(magic[0..4] == BGZF_MAGIC,"Invalid file format: expected bgzf magic number"); 145 ubyte[uint.sizeof + 2 * ubyte.sizeof] skip; 146 f.rawRead(skip); // skip gzip info 147 ushort gzip_extra_length = f.read_ushort(); 148 immutable fpos1 = f.tell; 149 size_t bsize = 0; 150 while (f.tell < fpos1 + gzip_extra_length) { 151 immutable subfield_id1 = f.read_ubyte(); 152 immutable subfield_id2 = f.read_ubyte(); 153 immutable subfield_len = f.read_ushort(); 154 if (subfield_id1 == BAM_SI1 && subfield_id2 == BAM_SI2) { 155 // BC identifier 156 enforce(gzip_extra_length == 6); 157 // FIXME: always picks first BC block 158 bsize = 1+f.read_ushort(); // BLOCK size 159 enforce1(subfield_len == 2, "BC subfield len should be 2"); 160 break; 161 } 162 else { 163 f.seek(subfield_len,SEEK_CUR); 164 } 165 enforce1(bsize!=0,"block size not found"); 166 f.seek(fpos1+gzip_extra_length); // skip any extra subfields - note we don't check for second BC 167 } 168 immutable compressed_size = bsize - 1 - gzip_extra_length - 19; 169 enforce1(compressed_size <= BLOCK_SIZE, "compressed size larger than allowed"); 170 171 // stderr.writeln("[compressed] size ", compressed_size, " bytes starting block @ ", report_fpos); 172 return compressed_size; 173 } 174 175 /** 176 Fetch the compressed data part of the block and return it with 177 the uncompressed size and CRC32. The file pointer is assumed to 178 be at the start of the compressed data and will be at the end of 179 that section after. 180 */ 181 Tuple!(ubyte[],immutable(uint),CRC32) read_compressed_data(ubyte[] buffer) { 182 auto compressed_buf = f.rawRead(buffer); 183 184 immutable CRC32 crc32 = f.read_uint(); 185 immutable uncompressed_size = f.read_uint(); 186 // stderr.writeln("[uncompressed] size ",uncompressed_size); 187 return tuple(compressed_buf,uncompressed_size,crc32); 188 } 189 190 /** 191 * Returns new tuple of the new file position, the compressed buffer and 192 * the CRC32 o the uncompressed data. file pos is NULL when done 193 */ 194 Tuple!(FilePos,ubyte[],size_t,CRC32) read_compressed_block(FilePos fpos, ubyte[] buffer) { 195 immutable start_offset = fpos; 196 try { 197 if (fpos.isNull) throwBgzfException("Trying to read past eof"); 198 report_fpos = fpos; 199 f.seek(fpos); 200 immutable compressed_size = read_block_header(); 201 auto ret = read_compressed_data(buffer[0..compressed_size]); 202 auto compressed_buf = ret[0]; 203 immutable uncompressed_size = ret[1]; 204 immutable crc32 = ret[2]; 205 206 if (uncompressed_size == 0) { 207 // check for eof marker, rereading block header 208 auto lastpos = f.tell(); 209 f.seek(start_offset); 210 ubyte[BGZF_EOF.length] buf; 211 f.rawRead(buf); 212 f.seek(lastpos); 213 if (buf == BGZF_EOF) 214 return tuple(FilePos(),compressed_buf,cast(ulong)0,crc32); // sets fpos to null 215 } 216 return tuple(FilePos(f.tell()),compressed_buf,cast(size_t)uncompressed_size,crc32); 217 } catch (Exception e) { throwBgzfException(e.msg,e.file,e.line); } 218 assert(0); // never reached 219 } 220 } 221 222 /** 223 Simple block iterator 224 */ 225 struct BgzfBlocks { 226 BgzfReader bgzf; 227 228 this(string fn) { 229 bgzf = BgzfReader(fn); 230 } 231 232 @disable this(this); // disable copy semantics; 233 234 int opApply(scope int delegate(ubyte[]) dg) { 235 FilePos fpos = 0; 236 237 try { 238 while (!fpos.isNull) { 239 BlockBuffer stack_buffer; 240 auto res = bgzf.read_compressed_block(fpos,stack_buffer); 241 fpos = res[0]; // point fpos to next block 242 if (fpos.isNull) break; 243 244 auto compressed_buf = res[1]; // same as stack_buffer 245 auto uncompressed_size = res[2]; 246 auto crc32 = res[3]; 247 BlockBuffer uncompressed_buf; 248 // call delegated function with new block 249 dg(deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32)); 250 } 251 } catch (Exception e) { bgzf.throwBgzfException(e.msg,e.file,e.line); } 252 return 0; 253 } 254 } 255 256 257 Tuple!(size_t,FilePos) read_blockx(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) { 258 BlockBuffer compressed_buf; 259 auto res = bgzf.read_compressed_block(fpos,compressed_buf); 260 fpos = res[0]; // point fpos to next block 261 if (fpos.isNull) return tuple(cast(size_t)0,fpos); 262 auto data = res[1]; 263 264 assert(data.ptr == compressed_buf.ptr); 265 size_t uncompressed_size = res[2]; 266 auto crc32 = res[3]; 267 deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 268 return tuple(uncompressed_size,fpos); 269 } 270 271 import std.parallelism; 272 273 int kick_off_reading_block_ahead(ubyte[] uncompressed_buf, ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) { 274 // writeln("HEY " ~ to!string(uncompressed_size)); 275 deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 276 return -1; 277 } 278 279 /** 280 */ 281 struct BlockReadAhead { 282 bool task_running = false, we_have_a_task = false; 283 Task!(kick_off_reading_block_ahead, ubyte[], ubyte[], size_t, CRC32)* t; 284 FilePos fpos2; 285 size_t uncompressed_size2 = 0; 286 BlockBuffer compressed_buf2; 287 BlockBuffer uncompressed_buf2; 288 289 private void read_next_block() { 290 } 291 292 private void add_deflate_task() { 293 } 294 295 private void copy_deflated_buffer() { 296 } 297 298 void setup_block_reader() { 299 read_next_block(); 300 add_deflate_task(); 301 throw new Exception("NYI"); 302 } 303 304 Tuple!(size_t,FilePos) read_block(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) { 305 assert(we_have_a_task); 306 copy_deflated_buffer(); 307 read_next_block(); 308 add_deflate_task(); 309 // return 310 311 if (task_running) { 312 int res = t.yieldForce; 313 // writeln(res); 314 task_running = false; 315 memcpy(uncompressed_buf.ptr,compressed_buf2.ptr,uncompressed_size2); 316 return tuple(uncompressed_size2, fpos2); 317 } 318 else { 319 BlockBuffer compressed_buf; 320 auto res = bgzf.read_compressed_block(fpos,compressed_buf); 321 fpos = res[0]; // point fpos to next block 322 if (fpos.isNull) return tuple(cast(size_t)0,fpos); 323 auto data = res[1]; 324 assert(data.ptr == compressed_buf.ptr); 325 size_t uncompressed_size = res[2]; 326 auto crc32 = res[3]; 327 328 deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 329 330 // now set up a new buffer 331 auto res2 = bgzf.read_compressed_block(fpos,compressed_buf2); 332 fpos2 = res[0]; // point fpos to next block 333 if (!fpos2.isNull) { 334 auto data2 = res2[1]; 335 uncompressed_size2 = res2[2]; 336 t = task!kick_off_reading_block_ahead(cast(ubyte[])uncompressed_buf2,cast(ubyte[])compressed_buf2,uncompressed_size2,res2[3]); 337 t.executeInNewThread(); 338 task_running = true; 339 } 340 return tuple(uncompressed_size,fpos); 341 } 342 } 343 } 344 345 /** 346 */ 347 struct BlockReadUnbuffered { 348 349 void setup_block_reader() { 350 } 351 352 Tuple!(ubyte[], size_t, FilePos) read_block(ref BgzfReader bgzf, in FilePos fpos, ubyte[] uncompressed_buf) { 353 BlockBuffer compressed_buf; 354 auto res = bgzf.read_compressed_block(fpos,compressed_buf); 355 auto fpos2 = res[0]; // point fpos to next block 356 if (fpos.isNull) return tuple(uncompressed_buf,cast(size_t)0,fpos2); 357 auto data = res[1]; 358 assert(data.ptr == compressed_buf.ptr); 359 size_t uncompressed_size = res[2]; 360 auto crc32 = res[3]; 361 362 auto buf = deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 363 assert(buf.ptr == uncompressed_buf.ptr); 364 return tuple(uncompressed_buf,uncompressed_size,fpos2); 365 } 366 } 367 368 /** 369 Streams bgzf data and fetch items by unit or buffer. These can go beyond 370 the size of individual blocks(!) 371 */ 372 373 struct BgzfStream { 374 BgzfReader bgzf; 375 FilePos fpos; // track file position 376 ubyte[] uncompressed_buf; // current data buffer 377 size_t uncompressed_size; // current data buffer size 378 Nullable!int block_pos; // position in block 379 BlockReadUnbuffered blockread; 380 381 this(string fn) { 382 bgzf = BgzfReader(fn); 383 uncompressed_buf = new ubyte[BLOCK_SIZE]; 384 fpos = 0; 385 } 386 387 @disable this(this); // disable copy semantics; 388 389 bool eof() { 390 return fpos.isNull; 391 } 392 393 /** 394 Fetch data into buffer. The size of the buffer can be larger than 395 one or more multiple blocks 396 */ 397 ubyte[] fetch(ubyte[] buffer) { 398 if (block_pos.isNull) { 399 blockread.setup_block_reader(); 400 auto res = blockread.read_block(bgzf,fpos,uncompressed_buf); // read first block 401 assert(res[0].ptr == uncompressed_buf.ptr); 402 uncompressed_size = res[1]; 403 fpos = res[2]; 404 block_pos = 0; 405 } 406 407 immutable buffer_length = buffer.length; 408 size_t buffer_pos = 0; 409 size_t remaining = buffer_length; 410 411 while (remaining > 0) { 412 if (block_pos + remaining < uncompressed_size) { 413 // full copy 414 assert(buffer_pos + remaining == buffer_length); 415 memcpy(buffer[buffer_pos..buffer_pos+remaining].ptr,uncompressed_buf[block_pos..block_pos+remaining].ptr,remaining); 416 block_pos += remaining; 417 remaining = 0; 418 } 419 else { 420 // read tail of buffer 421 immutable tail = uncompressed_size - block_pos; 422 memcpy(buffer[buffer_pos..buffer_pos+tail].ptr,uncompressed_buf[block_pos..uncompressed_size].ptr,tail); 423 buffer_pos += tail; 424 remaining -= tail; 425 auto res = blockread.read_block(bgzf,fpos,uncompressed_buf); 426 assert(res[0].ptr == uncompressed_buf.ptr); 427 uncompressed_size = res[1]; 428 fpos = res[2]; 429 block_pos = 0; 430 } 431 } 432 return buffer; 433 } 434 435 int read(T)() { // for integers 436 ubyte[T.sizeof] buf; 437 auto b = fetch(buf); 438 return b.read!(T,Endian.littleEndian)(); 439 } 440 441 string read(T)(size_t len) { 442 ubyte[] buf = new ubyte[len]; // heap allocation 443 fetch(buf); 444 return cast(T)buf; 445 } 446 447 T[] read(T)(T[] buffer) { return cast(T[])fetch(cast(ubyte[])buffer); }; 448 449 450 }