1 /*
2     New style BGZF writer. This file is part of Sambamba.
3     Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>
4 
5     Sambamba is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published
7     by the Free Software Foundation; either version 2 of the License,
8     or (at your option) any later version.
9 
10     Sambamba is distributed in the hope that it will be useful, but
11     WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13     General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program; if not, write to the Free Software
17     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
18     02111-1307 USA
19 
20 */
21 
22 // Based on the original by Artem Tarasov.
23 
24 module bio.std.experimental.hts.bgzf_writer;
25 
26 // import core.stdc.stdlib : malloc, free;
27 import core.memory: pureMalloc, pureFree;
28 import core.stdc.stdio: fopen, fread, fclose;
29 import std.bitmanip;
30 import std.conv;
31 import std.exception;
32 import std.typecons;
33 import std.parallelism;
34 import std.array;
35 import std.algorithm : max;
36 import std.stdio;
37 import std.typecons;
38 
// depends on the older bio.core modules (BGZF compression and round buffer)
40 import bio.core.bgzf.compress;
41 import bio.core.utils.roundbuf;
42 
43 // import undead.stream;
44 
45 import bio.std.hts.bam.constants: BGZF_MAX_BLOCK_SIZE, BGZF_BLOCK_SIZE, BGZF_EOF;
46 import bio.std.experimental.hts.bgzf;
47 import bio.std.experimental.hts.constants;
48 
/// Callback type taking two byte slices; BgzfWriter invokes it with the
/// (uncompressed, compressed) data of a block just before writing it.
alias BlockWriteHandler = void delegate(ubyte[], ubyte[]);
50 
/// TaskPool entry point: compresses `input` at `level` into `output_buffer`
/// via bgzfCompress and bundles (input, compressed result, handler) into a
/// single tuple, so one task result carries everything needed to write
/// the block later.
Tuple!(ubyte[], ubyte[], BlockWriteHandler) bgzfCompressFunc(ubyte[] input,
                                                             int level,
                                                             ubyte[] output_buffer,
                                                             BlockWriteHandler handler)
{
  return tuple(input, bgzfCompress(input, level, output_buffer), handler);
}
60 
/// Multi-threaded BGZF writer. Bytes passed to `write` are accumulated
/// into blocks that are compressed as tasks on a `TaskPool`. The
/// compressed blocks are written to the output file in submission order.
/// This is a port of the original writer, which used the undead.stream
/// library.
///
/// Instances cannot be copied. `close` runs automatically on destruction
/// and may also be called explicitly; calling it again is a no-op.
struct BgzfWriter {

private:
  File f;             // output file, opened in the constructor
  TaskPool task_pool; // pool that executes the compression tasks

  ubyte[] buffer;      // a slice into compression_buf (uncompressed data being filled)
  ubyte[] tmp;         // a slice into compression_buf (output area for compressing `buffer`)
  size_t current_size; // number of bytes of `buffer` filled so far
  int compression_level;

  alias Task!(bgzfCompressFunc,
              ubyte[], int, ubyte[], BlockWriteHandler) CompressionTask;
  // In-flight compression tasks, oldest at the front. Results are always
  // taken from the front, so blocks are written in submission order.
  RoundBuf!(CompressionTask*) _compression_tasks;
  // One pureMalloc'ed arena split into consecutive (input, output) slot
  // pairs of max_block_size bytes each; released in close().
  ubyte[] compression_buf;

  // Optional callback receiving (uncompressed, compressed) for every block
  // just before it is written to `f`.
  void delegate(ubyte[], ubyte[]) _before_write;

public:

  /// Create a new BGZF output file with a multi-threaded writer.
  ///
  /// Params:
  ///   fn                 = path of the output file (opened with mode "wb")
  ///   _compression_level = compression level; must be in [-1, 9]
  /// Throws: BgzfException if the compression level is out of range;
  ///         OutOfMemoryError if the compression arena cannot be allocated.
  this(string fn, int _compression_level=-1) {
    f = File(fn,"wb");
    // Validate the argument itself: the field still holds its default here.
    enforce1(-1 <= _compression_level && _compression_level <= 9,
            "BGZF compression level must be a number in interval [-1, 9]");
    size_t max_block_size = BGZF_MAX_BLOCK_SIZE;
    size_t block_size     = BGZF_BLOCK_SIZE;
    task_pool             = taskPool();
    compression_level     = _compression_level;

    // create a ring buffer that is large enough
    size_t n_tasks = max(task_pool.size, 1) * 16;
    _compression_tasks = RoundBuf!(CompressionTask*)(n_tasks);

    // One (input, output) slot pair per queued task, plus an extra pair
    // that can be filled while n_tasks are executing.
    auto comp_buf_size = (2 * n_tasks + 2) * max_block_size;
    auto p = cast(ubyte*)pureMalloc(comp_buf_size);
    if (p is null) {
      import core.exception : onOutOfMemoryError;
      onOutOfMemoryError();
    }
    compression_buf = p[0 .. comp_buf_size];
    buffer          = compression_buf[0 .. block_size];
    tmp             = compression_buf[max_block_size .. max_block_size * 2];
  }

  ~this() {
    close();
  }

