1 /* 2 This file is part of BioD. 3 Copyright (C) 2012-2015 Artem Tarasov <lomereiter@gmail.com> 4 5 Permission is hereby granted, free of charge, to any person obtaining a 6 copy of this software and associated documentation files (the "Software"), 7 to deal in the Software without restriction, including without limitation 8 the rights to use, copy, modify, merge, publish, distribute, sublicense, 9 and/or sell copies of the Software, and to permit persons to whom the 10 Software is furnished to do so, subject to the following conditions: 11 12 The above copyright notice and this permission notice shall be included in 13 all copies or substantial portions of the Software. 14 15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 21 DEALINGS IN THE SOFTWARE. 22 23 */ 24 module bio.std.hts.bam.writer; 25 26 import bio.std.hts.bam.referenceinfo; 27 import bio.std.hts.sam.header; 28 import bio.std.hts.bam.constants; 29 import bio.std.hts.bam.bai.indexing; 30 import bio.std.hts.bam.read; 31 import bio.std.hts.bam.readrange; 32 import bio.core.bgzf.outputstream; 33 import bio.core.bgzf.virtualoffset; 34 import bio.core.utils.stream; 35 36 import std.parallelism; 37 import std.exception; 38 import contrib.undead.stream; 39 import std.traits; 40 import std.system; 41 import std.algorithm; 42 import std.array; 43 import std.bitmanip; 44 45 /** Class for outputting BAM. 46 $(BR) 47 Compresses BGZF blocks in parallel. 48 Tries to write reads so that they don't cross BGZF block borders. 49 $(BR) 50 Usage is very simple, see example below. 51 52 Example: 53 -------------------------------------- 54 import bio.std.hts.bam.writer, bio.std.hts.bam.reader; 55 ... 56 auto src = new BamReader("in.bam"); 57 auto dst = new BamWriter("out.bam", 9); // maximal compression 58 scope (exit) dst.finish(); // close the stream at exit 59 dst.writeSamHeader(src.header); // copy header and reference sequence info 60 dst.writeReferenceSequenceInfo(src.reference_sequences); 61 foreach (read; src.reads) { 62 if (read.mapping_quality > 10) // skip low-quality reads 63 dst.writeRecord(read); 64 } 65 -------------------------------------- 66 */ 67 final class BamWriter { 68 69 /// Creates new BAM writer outputting to file or $(I stream). 70 /// Automatically writes BAM magic number (4 bytes). 71 /// 72 /// Params: 73 /// compression_level = compression level, must be in range -1..9 74 /// task_pool = task pool to use for parallel compression 75 /// buffer_size = size of BgzfOutputStream buffer 76 this(contrib.undead.stream.Stream stream, 77 int compression_level=-1, 78 std.parallelism.TaskPool task_pool=std.parallelism.taskPool, 79 size_t buffer_size=0) 80 { 81 _stream = new BgzfOutputStream(stream, compression_level, 82 task_pool, buffer_size); 83 _stream.setWriteHandler((ubyte[] uncompressed, ubyte[] compressed) { 84 _bytes_written += compressed.length; 85 }); 86 87 writeString(BAM_MAGIC); 88 } 89 90 /// ditto 91 this(string filename, 92 int compression_level=-1, 93 std.parallelism.TaskPool task_pool=std.parallelism.taskPool) 94 { 95 _filename = filename; 96 auto filestream = new bio.core.utils.stream.File(filename, "wb+"); 97 this(filestream, compression_level, task_pool); 98 } 99 100 /// Can be called right after the stream constructor, only once 101 void setFilename(string output_filename) { 102 enforce(_filename is null, "Can't set output filename twice"); 103 _filename = output_filename; 104 } 105 106 /// By default, the writer attempts to automatically create index 107 /// when writing coordinate-sorted files. If this behaviour is not 108 /// desired, it can be switched off before writing SAM header. 109 void disableAutoIndexCreation() { 110 _disable_index_creation = true; 111 } 112 113 package void writeByteArray(const(ubyte[]) array) { 114 _stream.writeExact(array.ptr, array.length); 115 } 116 117 package void writeString(string str) { 118 writeByteArray(cast(ubyte[])str); 119 } 120 121 package void writeInteger(T)(T integer) if (isIntegral!T) 122 { 123 ubyte[T.sizeof] buf = nativeToLittleEndian(integer); 124 _stream.writeExact(buf.ptr, buf.length); 125 } 126 127 private { 128 size_t _bytes_written; 129 bool _create_index = false; 130 bool _disable_index_creation = false; 131 bool _record_writing_mode = false; 132 string _filename; 133 134 IndexBuilder _index_builder; 135 136 VirtualOffset _start_vo, _end_vo; 137 ubyte[] _pending_read_data_buf; 138 ubyte[] _pending_read_data; 139 140 void appendReadData(ubyte[] data) { 141 auto required = _pending_read_data.length + data.length; 142 if (_pending_read_data_buf.length < required) 143 _pending_read_data_buf.length = max(required, _pending_read_data_buf.length * 2); 144 _pending_read_data_buf[_pending_read_data.length .. $][0 .. data.length] = data; 145 _pending_read_data = _pending_read_data_buf[0 .. required]; 146 } 147 148 size_t _len; 149 150 void indexBlock(ubyte[] uncompressed, ubyte[] compressed) { 151 ushort inner_offset = 0; 152 153 void indexRead(ubyte[] data) { 154 auto read = BamRead(data); 155 if (uncompressed.length > 0) { 156 _end_vo = VirtualOffset(_bytes_written, inner_offset); 157 } else { 158 _end_vo = VirtualOffset(_bytes_written + compressed.length, 0); 159 } 160 auto read_block = BamReadBlock(_start_vo, _end_vo, read); 161 _index_builder.put(read_block); 162 } 163 164 if (_pending_read_data !is null) { 165 if (uncompressed.length < _len) { 166 appendReadData(uncompressed); 167 _len -= uncompressed.length; 168 uncompressed = null; 169 } else { 170 appendReadData(uncompressed[0 .. _len]); 171 uncompressed = uncompressed[_len .. $]; 172 inner_offset = cast(ushort)_len; 173 indexRead(_pending_read_data); 174 _pending_read_data = null; 175 } 176 } 177 178 while (uncompressed.length > 0) { 179 _len = *cast(int*)(uncompressed.ptr); // assume LE... 180 _start_vo = VirtualOffset(_bytes_written, inner_offset); 181 if (_len + int.sizeof <= uncompressed.length) { 182 _pending_read_data = null; 183 auto read_data = uncompressed[int.sizeof .. int.sizeof + _len]; 184 uncompressed = uncompressed[int.sizeof + _len .. $]; 185 inner_offset += _len + int.sizeof; 186 indexRead(read_data); 187 } else { // read spans multiple BGZF blocks 188 appendReadData(uncompressed[int.sizeof .. $]); 189 _len -= _pending_read_data.length; 190 break; 191 } 192 } 193 _bytes_written += compressed.length; 194 } 195 } 196 197 /// Writes SAM header. Should be called after construction. 198 void writeSamHeader(bio.std.hts.sam.header.SamHeader header) { 199 writeSamHeader(header.text); 200 } 201 202 /// ditto 203 void writeSamHeader(string header_text) { 204 _create_index = !_disable_index_creation && 205 !header_text.find("SO:coordinate").empty && 206 _filename.length >= 4 && 207 _filename[$ - 4 .. $] == ".bam"; 208 writeInteger(cast(int)header_text.length); 209 writeString(header_text); 210 } 211 212 /// Writes reference sequence information. Should be called after 213 /// dumping SAM header. Writer will store this array to use later for 214 /// resolving read reference IDs to names. 215 /// 216 /// Flushes current BGZF block. 217 void writeReferenceSequenceInfo(const(bio.std.hts.bam.referenceinfo.ReferenceSequenceInfo)[] reference_sequences) 218 { 219 _reference_sequences = reference_sequences; 220 221 auto n_refs = cast(int)reference_sequences.length; 222 writeInteger(n_refs); 223 foreach (sequence; reference_sequences) { 224 writeInteger(cast(int)(sequence.name.length + 1)); 225 writeString(sequence.name); 226 writeInteger(cast(ubyte)'\0'); 227 writeInteger(cast(int)sequence.length); 228 } 229 230 if (_create_index) { 231 auto index = new bio.core.utils.stream.File(_filename ~ ".bai", "wb+"); 232 _index_builder = IndexBuilder(index, n_refs); 233 _index_builder.check_bins = true; 234 } 235 236 _stream.flushCurrentBlock(); 237 } 238 239 private void indexingWriteHandler(ubyte[] uncomp, ubyte[] comp) { 240 indexBlock(uncomp, comp); 241 } 242 243 /// Writes BAM read. Throws exception if read reference ID is out of range. 244 void writeRecord(R)(R read) { 245 enforce(read.ref_id == -1 || read.ref_id < _reference_sequences.length, 246 "Read reference ID is out of range"); 247 248 if (!_record_writing_mode) { 249 if (_create_index) { 250 _record_writing_mode = true; 251 _stream.setWriteHandler(&indexingWriteHandler); 252 } else { 253 _stream.setWriteHandler(null); 254 } 255 } 256 257 read._recalculate_bin(); 258 259 auto read_size = read.size_in_bytes; 260 if (read_size + _current_size > BGZF_BLOCK_SIZE) { 261 _stream.flushCurrentBlock(); 262 read.write(this); 263 _current_size = read_size; 264 } else { 265 read.write(this); 266 _current_size += read_size; 267 } 268 } 269 270 /// Flushes current BGZF block. 271 void flush() { 272 _stream.flush(); 273 } 274 275 /// Flushes buffer and closes output stream. Adds BAM EOF block automatically. 276 void finish() { 277 _stream.close(); 278 if (_create_index) 279 _index_builder.finish(); 280 } 281 282 private { 283 BgzfOutputStream _stream; 284 const(ReferenceSequenceInfo)[] _reference_sequences; 285 size_t _current_size; // number of bytes written to the current BGZF block 286 } 287 }