/*
    New style BGZF writer. This file is part of Sambamba.
    Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>

    Sambamba is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published
    by the Free Software Foundation; either version 2 of the License,
    or (at your option) any later version.

    Sambamba is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
    02111-1307 USA

*/

// Based on the original by Artem Tarasov.

module bio2.bgzf_writer;

// import core.stdc.stdlib : malloc, free;
import core.memory: pureMalloc, pureFree;
import core.stdc.stdio: fopen, fread, fclose;
import std.bitmanip;
import std.conv;
import std.exception;
import std.typecons;
import std.parallelism;
import std.array;
import std.algorithm : max;
import std.stdio;
import std.typecons;

import bio.core.bgzf.compress;
import bio.core.utils.roundbuf;

// import undead.stream;

import bio.bam.constants: BGZF_MAX_BLOCK_SIZE, BGZF_BLOCK_SIZE, BGZF_EOF;
import bio2.bgzf;
import bio2.constants;

/// Callback receiving the (uncompressed, compressed) data of a block.
/// The writer invokes it right before it writes the compressed block
/// to the output file.
alias BlockWriteHandler = void delegate(ubyte[], ubyte[]);

/// Convenience function for Taskpool handler.
///
/// Runs `bgzfCompress(input, level, output_buffer)` and returns the
/// original input, the compressed result and the handler together. This
/// lets the writer pass all three to its output step once the task
/// completes.
Tuple!(ubyte[], ubyte[], BlockWriteHandler) bgzfCompressFunc(ubyte[] input,
                                                            int level,
                                                            ubyte[] output_buffer,
                                                            BlockWriteHandler handler)
{
    return tuple(input, bgzfCompress(input, level, output_buffer), handler);
}

/// BGZF compression - this is a port of the original that used the
/// undead.stream library.
struct BgzfWriter {
    // Writes a BGZF stream to a file.
    //
    // Uncompressed bytes accumulate in `buffer`. Each full (or explicitly
    // flushed) block is compressed as a task on the task pool, and the
    // compressed results are written to `f` in submission order.
    //
    // The uncompressed and compressed block buffers are all slices of one
    // pureMalloc'ed region (`compression_buf`). That region is split into
    // (uncompressed, compressed) slot pairs of BGZF_MAX_BLOCK_SIZE bytes
    // each, and the pairs are reused round-robin.

private:
    File f;
    TaskPool task_pool;

    ubyte[] buffer;       // a slice into compression_buf (uncompressed data)
    ubyte[] tmp;          // a slice into compression_buf (compressed data)
    size_t current_size;  // number of bytes currently held in `buffer`
    int compression_level;

    alias CompressionTask = Task!(bgzfCompressFunc,
                                  ubyte[], int, ubyte[], BlockWriteHandler);
    RoundBuf!(CompressionTask*) _compression_tasks; // queued tasks, oldest first
    ubyte[] compression_buf;

public:

    /// Create new BGZF output stream with a multi-threaded writer.
    ///
    /// Params:
    ///   fn = path of the output file, opened with mode "wb"
    ///   _compression_level = compression level in [-1, 9], passed on to
    ///       bgzfCompress (-1 presumably selects the zlib default; confirm)
    ///
    /// Throws: BgzfException if the compression level is out of range or
    ///   the block buffer cannot be allocated.
    this(string fn, int _compression_level=-1) {
        f = File(fn,"wb");
        // Validate the argument itself. The field is still zero-initialized here.
        enforce1(-1 <= _compression_level && _compression_level <= 9,
                 "BGZF compression level must be a number in interval [-1, 9]");
        size_t max_block_size = BGZF_MAX_BLOCK_SIZE;
        size_t block_size = BGZF_BLOCK_SIZE;
        task_pool = taskPool();
        compression_level = _compression_level;

        // create a ring buffer that is large enough
        size_t n_tasks = max(task_pool.size, 1) * 16;
        _compression_tasks = RoundBuf!(CompressionTask*)(n_tasks);

        // One slot pair per queued task, plus an extra pair to which we
        // can write while n_tasks are executed.
        auto comp_buf_size = (2 * n_tasks + 2) * max_block_size;
        auto p = cast(ubyte*)pureMalloc(comp_buf_size);
        enforce1(p !is null, "could not allocate the BGZF compression buffer");
        compression_buf = p[0 .. comp_buf_size];
        buffer = compression_buf[0 .. block_size];
        tmp = compression_buf[max_block_size .. max_block_size * 2];
    }

    ~this() {
        close(); // no-op if already closed
    }

    @disable this(this); // BgzfWriter does not have copy semantics;

