/*
    New style BGZF writer. This file is part of Sambamba.
    Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>

    Sambamba is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published
    by the Free Software Foundation; either version 2 of the License,
    or (at your option) any later version.

    Sambamba is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
    02111-1307 USA

*/

// Based on the original by Artem Tarasov.

module bio.std.experimental.hts.bgzf_writer;

// import core.stdc.stdlib : malloc, free;
import core.memory: pureMalloc, pureFree;
import core.stdc.stdio: fopen, fread, fclose;
import std.bitmanip;
import std.conv;
import std.exception;
import std.typecons;
import std.parallelism;
import std.array;
import std.algorithm : max;
import std.stdio;
import std.typecons;

// depends on old
import bio.core.bgzf.compress;
import bio.core.utils.roundbuf;

// import undead.stream;

import bio.std.hts.bam.constants: BGZF_MAX_BLOCK_SIZE, BGZF_BLOCK_SIZE, BGZF_EOF;
import bio.std.experimental.hts.bgzf;
import bio.std.experimental.hts.constants;

/// Callback invoked with (uncompressed, compressed) data of a block
/// right before the compressed bytes are written to the file.
alias BlockWriteHandler = void delegate(ubyte[], ubyte[]);

/// Convenience function for Taskpool handler.
///
/// Compresses `input` with `bgzfCompress` at the given `level`, using
/// `output_buffer` as destination storage, and returns the tuple
/// (input, compressed output, handler) so that the consumer of the
/// task result has everything needed to write the block.
Tuple!(ubyte[], ubyte[], BlockWriteHandler) bgzfCompressFunc(ubyte[] input,
                                                             int level,
                                                             ubyte[] output_buffer,
                                                             BlockWriteHandler handler)
{
  auto output = bgzfCompress(input, level, output_buffer);
  return tuple(input, output, handler);
}

/// BGZF compression - this is a port of the original that used the
/// undead.stream library.
///
/// Data passed to `write` is collected in a block-sized buffer. Full
/// blocks are handed to the task pool for compression and written to
/// the file in submission order.
struct BgzfWriter {

private:
  File f;
  TaskPool task_pool;

  ubyte[] buffer; // a slice into compression_buf (uncompressed data)
  ubyte[] tmp;    // a slice into compression_buf (compressed data)
  size_t current_size;   // number of valid bytes currently in `buffer`
  int compression_level;

  alias CompressionTask = Task!(bgzfCompressFunc,
                                ubyte[], int, ubyte[], BlockWriteHandler);
  RoundBuf!(CompressionTask*) _compression_tasks;
  ubyte[] compression_buf; // malloc'ed backing store; null when not open

public:

  /// Create new BGZF output stream with a multi-threaded writer
  ///
  /// Params:
  ///   fn = path of the output file (opened with mode "wb")
  ///   _compression_level = compression level in the interval [-1, 9]
  ///
  /// Throws: BgzfException when the compression level is out of range
  /// or the working buffer cannot be allocated.
  this(string fn, int _compression_level=-1) {
    f = File(fn,"wb");
    // Validate the argument; the member still holds its default
    // value at this point and must not be the one that is checked.
    enforce1(-1 <= _compression_level && _compression_level <= 9,
             "BGZF compression level must be a number in interval [-1, 9]");
    size_t max_block_size = BGZF_MAX_BLOCK_SIZE;
    size_t block_size     = BGZF_BLOCK_SIZE;
    task_pool = taskPool();
    compression_level = _compression_level;

    // create a ring buffer that is large enough
    size_t n_tasks = max(task_pool.size, 1) * 16;
    _compression_tasks = RoundBuf!(CompressionTask*)(n_tasks);

    // create extra block to which we can write while n_tasks are
    // executed: (n_tasks + 1) slots of 2 * max_block_size bytes each,
    // the first half for uncompressed and the second for compressed data
    auto comp_buf_size = (2 * n_tasks + 2) * max_block_size;
    auto p = cast(ubyte*)pureMalloc(comp_buf_size);
    enforce1(p !is null, "Failed to allocate memory for BGZF compression buffers");
    compression_buf = p[0 .. comp_buf_size];
    buffer          = compression_buf[0 .. block_size];
    tmp             = compression_buf[max_block_size .. max_block_size * 2];
  }

  ~this() {
    close();
  }

  @disable this(this); // BgzfWriter does not have copy semantics;

  /// Throw a BgzfException that mentions the output file name and the
  /// source location passed in `file` and `line`.
  void throwBgzfException(string msg, string file = __FILE__, size_t line = __LINE__) {
    throw new BgzfException("Error writing BGZF block starting in "~f.name ~
                            " (" ~ file ~ ":" ~ to!string(line) ~ "): " ~ msg);
  }

  /// Throw a BgzfException with `msg` unless `check` holds. The
  /// message is evaluated lazily, i.e. only on failure.
  void enforce1(bool check, lazy string msg, string file = __FILE__, int line = __LINE__) {
    if (!check)
      throwBgzfException(msg,file,line);
  }

