1 /* 2 This file is part of Sambamba. 3 Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl> 4 5 Sambamba is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License as published 7 by the Free Software Foundation; either version 2 of the License, 8 or (at your option) any later version. 9 10 Sambamba is distributed in the hope that it will be useful, but 11 WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with this program; if not, write to the Free Software 17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 18 02111-1307 USA 19 20 */ 21 22 module bio.std.experimental.hts.bgzf; 23 24 /** 25 26 This is a new version of sambamba bgzf (under development). Bgzf is 27 a blocked version of the ubiquous gzip format. By making it blocked 28 it allows for seeking in gz files. Note that without seeking it 29 can take standard gz files too. 30 31 The new version is a prototype for new sambamba architecture using 32 canonical D language features, including immutable and improved 33 laziness and a more functional programming style. It should provide 34 improved performance and minimize RAM use, as well as better 35 composability. 36 37 Authors: Pjotr Prins 38 39 */ 40 41 import core.stdc..string : memcpy; 42 43 import std.bitmanip; 44 import std.conv; 45 import std.exception; 46 import std.file; 47 import std.stdio; 48 import std.typecons; 49 import std.zlib : calc_crc32 = crc32, ZlibException; 50 51 import bio.std.hts.bam.constants; 52 import bio.core.bgzf.block; 53 import bio.core.bgzf.constants; 54 import bio.core.utils.zlib : inflateInit2, inflate, inflateEnd, Z_OK, Z_FINISH, Z_STREAM_END; 55 56 import bio.std.experimental.hts.constants; 57 58 class BgzfException : Exception { 59 this(string msg) { super(msg); } 60 } 61 62 alias Nullable!ulong FilePos; 63 alias immutable(uint) CRC32; 64 65 alias BGZF_MAX_BLOCK_SIZE BLOCK_SIZE; 66 67 alias ubyte[BLOCK_SIZE] BlockBuffer; 68 69 @property ubyte read_ubyte(File f) { 70 ubyte[1] ubyte1; // read buffer 71 immutable ubyte[1] buf = f.rawRead(ubyte1); 72 return buf[0]; 73 } 74 75 @property ushort read_ushort(File f) { 76 ubyte[2] ubyte2; // read buffer 77 immutable ubyte[2] buf = f.rawRead(ubyte2); 78 return littleEndianToNative!ushort(buf); 79 } 80 @property uint read_uint(File f) { 81 ubyte[4] ubyte4; // read buffer 82 immutable ubyte[4] buf = f.rawRead(ubyte4); 83 return littleEndianToNative!uint(buf); 84 } 85 86 /** 87 Uncompress a zlib buffer (without header) 88 */ 89 ubyte[] deflate(ubyte[] uncompressed_buf, const ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) { 90 assert(uncompressed_buf.length == BLOCK_SIZE); 91 bio.core.utils.zlib.z_stream zs; 92 zs.next_in = cast(typeof(zs.next_in))compressed_buf; 93 zs.avail_in = to!uint(compressed_buf.length); 94 95 auto err = inflateInit2(&zs, /* winbits = */-15); 96 if (err != Z_OK) throw new ZlibException(err); 97 98 zs.next_out = cast(typeof(zs.next_out))uncompressed_buf.ptr; 99 zs.avail_out = cast(int)uncompressed_buf.length; 100 101 scope(exit) { inflateEnd(&zs); } 102 err = inflate(&zs, Z_FINISH); 103 if (err != Z_STREAM_END) throw new ZlibException(err); 104 105 assert(zs.total_out == uncompressed_size); 106 uncompressed_buf.length = uncompressed_size; 107 assert(crc32 == calc_crc32(0, uncompressed_buf[])); 108 109 return uncompressed_buf; 110 } 111 112 /** 113 BgzfReader is designed to run on a single thread. All it does is 114 fetch block headers and data, so the thread should easily keep up 115 with IO. All data processing is happening lazily in other threads. 116 */ 117 struct BgzfReader { 118 File f; 119 FilePos report_fpos; // for error handler - assumes one thread! 120 121 this(string fn) { 122 enforce(fn.isFile); 123 f = File(fn,"r"); 124 } 125 126 @disable this(this); // BgzfReader does not have copy semantics; 127 128 void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) { 129 throw new BgzfException("Error reading BGZF block starting in "~f.name ~" @ " ~ 130 to!string(report_fpos) ~ " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg); 131 } 132 133 void enforce1(bool check, lazy string msg, string file = __FILE__, int line = __LINE__) { 134 if (!check) 135 throwBgzfException(msg,file,line); 136 } 137 138 /** 139 Reads the block header and returns the contained compressed data 140 size with the file pointer positioned at the associated 141 compressed data. 142 */ 143 size_t read_block_header() { 144 ubyte[4] ubyte4; 145 auto magic = f.rawRead(ubyte4); 146 enforce1(magic.length == 4, "Premature end of file"); 147 enforce1(magic[0..4] == BGZF_MAGIC,"Invalid file format: expected bgzf magic number"); 148 ubyte[uint.sizeof + 2 * ubyte.sizeof] skip; 149 f.rawRead(skip); // skip gzip info 150 ushort gzip_extra_length = f.read_ushort(); 151 immutable fpos1 = f.tell; 152 size_t bsize = 0; 153 while (f.tell < fpos1 + gzip_extra_length) { 154 immutable subfield_id1 = f.read_ubyte(); 155 immutable subfield_id2 = f.read_ubyte(); 156 immutable subfield_len = f.read_ushort(); 157 if (subfield_id1 == BAM_SI1 && subfield_id2 == BAM_SI2) { 158 // BC identifier 159 enforce(gzip_extra_length == 6); 160 // FIXME: always picks first BC block 161 bsize = 1+f.read_ushort(); // BLOCK size 162 enforce1(subfield_len == 2, "BC subfield len should be 2"); 163 break; 164 } 165 else { 166 f.seek(subfield_len,SEEK_CUR); 167 } 168 enforce1(bsize!=0,"block size not found"); 169 f.seek(fpos1+gzip_extra_length); // skip any extra subfields - note we don't check for second BC 170 } 171 immutable compressed_size = bsize - 1 - gzip_extra_length - 19; 172 enforce1(compressed_size <= BLOCK_SIZE, "compressed size larger than allowed"); 173 174 // stderr.writeln("[compressed] size ", compressed_size, " bytes starting block @ ", report_fpos); 175 return compressed_size; 176 } 177 178 /** 179 Fetch the compressed data part of the block and return it with 180 the uncompressed size and CRC32. The file pointer is assumed to 181 be at the start of the compressed data and will be at the end of 182 that section after. 183 */ 184 Tuple!(ubyte[],immutable(uint),CRC32) read_compressed_data(ubyte[] buffer) { 185 auto compressed_buf = f.rawRead(buffer); 186 187 immutable CRC32 crc32 = f.read_uint(); 188 immutable uncompressed_size = f.read_uint(); 189 // stderr.writeln("[uncompressed] size ",uncompressed_size); 190 return tuple(compressed_buf,uncompressed_size,crc32); 191 } 192 193 /** 194 * Returns new tuple of the new file position, the compressed buffer and 195 * the CRC32 o the uncompressed data. file pos is NULL when done 196 */ 197 Tuple!(FilePos,ubyte[],size_t,CRC32) read_compressed_block(FilePos fpos, ubyte[] buffer) { 198 immutable start_offset = fpos; 199 try { 200 if (fpos.isNull) throwBgzfException("Trying to read past eof"); 201 report_fpos = fpos; 202 f.seek(fpos); 203 immutable compressed_size = read_block_header(); 204 auto ret = read_compressed_data(buffer[0..compressed_size]); 205 auto compressed_buf = ret[0]; 206 immutable uncompressed_size = ret[1]; 207 immutable crc32 = ret[2]; 208 209 if (uncompressed_size == 0) { 210 // check for eof marker, rereading block header 211 auto lastpos = f.tell(); 212 f.seek(start_offset); 213 ubyte[BGZF_EOF.length] buf; 214 f.rawRead(buf); 215 f.seek(lastpos); 216 if (buf == BGZF_EOF) 217 return tuple(FilePos(),compressed_buf,cast(size_t)0,crc32); // sets fpos to null 218 } 219 return tuple(FilePos(f.tell()),compressed_buf,cast(size_t)uncompressed_size,crc32); 220 } catch (Exception e) { throwBgzfException(e.msg,e.file,e.line); } 221 assert(0); // never reached 222 } 223 } 224 225 /** 226 Simple block iterator 227 */ 228 struct BgzfBlocks { 229 BgzfReader bgzf; 230 231 this(string fn) { 232 bgzf = BgzfReader(fn); 233 } 234 235 @disable this(this); // disable copy semantics; 236 237 int opApply(scope int delegate(ubyte[]) dg) { 238 FilePos fpos = 0; 239 240 try { 241 while (!fpos.isNull) { 242 BlockBuffer stack_buffer; 243 auto res = bgzf.read_compressed_block(fpos,stack_buffer); 244 fpos = res[0]; // point fpos to next block 245 if (fpos.isNull) break; 246 247 auto compressed_buf = res[1]; // same as stack_buffer 248 auto uncompressed_size = res[2]; 249 auto crc32 = res[3]; 250 BlockBuffer uncompressed_buf; 251 // call delegated function with new block 252 dg(deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32)); 253 } 254 } catch (Exception e) { bgzf.throwBgzfException(e.msg,e.file,e.line); } 255 return 0; 256 } 257 } 258 259 260 Tuple!(size_t,FilePos) read_blockx(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) { 261 BlockBuffer compressed_buf; 262 auto res = bgzf.read_compressed_block(fpos,compressed_buf); 263 fpos = res[0]; // point fpos to next block 264 if (fpos.isNull) return tuple(cast(size_t)0,fpos); 265 auto data = res[1]; 266 267 assert(data.ptr == compressed_buf.ptr); 268 size_t uncompressed_size = res[2]; 269 auto crc32 = res[3]; 270 deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 271 return tuple(uncompressed_size,fpos); 272 } 273 274 import std.parallelism; 275 276 int kick_off_reading_block_ahead(ubyte[] uncompressed_buf, ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) { 277 // writeln("HEY " ~ to!string(uncompressed_size)); 278 deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 279 return -1; 280 } 281 282 /** 283 */ 284 struct BlockReadAhead { 285 bool task_running = false, we_have_a_task = false; 286 Task!(kick_off_reading_block_ahead, ubyte[], ubyte[], size_t, CRC32)* t; 287 FilePos fpos2; 288 size_t uncompressed_size2 = 0; 289 BlockBuffer compressed_buf2; 290 BlockBuffer uncompressed_buf2; 291 292 private void read_next_block() { 293 } 294 295 private void add_deflate_task() { 296 } 297 298 private void copy_deflated_buffer() { 299 } 300 301 void setup_block_reader() { 302 read_next_block(); 303 add_deflate_task(); 304 throw new Exception("NYI"); 305 } 306 307 Tuple!(size_t,FilePos) read_block(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) { 308 assert(we_have_a_task); 309 copy_deflated_buffer(); 310 read_next_block(); 311 add_deflate_task(); 312 // return 313 314 if (task_running) { 315 int res = t.yieldForce; 316 // writeln(res); 317 task_running = false; 318 memcpy(uncompressed_buf.ptr,compressed_buf2.ptr,uncompressed_size2); 319 return tuple(uncompressed_size2, fpos2); 320 } 321 else { 322 BlockBuffer compressed_buf; 323 auto res = bgzf.read_compressed_block(fpos,compressed_buf); 324 fpos = res[0]; // point fpos to next block 325 if (fpos.isNull) return tuple(cast(size_t)0,fpos); 326 auto data = res[1]; 327 assert(data.ptr == compressed_buf.ptr); 328 size_t uncompressed_size = res[2]; 329 auto crc32 = res[3]; 330 331 deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 332 333 // now set up a new buffer 334 auto res2 = bgzf.read_compressed_block(fpos,compressed_buf2); 335 fpos2 = res[0]; // point fpos to next block 336 if (!fpos2.isNull) { 337 auto data2 = res2[1]; 338 uncompressed_size2 = res2[2]; 339 t = task!kick_off_reading_block_ahead(cast(ubyte[])uncompressed_buf2,cast(ubyte[])compressed_buf2,uncompressed_size2,res2[3]); 340 t.executeInNewThread(); 341 task_running = true; 342 } 343 return tuple(uncompressed_size,fpos); 344 } 345 } 346 } 347 348 /** 349 */ 350 struct BlockReadUnbuffered { 351 352 void setup_block_reader() { 353 } 354 355 Tuple!(ubyte[], size_t, FilePos) read_block(ref BgzfReader bgzf, in FilePos fpos, ubyte[] uncompressed_buf) { 356 BlockBuffer compressed_buf; 357 auto res = bgzf.read_compressed_block(fpos,compressed_buf); 358 auto fpos2 = res[0]; // point fpos to next block 359 if (fpos.isNull) return tuple(uncompressed_buf,cast(size_t)0,fpos2); 360 auto data = res[1]; 361 assert(data.ptr == compressed_buf.ptr); 362 size_t uncompressed_size = res[2]; 363 auto crc32 = res[3]; 364 365 auto buf = deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32); 366 assert(buf.ptr == uncompressed_buf.ptr); 367 return tuple(uncompressed_buf,uncompressed_size,fpos2); 368 } 369 } 370 371 /** 372 Streams bgzf data and fetch items by unit or buffer. These can go beyond 373 the size of individual blocks(!) 374 */ 375 376 struct BgzfStream { 377 BgzfReader bgzf; 378 FilePos fpos; // track file position 379 ubyte[] uncompressed_buf; // current data buffer 380 size_t uncompressed_size; // current data buffer size 381 Nullable!int block_pos; // position in block 382 BlockReadUnbuffered blockread; 383 384 this(string fn) { 385 bgzf = BgzfReader(fn); 386 uncompressed_buf = new ubyte[BLOCK_SIZE]; 387 fpos = 0; 388 } 389 390 @disable this(this); // disable copy semantics; 391 392 @property bool eof() { 393 return fpos.isNull; 394 } 395 396 /** 397 Fetch data into buffer. The size of the buffer can be larger than 398 one or more multiple blocks 399 */ 400 ubyte[] fetch(ubyte[] buffer) { 401 if (block_pos.isNull) { 402 blockread.setup_block_reader(); 403 auto res = blockread.read_block(bgzf,fpos,uncompressed_buf); // read first block 404 assert(res[0].ptr == uncompressed_buf.ptr); 405 uncompressed_size = res[1]; 406 fpos = res[2]; 407 block_pos = 0; 408 } 409 410 immutable buffer_length = buffer.length; 411 size_t buffer_pos = 0; 412 size_t remaining = buffer_length; 413 414 while (remaining > 0) { 415 if (block_pos + remaining < uncompressed_size) { 416 // full copy 417 assert(buffer_pos + remaining == buffer_length); 418 memcpy(buffer[buffer_pos..buffer_pos+remaining].ptr,uncompressed_buf[block_pos..block_pos+remaining].ptr,remaining); 419 block_pos += remaining; 420 remaining = 0; 421 } 422 else { 423 // read tail of buffer 424 immutable tail = uncompressed_size - block_pos; 425 memcpy(buffer[buffer_pos..buffer_pos+tail].ptr,uncompressed_buf[block_pos..uncompressed_size].ptr,tail); 426 buffer_pos += tail; 427 remaining -= tail; 428 auto res = blockread.read_block(bgzf,fpos,uncompressed_buf); 429 assert(res[0].ptr == uncompressed_buf.ptr); 430 uncompressed_size = res[1]; 431 fpos = res[2]; 432 block_pos = 0; 433 } 434 } 435 return buffer; 436 } 437 438 int read(T)() { // for integers 439 ubyte[T.sizeof] buf; 440 auto b = fetch(buf); 441 return b.read!(T,Endian.littleEndian)(); 442 } 443 444 string read(T)(size_t len) { 445 ubyte[] buf = new ubyte[len]; // heap allocation 446 fetch(buf); 447 return cast(T)buf; 448 } 449 450 T[] read(T)(T[] buffer) { return cast(T[])fetch(cast(ubyte[])buffer); }; 451 }