1 /*
2     This file is part of Sambamba.
3     Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>
4 
5     Sambamba is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published
7     by the Free Software Foundation; either version 2 of the License,
8     or (at your option) any later version.
9 
10     Sambamba is distributed in the hope that it will be useful, but
11     WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13     General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program; if not, write to the Free Software
17     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
18     02111-1307 USA
19 
20 */
21 
22 module bio2.bgzf;
23 
24 /**
25 
26    This is a new version of sambamba bgzf (under development).
27 
28    The new version is a prototype for new sambamba architecture using
29    canonical D language features, including immutable and improved
30    laziness and a more functional programming style. It should provide
31    improved performance and minimize RAM use, as well as better
32    composability.
33 
34    Authors: Pjotr Prins
35 
36  */
37 
38 import core.stdc.string : memcpy;
39 
40 import std.bitmanip;
41 import std.conv;
42 import std.exception;
43 import std.file;
44 import std.stdio;
45 import std.typecons;
46 import std.zlib : calc_crc32 = crc32, ZlibException;
47 
48 import bio.bam.constants;
49 import bio.core.bgzf.block;
50 import bio.core.bgzf.constants;
51 import bio.core.utils.zlib : inflateInit2, inflate, inflateEnd, Z_OK, Z_FINISH, Z_STREAM_END;
52 
53 import bio2.constants;
54 
55 class BgzfException : Exception {
56     this(string msg) { super(msg); }
57 }
58 
59 alias Nullable!ulong FilePos;
60 alias immutable(uint) CRC32;
61 
62 alias BGZF_MAX_BLOCK_SIZE BLOCK_SIZE;
63 
64 alias ubyte[BLOCK_SIZE] BlockBuffer;
65 
66 @property  ubyte read_ubyte(File f) {
67     ubyte[1] ubyte1; // read buffer
68     immutable ubyte[1] buf = f.rawRead(ubyte1);
69     return buf[0];
70 }
71 
72 @property ushort read_ushort(File f) {
73     ubyte[2] ubyte2; // read buffer
74     immutable ubyte[2] buf = f.rawRead(ubyte2);
75     return littleEndianToNative!ushort(buf);
76   }
77 @property uint read_uint(File f) {
78     ubyte[4] ubyte4; // read buffer
79     immutable ubyte[4] buf = f.rawRead(ubyte4);
80     return littleEndianToNative!uint(buf);
81   }
82 
83 /**
84    Uncompress a zlib buffer (without header)
85 */
86 ubyte[] deflate(ubyte[] uncompressed_buf, const ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) {
87   assert(uncompressed_buf.length == BLOCK_SIZE);
88   bio.core.utils.zlib.z_stream zs;
89   zs.next_in = cast(typeof(zs.next_in))compressed_buf;
90   zs.avail_in = to!uint(compressed_buf.length);
91 
92   auto err = inflateInit2(&zs, /* winbits = */-15);
93   if (err != Z_OK) throw new ZlibException(err);
94 
95   zs.next_out = cast(typeof(zs.next_out))uncompressed_buf.ptr;
96   zs.avail_out = cast(int)uncompressed_buf.length;
97 
98   scope(exit) { inflateEnd(&zs); }
99   err = inflate(&zs, Z_FINISH);
100   if (err != Z_STREAM_END) throw new ZlibException(err);
101 
102   assert(zs.total_out == uncompressed_size);
103   uncompressed_buf.length = uncompressed_size;
104   assert(crc32 == calc_crc32(0, uncompressed_buf[]));
105 
106   return uncompressed_buf;
107 }
108 
109 /**
110     BgzfReader is designed to run on a single thread. All it does is
111     fetch block headers and data, so the thread should easily keep up
112     with IO. All data processing is happening lazily in other threads.
113 */
114 struct BgzfReader {
115   File f;
116   FilePos report_fpos; // for error handler - assumes one thread!
117 
118   this(string fn) {
119     enforce(fn.isFile);
120     f = File(fn,"r");
121   }
122 
123   @disable this(this); // BgzfReader does not have copy semantics;
124 
125   void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) {
126     throw new BgzfException("Error reading BGZF block starting in "~f.name ~" @ " ~
127                             to!string(report_fpos) ~ " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg);
128   }
129 
130   void enforce1(bool check, lazy string msg, string file = __FILE__, int line = __LINE__) {
131     if (!check)
132       throwBgzfException(msg,file,line);
133   }
134 
135   /**
136       Reads the block header and returns the contained compressed data
137       size with the file pointer positioned at the associated
138       compressed data.
139   */
140   size_t read_block_header() {
141     ubyte[4] ubyte4;
142     auto magic = f.rawRead(ubyte4);
143     enforce1(magic.length == 4, "Premature end of file");
144     enforce1(magic[0..4] == BGZF_MAGIC,"Invalid file format: expected bgzf magic number");
145     ubyte[uint.sizeof + 2 * ubyte.sizeof] skip;
146     f.rawRead(skip); // skip gzip info
147     ushort gzip_extra_length = f.read_ushort();
148     immutable fpos1 = f.tell;
149     size_t bsize = 0;
150     while (f.tell < fpos1 + gzip_extra_length) {
151       immutable subfield_id1 = f.read_ubyte();
152       immutable subfield_id2 = f.read_ubyte();
153       immutable subfield_len = f.read_ushort();
154       if (subfield_id1 == BAM_SI1 && subfield_id2 == BAM_SI2) {
155         // BC identifier
156         enforce(gzip_extra_length == 6);
157         // FIXME: always picks first BC block
158         bsize = 1+f.read_ushort(); // BLOCK size
159         enforce1(subfield_len == 2, "BC subfield len should be 2");
160         break;
161       }
162       else {
163         f.seek(subfield_len,SEEK_CUR);
164       }
165       enforce1(bsize!=0,"block size not found");
166       f.seek(fpos1+gzip_extra_length); // skip any extra subfields - note we don't check for second BC
167     }
168     immutable compressed_size = bsize - 1 - gzip_extra_length - 19;
169     enforce1(compressed_size <= BLOCK_SIZE, "compressed size larger than allowed");
170 
171     // stderr.writeln("[compressed] size ", compressed_size, " bytes starting block @ ", report_fpos);
172     return compressed_size;
173   }
174 
175   /**
176      Fetch the compressed data part of the block and return it with
177      the uncompressed size and CRC32. The file pointer is assumed to
178      be at the start of the compressed data and will be at the end of
179      that section after.
180   */
181   Tuple!(ubyte[],immutable(uint),CRC32) read_compressed_data(ubyte[] buffer) {
182     auto compressed_buf = f.rawRead(buffer);
183 
184     immutable CRC32 crc32 = f.read_uint();
185     immutable uncompressed_size = f.read_uint();
186     // stderr.writeln("[uncompressed] size ",uncompressed_size);
187     return tuple(compressed_buf,uncompressed_size,crc32);
188   }
189 
190   /**
191    * Returns new tuple of the new file position, the compressed buffer and
192    * the CRC32 o the uncompressed data. file pos is NULL when done
193    */
194   Tuple!(FilePos,ubyte[],size_t,CRC32) read_compressed_block(FilePos fpos, ubyte[] buffer) {
195     immutable start_offset = fpos;
196     try {
197       if (fpos.isNull) throwBgzfException("Trying to read past eof");
198       report_fpos = fpos;
199       f.seek(fpos);
200       immutable compressed_size = read_block_header();
201       auto ret = read_compressed_data(buffer[0..compressed_size]);
202       auto compressed_buf = ret[0];
203       immutable uncompressed_size = ret[1];
204       immutable crc32 = ret[2];
205 
206       if (uncompressed_size == 0) {
207         // check for eof marker, rereading block header
208         auto lastpos = f.tell();
209         f.seek(start_offset);
210         ubyte[BGZF_EOF.length] buf;
211         f.rawRead(buf);
212         f.seek(lastpos);
213         if (buf == BGZF_EOF)
214           return tuple(FilePos(),compressed_buf,cast(ulong)0,crc32); // sets fpos to null
215       }
216       return tuple(FilePos(f.tell()),compressed_buf,cast(size_t)uncompressed_size,crc32);
217     } catch (Exception e) { throwBgzfException(e.msg,e.file,e.line); }
218     assert(0); // never reached
219   }
220 }
221 
222 /**
223    Simple block iterator
224 */
225 struct BgzfBlocks {
226   BgzfReader bgzf;
227 
228   this(string fn) {
229     bgzf = BgzfReader(fn);
230   }
231 
232   @disable this(this); // disable copy semantics;
233 
234   int opApply(scope int delegate(ubyte[]) dg) {
235     FilePos fpos = 0;
236 
237     try {
238       while (!fpos.isNull) {
239         BlockBuffer stack_buffer;
240         auto res = bgzf.read_compressed_block(fpos,stack_buffer);
241         fpos = res[0]; // point fpos to next block
242         if (fpos.isNull) break;
243 
244         auto compressed_buf = res[1]; // same as stack_buffer
245         auto uncompressed_size = res[2];
246         auto crc32 = res[3];
247         BlockBuffer uncompressed_buf;
248         // call delegated function with new block
249         dg(deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32));
250       }
251     } catch (Exception e) { bgzf.throwBgzfException(e.msg,e.file,e.line); }
252     return 0;
253   }
254 }
255 
256 
257 Tuple!(size_t,FilePos) read_blockx(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) {
258   BlockBuffer compressed_buf;
259   auto res = bgzf.read_compressed_block(fpos,compressed_buf);
260   fpos = res[0]; // point fpos to next block
261   if (fpos.isNull) return tuple(cast(size_t)0,fpos);
262   auto data = res[1];
263 
264   assert(data.ptr == compressed_buf.ptr);
265   size_t uncompressed_size = res[2];
266   auto crc32 = res[3];
267   deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
268   return tuple(uncompressed_size,fpos);
269 }
270 
271 import std.parallelism;
272 
273 int kick_off_reading_block_ahead(ubyte[] uncompressed_buf, ubyte[] compressed_buf, size_t uncompressed_size, CRC32 crc32) {
274   // writeln("HEY " ~ to!string(uncompressed_size));
275   deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
276   return -1;
277 }
278 
279 /**
280 */
281 struct BlockReadAhead {
282   bool task_running = false, we_have_a_task = false;
283   Task!(kick_off_reading_block_ahead, ubyte[], ubyte[], size_t, CRC32)* t;
284   FilePos fpos2;
285   size_t uncompressed_size2 = 0;
286   BlockBuffer compressed_buf2;
287   BlockBuffer uncompressed_buf2;
288 
289   private void read_next_block() {
290   }
291 
292   private void add_deflate_task() {
293   }
294 
295   private void copy_deflated_buffer() {
296   }
297 
298   void setup_block_reader() {
299     read_next_block();
300     add_deflate_task();
301     throw new Exception("NYI");
302   }
303 
304   Tuple!(size_t,FilePos) read_block(ref BgzfReader bgzf, FilePos fpos, ref ubyte[] uncompressed_buf) {
305     assert(we_have_a_task);
306     copy_deflated_buffer();
307     read_next_block();
308     add_deflate_task();
309     // return
310 
311     if (task_running) {
312       int res = t.yieldForce;
313       // writeln(res);
314       task_running = false;
315       memcpy(uncompressed_buf.ptr,compressed_buf2.ptr,uncompressed_size2);
316       return tuple(uncompressed_size2, fpos2);
317     }
318     else {
319       BlockBuffer compressed_buf;
320       auto res = bgzf.read_compressed_block(fpos,compressed_buf);
321       fpos = res[0]; // point fpos to next block
322       if (fpos.isNull) return tuple(cast(size_t)0,fpos);
323       auto data = res[1];
324       assert(data.ptr == compressed_buf.ptr);
325       size_t uncompressed_size = res[2];
326       auto crc32 = res[3];
327 
328       deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
329 
330       // now set up a new buffer
331       auto res2 = bgzf.read_compressed_block(fpos,compressed_buf2);
332       fpos2 = res[0]; // point fpos to next block
333       if (!fpos2.isNull) {
334         auto data2 = res2[1];
335         uncompressed_size2 = res2[2];
336         t = task!kick_off_reading_block_ahead(cast(ubyte[])uncompressed_buf2,cast(ubyte[])compressed_buf2,uncompressed_size2,res2[3]);
337         t.executeInNewThread();
338         task_running = true;
339       }
340       return tuple(uncompressed_size,fpos);
341     }
342   }
343 }
344 
345 /**
346 */
347 struct BlockReadUnbuffered {
348 
349   void setup_block_reader() {
350   }
351 
352   Tuple!(ubyte[], size_t, FilePos) read_block(ref BgzfReader bgzf, in FilePos fpos, ubyte[] uncompressed_buf) {
353     BlockBuffer compressed_buf;
354     auto res = bgzf.read_compressed_block(fpos,compressed_buf);
355     auto fpos2 = res[0]; // point fpos to next block
356     if (fpos.isNull) return tuple(uncompressed_buf,cast(size_t)0,fpos2);
357     auto data = res[1];
358     assert(data.ptr == compressed_buf.ptr);
359     size_t uncompressed_size = res[2];
360     auto crc32 = res[3];
361 
362     auto buf = deflate(uncompressed_buf,compressed_buf,uncompressed_size,crc32);
363     assert(buf.ptr == uncompressed_buf.ptr);
364     return tuple(uncompressed_buf,uncompressed_size,fpos2);
365   }
366 }
367 
368 /**
369    Streams bgzf data and fetch items by unit or buffer. These can go beyond
370    the size of individual blocks(!)
371 */
372 
373 struct BgzfStream {
374   BgzfReader bgzf;
375   FilePos fpos;             // track file position
376   ubyte[] uncompressed_buf; // current data buffer
377   size_t uncompressed_size; // current data buffer size
378   Nullable!int block_pos;   // position in block
379   BlockReadUnbuffered blockread;
380 
381   this(string fn) {
382     bgzf = BgzfReader(fn);
383     uncompressed_buf = new ubyte[BLOCK_SIZE];
384     fpos = 0;
385   }
386 
387   @disable this(this); // disable copy semantics;
388 
389   bool eof() {
390     return fpos.isNull;
391   }
392 
393   /**
394      Fetch data into buffer. The size of the buffer can be larger than
395      one or more multiple blocks
396   */
397   ubyte[] fetch(ubyte[] buffer) {
398     if (block_pos.isNull) {
399       blockread.setup_block_reader();
400       auto res = blockread.read_block(bgzf,fpos,uncompressed_buf); // read first block
401       assert(res[0].ptr == uncompressed_buf.ptr);
402       uncompressed_size = res[1];
403       fpos = res[2];
404       block_pos = 0;
405     }
406 
407     immutable buffer_length = buffer.length;
408     size_t buffer_pos = 0;
409     size_t remaining = buffer_length;
410 
411     while (remaining > 0) {
412       if (block_pos + remaining < uncompressed_size) {
413         // full copy
414         assert(buffer_pos + remaining == buffer_length);
415         memcpy(buffer[buffer_pos..buffer_pos+remaining].ptr,uncompressed_buf[block_pos..block_pos+remaining].ptr,remaining);
416         block_pos += remaining;
417         remaining = 0;
418       }
419       else {
420         // read tail of buffer
421         immutable tail = uncompressed_size - block_pos;
422         memcpy(buffer[buffer_pos..buffer_pos+tail].ptr,uncompressed_buf[block_pos..uncompressed_size].ptr,tail);
423         buffer_pos += tail;
424         remaining -= tail;
425         auto res = blockread.read_block(bgzf,fpos,uncompressed_buf);
426         assert(res[0].ptr == uncompressed_buf.ptr);
427         uncompressed_size = res[1];
428         fpos = res[2];
429         block_pos = 0;
430       }
431     }
432     return buffer;
433   }
434 
435   int read(T)() { // for integers
436     ubyte[T.sizeof] buf;
437     auto b = fetch(buf);
438     return b.read!(T,Endian.littleEndian)();
439   }
440 
441   string read(T)(size_t len) {
442     ubyte[] buf = new ubyte[len]; // heap allocation
443     fetch(buf);
444     return cast(T)buf;
445   }
446 
447   T[] read(T)(T[] buffer) { return cast(T[])fetch(cast(ubyte[])buffer); };
448 
449 
450 }