  @disable this(this); // BgzfWriter does not have copy semantics;

  /// Throw a BgzfException that names the output file and the source
  /// location of the failing check.
  void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) {
    throw new BgzfException("Error writing BGZF block starting in "~f.name ~
                            " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg);
  }

  /// Throw a BgzfException with `msg` unless `check` holds.
  void enforce1(bool check, lazy string msg, string file = __FILE__, size_t line = __LINE__) {
    if (!check)
      throwBgzfException(msg,file,line);
  }

  /// Append `size` bytes starting at `buf`. Every time the current block
  /// fills up, it is handed to flush_block() for compression.
  /// Throws: BgzfException when the writer is closed or was never
  /// constructed with a file name (no block buffer available).
  void write(const void* buf, size_t size) {
    // With an empty buffer the fill loop below could never make progress.
    if (buffer.length == 0)
      throwBgzfException("write on a closed or uninitialized BgzfWriter");

    auto data = (cast(const(ubyte)*) buf)[0 .. size];

    // Top up and flush the current block for as long as the remaining data
    // reaches its end.
    while (current_size + data.length >= buffer.length) {
      immutable room = buffer.length - current_size;
      buffer[current_size .. $] = data[0 .. room];
      data = data[room .. $];
      current_size = buffer.length;
      flush_block(); // resets current_size to 0 and switches to a fresh slot
    }

    // The remaining tail fits strictly inside the current block.
    buffer[current_size .. current_size + data.length] = data[];
    current_size += data.length;
  }

  /// Append the bytes of `buf`.
  void write(ubyte[] buf) {
    write(buf.ptr, buf.length);
  }

  /// Append the raw UTF-8 code units of `s` (no terminator is added).
  void write(string s) {
    write(s.ptr, s.length);
  }

  /// Append `value` in little-endian byte order, using exactly T.sizeof
  /// bytes.
  void write(T)(T value) { // int values
    auto bytes = nativeToLittleEndian(value); // ubyte[T.sizeof], no GC allocation
    write(bytes[]);
  }

  /// Force flushing current block, even if it is not yet filled.
  /// Should also be used when it's not desired to have records
  /// crossing block borders.
  void flush_block() {
    if (current_size == 0)
      return;

    // If the task queue is full, wait for the oldest task. Its slot pair is
    // the next one to be reused, so its result must be taken out first.
    Tuple!(ubyte[], ubyte[], BlockWriteHandler) front_result;
    if (_compression_tasks.full) {
      front_result = _compression_tasks.front.yieldForce();
      _compression_tasks.popFront();
    }

    auto compression_task = task!bgzfCompressFunc(buffer[0 .. current_size],
                                                  compression_level, tmp,
                                                  _before_write);
    _compression_tasks.put(compression_task);
    task_pool.put(compression_task);

    // Advance to the next (input, output) slot pair, wrapping at the end
    // of the arena.
    size_t offset = buffer.ptr - compression_buf.ptr;
    immutable N = tmp.length;
    offset += 2 * N;
    if (offset == compression_buf.length)
      offset = 0;
    buffer = compression_buf[offset .. offset + buffer.length];
    tmp = compression_buf[offset + N .. offset + 2 * N];
    current_size = 0;

    // Write the waited-for block before any new data lands in its slot.
    if (front_result[0] !is null)
      writeResult(front_result);

    // Opportunistically write any finished blocks, stopping at the first
    // unfinished one to preserve ordering.
    while (!_compression_tasks.empty) {
      auto pending = _compression_tasks.front;
      if (!pending.done())
        break;
      writeResult(pending.yieldForce());
      _compression_tasks.popFront();
    }
  }

  /// Install a callback that receives (uncompressed, compressed) for each
  /// block submitted after this call, just before the block is written.
  void setWriteHandler(void delegate(ubyte[], ubyte[]) handler) {
    _before_write = handler;
  }

  // Call the block's handler (if any), then write its compressed bytes.
  private void writeResult(Tuple!(ubyte[], ubyte[], BlockWriteHandler) block) {
    auto uncompressed = block[0];
    auto compressed = block[1];
    auto handler = block[2];
    if (handler) {// write handler enabled
      handler(uncompressed, compressed);
    }
    f.rawWrite(compressed);
  }

  /// Flush all remaining BGZF blocks and underlying stream.
  void flush() {
    flush_block();

    // Wait for every outstanding task, in submission order.
    while (!_compression_tasks.empty) {
      auto pending = _compression_tasks.front;
      writeResult(pending.yieldForce());
      _compression_tasks.popFront();
    }

    f.flush();
    current_size = 0;
  }

  /// Flush all remaining BGZF blocks and close source stream.
  /// Automatically adds empty block at the end, serving as indicator
  /// of end of stream. This function is automatically called on
  /// destruction; it does nothing if the file is already closed or was
  /// never opened (default-initialized writer).
  void close() {
    if (!f.isOpen)
      return;
    flush();
    f.rawWrite(BGZF_EOF);
    f.close();
    pureFree(compression_buf.ptr);
    // Drop slices into the freed arena; write() then reports misuse.
    compression_buf = null;
    buffer          = null;
    tmp             = null;
  }
}