1 /*
2     New style BGZF writer. This file is part of Sambamba.
3     Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>
4 
5     Sambamba is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published
7     by the Free Software Foundation; either version 2 of the License,
8     or (at your option) any later version.
9 
10     Sambamba is distributed in the hope that it will be useful, but
11     WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13     General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program; if not, write to the Free Software
17     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
18     02111-1307 USA
19 
20 */
21 
22 // Based on the original by Artem Tarasov.
23 
24 module bio2.bgzf_writer;
25 
26 // import core.stdc.stdlib : malloc, free;
27 import core.memory: pureMalloc, pureFree;
28 import core.stdc.stdio: fopen, fread, fclose;
29 import std.bitmanip;
30 import std.conv;
31 import std.exception;
32 import std.typecons;
33 import std.parallelism;
34 import std.array;
35 import std.algorithm : max;
36 import std.stdio;
37 import std.typecons;
38 
39 import bio.core.bgzf.compress;
40 import bio.core.utils.roundbuf;
41 
42 // import undead.stream;
43 
44 import bio.bam.constants: BGZF_MAX_BLOCK_SIZE, BGZF_BLOCK_SIZE, BGZF_EOF;
45 import bio2.bgzf;
46 import bio2.constants;
47 
/// Callback type taking two byte slices. BgzfWriter invokes it with the
/// (uncompressed, compressed) data of a block right before the
/// compressed bytes are written to the file.
alias BlockWriteHandler = void delegate(ubyte[], ubyte[]);
49 
/// Task-pool friendly wrapper around bgzfCompress.
///
/// Compresses `input` at the given `level`, passing `output_buffer` on to
/// bgzfCompress, and hands back the original input, the compressed result
/// and the untouched `handler` bundled in one tuple, so that the consumer
/// of the finished task has everything needed to emit the block.
Tuple!(ubyte[], ubyte[], BlockWriteHandler) bgzfCompressFunc(ubyte[] input,
                                                             int level,
                                                             ubyte[] output_buffer,
                                                             BlockWriteHandler handler)
{
  auto compressed = bgzfCompress(input, level, output_buffer);
  return typeof(return)(input, compressed, handler);
}
59 
/// BGZF compression - this is a port of the original that used the
/// undead.stream library.
///
/// Bytes passed to `write` are collected into a block-sized buffer. Each
/// filled block is handed to the task pool for compression, and finished
/// blocks are written to the output file in the order they were queued.
struct BgzfWriter {

private:
  File f;
  TaskPool task_pool;

  ubyte[] buffer; // a slice into compression_buf (uncompressed data)
  ubyte[] tmp;    // a slice into compression_buf (compressed data)
  size_t current_size; // number of bytes of `buffer` filled so far
  int compression_level;

  alias Task!(bgzfCompressFunc,
              ubyte[], int, ubyte[], BlockWriteHandler) CompressionTask;
  RoundBuf!(CompressionTask*) _compression_tasks;
  // malloc'ed backing store for `buffer` and `tmp`;
  // null while the writer is not open (never constructed or closed)
  ubyte[] compression_buf;

public:

  /// Create new BGZF output stream with a multi-threaded writer
  ///
  /// Params:
  ///   fn = name of the output file (opened in "wb" mode)
  ///   _compression_level = compression level in [-1, 9]
  ///
  /// Throws: BgzfException if the compression level is out of range or
  /// the working buffer cannot be allocated.
  this(string fn, int _compression_level=-1) {
    f = File(fn,"wb");
    // Check the argument; the member is still default-initialised here.
    enforce1(-1 <= _compression_level && _compression_level <= 9,
            "BGZF compression level must be a number in interval [-1, 9]");
    size_t max_block_size = BGZF_MAX_BLOCK_SIZE;
    size_t block_size     = BGZF_BLOCK_SIZE;
    task_pool             = taskPool();
    compression_level     = _compression_level;

    // create a ring buffer that is large enough
    size_t n_tasks = max(task_pool.size, 1) * 16;
    _compression_tasks = RoundBuf!(CompressionTask*)(n_tasks);

    // create extra block to which we can write while n_tasks are
    // executed: (n_tasks + 1) pairs of [uncompressed | compressed] slots,
    // each slot being max_block_size bytes
    auto comp_buf_size = (2 * n_tasks + 2) * max_block_size;
    auto p = cast(ubyte*)pureMalloc(comp_buf_size);
    enforce1(p !is null, "could not allocate BGZF compression buffer");
    compression_buf = p[0 .. comp_buf_size];
    buffer          = compression_buf[0 .. block_size];
    tmp             = compression_buf[max_block_size .. max_block_size * 2];
  }

  /// Closes the writer (see close()); a no-op if it is already closed.
  ~this() {
    close();
  }

  @disable this(this); // BgzfWriter does not have copy semantics;