  /// Append `size` bytes starting at `buf` to the stream. Whenever
  /// the current block becomes full it is submitted for compression
  /// through `flush_block`.
  void write(const void* buf, size_t size) {
    // Without a buffer (writer never opened, or already closed) the
    // loop below could never make progress.
    enforce1(buffer.length > 0, "Attempt to write to a BGZF writer that is not open");

    const(ubyte)[] data = (cast(const(ubyte)*)buf)[0 .. size];

    // Fill up and flush blocks for as long as the pending data
    // reaches the end of the current block.
    while (data.length + current_size >= buffer.length) {
      immutable room = buffer.length - current_size;
      buffer[current_size .. $] = data[0 .. room];
      data = data[room .. $];
      current_size = buffer.length;
      flush_block(); // switches `buffer` to a fresh slot and resets current_size
    }

    // Whatever is left fits into the current block.
    buffer[current_size .. current_size + data.length] = data[];
    current_size += data.length;
  }

  /// Append the contents of `buf` to the stream.
  void write(ubyte[] buf) {
    write(buf.ptr, buf.length);
  }

  /// Append the raw bytes of `s` to the stream.
  void write(string s) {
    write(s.ptr, s.length);
  }

  /// Append `value` in little-endian byte order (T.sizeof bytes).
  void write(T)(T value) { // int values
    ubyte[T.sizeof] buf = nativeToLittleEndian!T(value);
    write(buf[]);
  }

  /// Force flushing current block, even if it is not yet filled.
  /// Should also be used when it's not desired to have records
  /// crossing block borders.
  void flush_block() {
    if (current_size == 0)
      return;

    // When the task ring is full, wait for the oldest task to make
    // room. Its result is only written further below, after the new
    // task has been submitted.
    Tuple!(ubyte[], ubyte[], BlockWriteHandler) front_result;
    if (_compression_tasks.full) {
      front_result = _compression_tasks.front.yieldForce();
      _compression_tasks.popFront();
    }

    auto compression_task = task!bgzfCompressFunc(buffer[0 .. current_size],
                                                  compression_level, tmp,
                                                  _before_write);
    _compression_tasks.put(compression_task);
    task_pool.put(compression_task);

    // Advance to the next slot of compression_buf, wrapping around at
    // the end. A slot spans 2 * N bytes.
    size_t offset = buffer.ptr - compression_buf.ptr;
    immutable N = tmp.length;
    offset += 2 * N;
    if (offset == compression_buf.length)
      offset = 0;
    buffer = compression_buf[offset .. offset + buffer.length];
    tmp = compression_buf[offset + N .. offset + 2 * N];
    current_size = 0;

    // Must happen before returning: the slot just selected may be the
    // one front_result still refers to.
    if (front_result[0] !is null)
      writeResult(front_result);

    // Write out all leading tasks that are finished already, keeping
    // the blocks in submission order.
    while (!_compression_tasks.empty) {
      auto task = _compression_tasks.front;
      if (!task.done())
        break;
      auto result = task.yieldForce();
      writeResult(result);
      _compression_tasks.popFront();
    }
  }

  private BlockWriteHandler _before_write;

  /// Set the delegate that is passed along with every block and is
  /// called with (uncompressed, compressed) before the block is written.
  void setWriteHandler(void delegate(ubyte[], ubyte[]) handler) {
    _before_write = handler;
  }

  /// Call the block's handler (if any) and write the compressed data
  /// to the file.
  private void writeResult(Tuple!(ubyte[], ubyte[], BlockWriteHandler) block) {
    auto uncompressed = block[0];
    auto compressed = block[1];
    auto handler = block[2];
    if (handler) {// write handler enabled
      handler(uncompressed, compressed);
    }
    f.rawWrite(compressed);
  }

  /// Flush all remaining BGZF blocks and underlying stream.
  void flush() {
    flush_block();

    // Wait for every outstanding task, in order.
    while (!_compression_tasks.empty) {
      auto task = _compression_tasks.front;
      auto block = task.yieldForce();
      writeResult(block);
      _compression_tasks.popFront();
    }

    f.flush();
    current_size = 0;
  }

  /// Flush all remaining BGZF blocks and close source stream.
  /// Automatically adds empty block at the end, serving as indicator
  /// of end of stream. This function is automatically called on
  /// destruction.
  ///
  /// Calling it more than once, or on a writer that was never opened,
  /// has no effect.
  void close() {
    if (compression_buf is null)
      return; // never opened, or closed already

    flush();
    f.rawWrite(BGZF_EOF);
    f.close();

    // Only reached when everything above succeeded, i.e. no task
    // refers to the buffer any more.
    pureFree(compression_buf.ptr);
    compression_buf = null;
    buffer = null;
    tmp = null;
    current_size = 0;
  }
}