    /// Throw a BgzfException that names the output file and the call site.
    void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) {
        throw new BgzfException("Error writing BGZF block starting in "~f.name ~
                                " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg);
    }

    /// Throw a BgzfException with `msg` unless `check` holds.
    void enforce1(bool check, lazy string msg, string file = __FILE__, size_t line = __LINE__) {
        if (!check)
            throwBgzfException(msg,file,line);
    }

    /// Append `size` bytes starting at `buf` to the stream. Each time the
    /// current block becomes full, it is handed to flush_block().
    void write(const void* buf, size_t size) {
        auto data = (cast(const(ubyte)*) buf)[0 .. size];
        // A block is flushed as soon as it is exactly full, so after this
        // loop current_size + data.length < buffer.length always holds.
        while (current_size + data.length >= buffer.length) {
            immutable room = buffer.length - current_size;
            buffer[current_size .. $] = data[0 .. room];
            data = data[room .. $];
            current_size = buffer.length;
            flush_block();
        }
        buffer[current_size .. current_size + data.length] = data[];
        current_size += data.length;
    }

    /// Append a byte array to the stream.
    void write(ubyte[] buf) {
        write(buf.ptr, buf.length);
    }

    /// Append the raw UTF-8 bytes of `s` to the stream.
    void write(string s) {
        write(s.ptr, s.length);
    }

    /// Append `value` in little-endian byte order. Accepts any type that
    /// std.bitmanip can byte-swap: integral, character, bool, float and
    /// double.
    void write(T)(T value) {
        // Stack array of exactly T.sizeof bytes: no GC allocation per call.
        ubyte[T.sizeof] bytes = nativeToLittleEndian(value);
        write(bytes[]);
    }

    /// Force flushing current block, even if it is not yet filled.
    /// Should also be used when it's not desired to have records
    /// crossing block borders.
    void flush_block() {
        if (current_size == 0)
            return;

        // If the ring of queued tasks is full, wait for the oldest one.
        // Its result is written below, after the new task has been queued.
        Tuple!(ubyte[], ubyte[], BlockWriteHandler) front_result;
        if (_compression_tasks.full) {
            front_result = _compression_tasks.front.yieldForce();
            _compression_tasks.popFront();
        }

        auto compression_task = task!bgzfCompressFunc(buffer[0 .. current_size],
                                                      compression_level, tmp,
                                                      _before_write);
        _compression_tasks.put(compression_task);
        task_pool.put(compression_task);

        // Advance to the next (uncompressed, compressed) slot pair,
        // wrapping around at the end of compression_buf.
        size_t offset = buffer.ptr - compression_buf.ptr;
        immutable N = tmp.length;
        offset += 2 * N;
        if (offset == compression_buf.length)
            offset = 0;
        buffer = compression_buf[offset .. offset + buffer.length];
        tmp = compression_buf[offset + N .. offset + 2 * N];
        current_size = 0;

        if (front_result[0] !is null)
            writeResult(front_result);

        // Write any further tasks that have already finished. Stop at the
        // first unfinished one so output keeps submission order.
        while (!_compression_tasks.empty) {
            auto pending = _compression_tasks.front;
            if (!pending.done())
                break;
            writeResult(pending.yieldForce());
            _compression_tasks.popFront();
        }
    }

    private void delegate(ubyte[], ubyte[]) _before_write;

    /// Install a callback invoked with (uncompressed, compressed) data of
    /// every block submitted afterwards, right before it is written.
    void setWriteHandler(void delegate(ubyte[], ubyte[]) handler) {
        _before_write = handler;
    }

    // Call the block's handler (if any), then write the compressed bytes.
    private void writeResult(Tuple!(ubyte[], ubyte[], BlockWriteHandler) block) {
        auto uncompressed = block[0];
        auto compressed = block[1];
        auto handler = block[2];
        if (handler) {// write handler enabled
            handler(uncompressed, compressed);
        }
        f.rawWrite(compressed);
    }

    /// Flush all remaining BGZF blocks and underlying stream.
    void flush() {
        flush_block();

        // Wait for and write every queued block, oldest first.
        while (!_compression_tasks.empty) {
            auto block = _compression_tasks.front.yieldForce();
            writeResult(block);
            _compression_tasks.popFront();
        }

        f.flush();
        current_size = 0;
    }

    /// Flush all remaining BGZF blocks and close source stream.
    /// Automatically adds empty block at the end, serving as indicator
    /// of end of stream. This function is automatically called on
    /// destruction; calling it more than once is a no-op.
    void close() {
        // The destructor always calls close(). Without this guard, an
        // explicit close() (or destroying a never-opened instance) would
        // flush an unopened File (which throws), append a second EOF
        // marker and free the buffer twice.
        if (!f.isOpen)
            return;
        flush();
        f.rawWrite(BGZF_EOF);
        f.close();
        pureFree(compression_buf.ptr);
        compression_buf = null;
        buffer = null;
        tmp = null;
    }
}