  /// Throw a BgzfException whose message names the output file and the
  /// given source location followed by `msg`.
  void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) {
    throw new BgzfException("Error writing BGZF block starting in "~f.name ~
                            " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg);
  }

  /// Throw a BgzfException with `msg` unless `check` holds. `msg` is lazy
  /// and only evaluated on failure.
  void enforce1(bool check, lazy string msg, string file = __FILE__, int line = __LINE__) {
    if (!check)
      throwBgzfException(msg,file,line);
  }

  /// Append `size` bytes starting at `buf` to the stream. Whenever the
  /// current block becomes full it is queued for compression.
  ///
  /// Throws: BgzfException if the writer has no working buffer (it was
  /// closed or never constructed).
  void write(const void* buf, size_t size) {
    if (size + current_size >= buffer.length) {
      // A zero-length buffer means the writer is not open. This branch is
      // always taken in that case, so the common path stays check-free,
      // and without the check the loop below would never terminate.
      enforce1(buffer.length > 0, "attempt to write to a closed BGZF writer");

      const(ubyte)[] data = (cast(const(ubyte)*)buf)[0 .. size];

      while (data.length + current_size >= buffer.length) {
        // top up the current block and hand it over for compression
        immutable room = buffer.length - current_size;
        buffer[current_size .. $] = data[0 .. room];
        data = data[room .. $];

        current_size = buffer.length;

        flush_block(); // switches `buffer` to a fresh slot, resets current_size
      }

      // the remainder is strictly smaller than a block
      buffer[0 .. data.length] = data[];
      current_size = data.length;
    } else {
      buffer[current_size .. current_size + size] = (cast(const(ubyte)*)buf)[0 .. size];
      current_size += size;
    }
  }

  /// Append the contents of `buf` to the stream.
  void write(ubyte[] buf) {
    write(buf.ptr, buf.length);
  }

  /// Append the bytes of `s` to the stream.
  void write(string s) {
    write(s.ptr, s.length);
  }

  /// Append `value` to the stream as T.sizeof bytes in little-endian order.
  void write(T)(T value) { // int values
    // Fixed-size stack array: no GC allocation per call.
    ubyte[T.sizeof] buf = nativeToLittleEndian(value);
    write(buf.ptr, buf.length);
  }

  /// Force flushing current block, even if it is not yet filled.
  /// Should also be used when it's not desired to have records
  /// crossing block borders.
  void flush_block() {
    if (current_size == 0)
      return;

    // If the ring of pending tasks is full, wait for the oldest one and
    // remove it to make room for the new task. Its result is written
    // further down.
    Tuple!(ubyte[], ubyte[], BlockWriteHandler) front_result;
    if (_compression_tasks.full) {
      front_result = _compression_tasks.front.yieldForce();
      _compression_tasks.popFront();
    }

    auto compression_task = task!bgzfCompressFunc(buffer[0 .. current_size],
                                                  compression_level, tmp,
                                                  _before_write);
    _compression_tasks.put(compression_task);
    task_pool.put(compression_task);

    // Advance `buffer`/`tmp` to the next pair of slots in compression_buf,
    // wrapping around at the end.
    size_t offset = buffer.ptr - compression_buf.ptr;
    immutable N = tmp.length;
    offset += 2 * N;
    if (offset == compression_buf.length)
      offset = 0;
    buffer = compression_buf[offset .. offset + buffer.length];
    tmp = compression_buf[offset + N .. offset + 2 * N];
    current_size = 0;

    // front_result holds slices into compression_buf, so it has to be
    // written out here, before the caller starts refilling `buffer`.
    if (front_result[0] !is null)
      writeResult(front_result);

    // Opportunistically write every block at the head of the queue that
    // has already finished compressing; stop at the first pending one to
    // preserve block order.
    while (!_compression_tasks.empty) {
      auto oldest = _compression_tasks.front;
      if (!oldest.done())
        break;
      auto result = oldest.yieldForce();
      writeResult(result);
      _compression_tasks.popFront();
    }
  }

  private void delegate(ubyte[], ubyte[]) _before_write;

  /// Install a handler that is called with (uncompressed, compressed)
  /// block data just before the compressed block is written. The handler
  /// is attached to a block when that block is queued in flush_block, so
  /// it applies to blocks queued after this call.
  void setWriteHandler(void delegate(ubyte[], ubyte[]) handler) {
    _before_write = handler;
  }

  /// Run the block's write handler (if any), then write the compressed
  /// bytes to the file.
  private void writeResult(Tuple!(ubyte[], ubyte[], BlockWriteHandler) block) {
    auto uncompressed = block[0];
    auto compressed = block[1];
    auto handler = block[2];
    if (handler) {// write handler enabled
      handler(uncompressed, compressed);
    }
    f.rawWrite(compressed);
  }

  /// Flush all remaining BGZF blocks and underlying stream.
  void flush() {
    flush_block();

    // wait for every outstanding compression task, in order
    while (!_compression_tasks.empty) {
      auto pending = _compression_tasks.front;
      auto block = pending.yieldForce();
      writeResult(block);
      _compression_tasks.popFront();
    }

    f.flush();
    current_size = 0;
  }

  /// Flush all remaining BGZF blocks and close source stream.
  /// Automatically adds empty block at the end, serving as indicator
  /// of end of stream. This function is automatically called on
  /// destruction.
  ///
  /// Calling it more than once, or on a writer that was never
  /// constructed, does nothing.
  void close() {
    if (compression_buf is null)
      return; // not open: nothing to flush, close or free

    flush();
    f.rawWrite(BGZF_EOF);
    f.close();

    pureFree(compression_buf.ptr);
    // Drop all views into the freed memory so that a second close()
    // (e.g. from the destructor) is harmless and a later write() fails
    // loudly instead of touching freed memory.
    compression_buf = null;
    buffer = null;
    tmp = null;
    current_size = 0;
  }
}