1 /*
2     This file is part of Sambamba.
3     Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>
4 
5     Sambamba is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published
7     by the Free Software Foundation; either version 2 of the License,
8     or (at your option) any later version.
9 
10     Sambamba is distributed in the hope that it will be useful, but
11     WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13     General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program; if not, write to the Free Software
17     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
18     02111-1307 USA
19 
20 */
21 
22 module bio.std.experimental.hts.bgzf;
23 
24 /**
25 
26    This is a new version of sambamba bgzf (under development). Bgzf is
27    a blocked version of the ubiquous gzip format. By making it blocked
28    it allows for seeking in gz files. Note that without seeking it
29    can take standard gz files too.
30 
31    The new version is a prototype for new sambamba architecture using
32    canonical D language features, including immutable and improved
33    laziness and a more functional programming style. It should provide
34    improved performance and minimize RAM use, as well as better
35    composability.
36 
37    Authors: Pjotr Prins
38 
39  */
40 
41 import core.stdc..string : memcpy;
42 
43 import std.bitmanip;
44 import std.conv;
45 import std.exception;
46 import std.file;
47 import std.stdio;
48 import std.typecons;
49 import std.zlib : calc_crc32 = crc32, ZlibException;
50 
51 import bio.std.hts.bam.constants;
52 import bio.core.bgzf.block;
53 import bio.core.bgzf.constants;
54 import bio.core.utils.zlib : inflateInit2, inflate, inflateEnd, Z_OK, Z_FINISH, Z_STREAM_END;
55 
56 import bio.std.experimental.hts.constants;
57 
58 class BgzfException : Exception {
59     this(string msg) { super(msg); }
60 }
61 
62 alias Nullable!ulong FilePos;
63 alias immutable(uint) CRC32;
64 
65 alias BGZF_MAX_BLOCK_SIZE BLOCK_SIZE;
66 
67 alias ubyte[BLOCK_SIZE] BlockBuffer;
68 
69 @property  ubyte read_ubyte(File f) {
70     ubyte[1] ubyte1; // read buffer
71     immutable ubyte[1] buf = f.rawRead(ubyte1);
72     return buf[0];
73 }
74 
75 @property ushort read_ushort(File f) {
76     ubyte[2] ubyte2; // read buffer
77     immutable ubyte[2] buf = f.rawRead(ubyte2);
78     return littleEndianToNative!ushort(buf);
79   }
80 @property uint read_uint(File f) {
81     ubyte[4] ubyte4; // read buffer
82     immutable ubyte[4] buf = f.rawRead(ubyte4);
83     return littleEndianToNative!uint(buf);
84   }
85 
86 /**
87    Uncompress a zlib buffer (without header)
88 */
89 ubyte[] deflate(ubyte[] uncompressed_buf, const ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) {
90   assert(uncompressed_buf.length == BLOCK_SIZE);
91   bio.core.utils.zlib.z_stream zs;
92   zs.next_in = cast(typeof(zs.next_in))compressed_buf;
93   zs.avail_in = to!uint(compressed_buf.length);
94 
95   auto err = inflateInit2(&zs, /* winbits = */-15);
96   if (err != Z_OK) throw new ZlibException(err);
97 
98   zs.next_out = cast(typeof(zs.next_out))uncompressed_buf.ptr;
99   zs.avail_out = cast(int)uncompressed_buf.length;
100 
101   scope(exit) { inflateEnd(&zs); }
102   err = inflate(&zs, Z_FINISH);
103   if (err != Z_STREAM_END) throw new ZlibException(err);
104 
105   assert(zs.total_out == uncompressed_size);
106   uncompressed_buf.length = uncompressed_size;
107   assert(crc32 == calc_crc32(0, uncompressed_buf[]));
108 
109   return uncompressed_buf;
110 }
111 
112 /**
113     BgzfReader is designed to run on a single thread. All it does is
114     fetch block headers and data, so the thread should easily keep up
115     with IO. All data processing is happening lazily in other threads.
116 */
117 struct BgzfReader {
118   File f;
119   FilePos report_fpos; // for error handler - assumes one thread!
120 
121   this(string fn) {
122     enforce(fn.isFile);
123     f = File(fn,"r");
124   }
125 
126   @disable this(this); // BgzfReader does not have copy semantics;
127 
128   void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) {
129     throw new BgzfException("Error reading BGZF block starting in "~f.name ~" @ " ~
130                             to!string(report_fpos) ~ " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg);
131   }
132 
133   void enforce1(bool check, lazy string msg, string file = __FILE__, int line = __LINE__) {
134     if (!check)
135       throwBgzfException(msg,file,line);
136   }
137 
138   /**
139       Reads the block header and returns the contained compressed data
140       size with the file pointer positioned at the associated
141       compressed data.
142   */
143   size_t read_block_header() {
144     ubyte[4] ubyte4;
145     auto magic = f.rawRead(ubyte4);
146     enforce1(magic.length == 4, "Premature end of file");
147     enforce1(magic[0..4] == BGZF_MAGIC,"Invalid file format: expected bgzf magic number");
148     ubyte[uint.sizeof + 2 * ubyte.sizeof] skip;
149     f.rawRead(skip); // skip gzip info
150     ushort gzip_extra_length = f.read_ushort();
151     immutable fpos1 = f.tell;
152     size_t bsize = 0;
153     while (f.tell < fpos1 + gzip_extra_length) {
154       immutable subfield_id1 = f.read_ubyte();
155       immutable subfield_id2 = f.read_ubyte();
156       immutable subfield_len = f.read_ushort();
157       if (subfield_id1 == BAM_SI1 && subfield_id2 == BAM_SI2) {
158         // BC identifier
159         enforce(gzip_extra_length == 6);
160         // FIXME: always picks first BC block
161         bsize = 1+f.read_ushort(); // BLOCK size
162         enforce1(subfield_len == 2, "BC subfield len should be 2");
163         break;
164       }
165       else {
166         f.seek(subfield_len,SEEK_CUR);
167       }
168       enforce1(bsize!=0,"block size not found");
169       f.seek(fpos1+gzip_extra_length); // skip any extra subfields - note we don't check for second BC
170     }
171     immutable compressed_size = bsize - 1 - gzip_extra_length - 19;
172     enforce1(compressed_size <= BLOCK_SIZE, "compressed size larger than allowed");
173 
174     // stderr.writeln("[compressed] size ", compressed_size, " bytes starting block @ ", report_fpos);
175     return compressed_size;
176   }
177 
178   /**
179      Fetch the compressed data part of the block and return it with
180      the uncompressed size and CRC32. The file pointer is assumed to
181      be at the start of the compressed data and will be at the end of
182      that section after.
183   */
184   Tuple!(ubyte[],immutable(uint),CRC32) read_compressed_data(ubyte[] buffer) {
185     auto compressed_buf = f.rawRead(buffer);
186 
187     immutable CRC32 crc32 = f.read_uint();
188     immutable uncompressed_size = f.read_uint();
189     // stderr.writeln("[uncompressed] size ",uncompressed_size);
190     return tuple(compressed_buf,uncompressed_size,crc32);
191   }
192 
193   /**
194    * Returns new tuple of the new file position, the compressed buffer and
195    * the CRC32 o the uncompressed data. file pos is NULL when done
196    */
197   Tuple!(FilePos,ubyte[],size_t,CRC32) read_compressed_block(FilePos fpos, ubyte[] buffer) {
198     immutable start_offset = fpos;
199     try {
200       if (fpos.isNull) throwBgzfException("Trying to read past eof");
201       report_fpos = fpos;
202       f.seek(fpos);
203       immutable compressed_size = read_block_header();
204       auto ret = read_compressed_data(buffer[0..compressed_size]);
205       auto compressed_buf = ret[0];
206       immutable uncompressed_size = ret[1];
207       immutable crc32 = ret[2];
208 
209       if (uncompressed_size == 0) {
210         // check for eof marker, rereading block header
211         auto lastpos = f.tell();
212         f.seek(start_offset);
213         ubyte[BGZF_EOF.length] buf;
214         f.rawRead(buf);
215         f.seek(lastpos);
216         if (buf == BGZF_EOF)
217           return tuple(FilePos(),compressed_buf,cast(size_t)0,crc32); // sets fpos to null
218       }
219       return tuple(FilePos(f.tell()),compressed_buf,cast(size_t)uncompressed_size,crc32);
220     } catch (Exception e) { throwBgzfException(e.msg,e.file,e.line); }
221     assert(0); // never reached
222   }
223 }
224 
225 /**
226    Simple block iterator
227 */
228 struct BgzfBlocks {
229   BgzfReader bgzf;
230 
231   this(string fn) {
232     bgzf = BgzfReader(fn);
233   }
234 
235   @disable this(this); // disable copy semantics;
236 
237   int opApply(scope int delegate(ubyte[]) dg) {
238     FilePos fpos = 0;
239 
240     try {
241       while (!fpos.isNull) {
242         BlockBuffer stack_buffer;
243         auto res = bgzf.read_compressed_block(fpos,stack_buffer);
244         fpos = res[0]; // point fpos to next block
245         if (fpos.isNull) break;
246 
247         auto compressed_buf = res[1]; // same as stack_buffer
248         auto uncompressed_size = res[2];
249         auto crc32 = res[3];
250         BlockBuffer uncompressed_buf;
251         // call delegated function with new block
252         dg(deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32));
253       }
254     } catch (Exception e) { bgzf.throwBgzfException(e.msg,e.file,e.line); }
255     return 0;
256   }
257 }
258 
259 
260 Tuple!(size_t,FilePos) read_blockx(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) {
261   BlockBuffer compressed_buf;
262   auto res = bgzf.read_compressed_block(fpos,compressed_buf);
263   fpos = res[0]; // point fpos to next block
264   if (fpos.isNull) return tuple(cast(size_t)0,fpos);
265   auto data = res[1];
266 
267   assert(data.ptr == compressed_buf.ptr);
268   size_t uncompressed_size = res[2];
269   auto crc32 = res[3];
270   deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
271   return tuple(uncompressed_size,fpos);
272 }
273 
274 import std.parallelism;
275 
276 int kick_off_reading_block_ahead(ubyte[] uncompressed_buf, ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) {
277   // writeln("HEY " ~ to!string(uncompressed_size));
278   deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
279   return -1;
280 }
281 
282 /**
283 */
284 struct BlockReadAhead {
285   bool task_running = false, we_have_a_task = false;
286   Task!(kick_off_reading_block_ahead, ubyte[], ubyte[], size_t, CRC32)* t;
287   FilePos fpos2;
288   size_t uncompressed_size2 = 0;
289   BlockBuffer compressed_buf2;
290   BlockBuffer uncompressed_buf2;
291 
292   private void read_next_block() {
293   }
294 
295   private void add_deflate_task() {
296   }
297 
298   private void copy_deflated_buffer() {
299   }
300 
301   void setup_block_reader() {
302     read_next_block();
303     add_deflate_task();
304     throw new Exception("NYI");
305   }
306 
307   Tuple!(size_t,FilePos) read_block(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) {
308     assert(we_have_a_task);
309     copy_deflated_buffer();
310     read_next_block();
311     add_deflate_task();
312     // return
313 
314     if (task_running) {
315       int res = t.yieldForce;
316       // writeln(res);
317       task_running = false;
318       memcpy(uncompressed_buf.ptr,compressed_buf2.ptr,uncompressed_size2);
319       return tuple(uncompressed_size2, fpos2);
320     }
321     else {
322       BlockBuffer compressed_buf;
323       auto res = bgzf.read_compressed_block(fpos,compressed_buf);
324       fpos = res[0]; // point fpos to next block
325       if (fpos.isNull) return tuple(cast(size_t)0,fpos);
326       auto data = res[1];
327       assert(data.ptr == compressed_buf.ptr);
328       size_t uncompressed_size = res[2];
329       auto crc32 = res[3];
330 
331       deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
332 
333       // now set up a new buffer
334       auto res2 = bgzf.read_compressed_block(fpos,compressed_buf2);
335       fpos2 = res[0]; // point fpos to next block
336       if (!fpos2.isNull) {
337         auto data2 = res2[1];
338         uncompressed_size2 = res2[2];
339         t = task!kick_off_reading_block_ahead(cast(ubyte[])uncompressed_buf2,cast(ubyte[])compressed_buf2,uncompressed_size2,res2[3]);
340         t.executeInNewThread();
341         task_running = true;
342       }
343       return tuple(uncompressed_size,fpos);
344     }
345   }
346 }
347 
348 /**
349 */
350 struct BlockReadUnbuffered {
351 
352   void setup_block_reader() {
353   }
354 
355   Tuple!(ubyte[], size_t, FilePos) read_block(ref BgzfReader bgzf, in FilePos fpos, ubyte[] uncompressed_buf) {
356     BlockBuffer compressed_buf;
357     auto res = bgzf.read_compressed_block(fpos,compressed_buf);
358     auto fpos2 = res[0]; // point fpos to next block
359     if (fpos.isNull) return tuple(uncompressed_buf,cast(size_t)0,fpos2);
360     auto data = res[1];
361     assert(data.ptr == compressed_buf.ptr);
362     size_t uncompressed_size = res[2];
363     auto crc32 = res[3];
364 
365     auto buf = deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
366     assert(buf.ptr == uncompressed_buf.ptr);
367     return tuple(uncompressed_buf,uncompressed_size,fpos2);
368   }
369 }
370 
371 /**
372    Streams bgzf data and fetch items by unit or buffer. These can go beyond
373    the size of individual blocks(!)
374 */
375 
376 struct BgzfStream {
377   BgzfReader bgzf;
378   FilePos fpos;             // track file position
379   ubyte[] uncompressed_buf; // current data buffer
380   size_t uncompressed_size; // current data buffer size
381   Nullable!int block_pos;   // position in block
382   BlockReadUnbuffered blockread;
383 
384   this(string fn) {
385     bgzf = BgzfReader(fn);
386     uncompressed_buf = new ubyte[BLOCK_SIZE];
387     fpos = 0;
388   }
389 
390   @disable this(this); // disable copy semantics;
391 
392   @property bool eof() {
393     return fpos.isNull;
394   }
395 
396   /**
397      Fetch data into buffer. The size of the buffer can be larger than
398      one or more multiple blocks
399   */
400   ubyte[] fetch(ubyte[] buffer) {
401     if (block_pos.isNull) {
402       blockread.setup_block_reader();
403       auto res = blockread.read_block(bgzf,fpos,uncompressed_buf); // read first block
404       assert(res[0].ptr == uncompressed_buf.ptr);
405       uncompressed_size = res[1];
406       fpos = res[2];
407       block_pos = 0;
408     }
409 
410     immutable buffer_length = buffer.length;
411     size_t buffer_pos = 0;
412     size_t remaining = buffer_length;
413 
414     while (remaining > 0) {
415       if (block_pos + remaining < uncompressed_size) {
416         // full copy
417         assert(buffer_pos + remaining == buffer_length);
418         memcpy(buffer[buffer_pos..buffer_pos+remaining].ptr,uncompressed_buf[block_pos..block_pos+remaining].ptr,remaining);
419         block_pos += remaining;
420         remaining = 0;
421       }
422       else {
423         // read tail of buffer
424         immutable tail = uncompressed_size - block_pos;
425         memcpy(buffer[buffer_pos..buffer_pos+tail].ptr,uncompressed_buf[block_pos..uncompressed_size].ptr,tail);
426         buffer_pos += tail;
427         remaining -= tail;
428         auto res = blockread.read_block(bgzf,fpos,uncompressed_buf);
429         assert(res[0].ptr == uncompressed_buf.ptr);
430         uncompressed_size = res[1];
431         fpos = res[2];
432         block_pos = 0;
433       }
434     }
435     return buffer;
436   }
437 
438   int read(T)() { // for integers
439     ubyte[T.sizeof] buf;
440     auto b = fetch(buf);
441     return b.read!(T,Endian.littleEndian)();
442   }
443 
444   string read(T)(size_t len) {
445     ubyte[] buf = new ubyte[len]; // heap allocation
446     fetch(buf);
447     return cast(T)buf;
448   }
449 
450   T[] read(T)(T[] buffer) { return cast(T[])fetch(cast(ubyte[])buffer); };
451 }