1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2015    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 module bio.std.hts.bam.writer;
25 
26 import bio.std.hts.bam.referenceinfo;
27 import bio.std.hts.sam.header;
28 import bio.std.hts.bam.constants;
29 import bio.std.hts.bam.bai.indexing;
30 import bio.std.hts.bam.read;
31 import bio.std.hts.bam.readrange;
32 import bio.core.bgzf.outputstream;
33 import bio.core.bgzf.virtualoffset;
34 import bio.core.utils.stream;
35 
36 import std.parallelism;
37 import std.exception;
38 import contrib.undead.stream;
39 import std.traits;
40 import std.system;
41 import std.algorithm;
42 import std.array;
43 import std.bitmanip;
44 
45 /** Class for outputting BAM.
46     $(BR)
47     Compresses BGZF blocks in parallel.
48     Tries to write reads so that they don't cross BGZF block borders.
49     $(BR)
50     Usage is very simple, see example below.
51 
52     Example:
53     --------------------------------------
54     import bio.std.hts.bam.writer, bio.std.hts.bam.reader;
55     ...
56     auto src = new BamReader("in.bam");
57     auto dst = new BamWriter("out.bam", 9); // maximal compression
58     scope (exit) dst.finish();              // close the stream at exit
59     dst.writeSamHeader(src.header);         // copy header and reference sequence info
60     dst.writeReferenceSequenceInfo(src.reference_sequences);
61     foreach (read; src.reads) {
62         if (read.mapping_quality > 10)      // skip low-quality reads
63             dst.writeRecord(read);
64     }
65     --------------------------------------
66     */
67 final class BamWriter {
68 
69     /// Creates new BAM writer outputting to file or $(I stream).
70     /// Automatically writes BAM magic number (4 bytes).
71     ///
72     /// Params:
73     ///     compression_level  = compression level, must be in range -1..9
74     ///     task_pool          = task pool to use for parallel compression
75     ///     buffer_size        = size of BgzfOutputStream buffer
76     this(contrib.undead.stream.Stream stream,
77          int compression_level=-1,
78          std.parallelism.TaskPool task_pool=std.parallelism.taskPool,
79          size_t buffer_size=0)
80     {
81         _stream = new BgzfOutputStream(stream, compression_level,
82                                        task_pool, buffer_size);
83         _stream.setWriteHandler((ubyte[] uncompressed, ubyte[] compressed) {
84             _bytes_written += compressed.length;
85         });
86 
87         writeString(BAM_MAGIC);
88     }
89 
90     /// ditto
91     this(string filename,
92          int compression_level=-1,
93          std.parallelism.TaskPool task_pool=std.parallelism.taskPool)
94     {
95         _filename = filename;
96         auto filestream = new bio.core.utils.stream.File(filename, "wb+");
97         this(filestream, compression_level, task_pool);
98     }
99 
100     /// Can be called right after the stream constructor, only once
101     void setFilename(string output_filename) {
102         enforce(_filename is null, "Can't set output filename twice");
103         _filename = output_filename;
104     }
105 
106     /// By default, the writer attempts to automatically create index
107     /// when writing coordinate-sorted files. If this behaviour is not
108     /// desired, it can be switched off before writing SAM header.
109     void disableAutoIndexCreation() {
110         _disable_index_creation = true;
111     }
112 
113     package void writeByteArray(const(ubyte[]) array) {
114         _stream.writeExact(array.ptr, array.length);
115     }
116 
117     package void writeString(string str) {
118         writeByteArray(cast(ubyte[])str);
119     }
120 
121     package void writeInteger(T)(T integer) if (isIntegral!T)
122     {
123         ubyte[T.sizeof] buf = nativeToLittleEndian(integer);
124         _stream.writeExact(buf.ptr, buf.length);
125     }
126 
127     private {
128         size_t _bytes_written;
129         bool _create_index = false;
130         bool _disable_index_creation = false;
131         bool _record_writing_mode = false;
132         string _filename;
133 
134         IndexBuilder _index_builder;
135 
136         VirtualOffset _start_vo, _end_vo;
137         ubyte[] _pending_read_data_buf;
138         ubyte[] _pending_read_data;
139 
140         void appendReadData(ubyte[] data) {
141             auto required = _pending_read_data.length + data.length;
142             if (_pending_read_data_buf.length < required)
143                 _pending_read_data_buf.length = max(required, _pending_read_data_buf.length * 2);
144             _pending_read_data_buf[_pending_read_data.length .. $][0 .. data.length] = data;
145             _pending_read_data = _pending_read_data_buf[0 .. required];
146         }
147 
148         size_t _len;
149 
150         void indexBlock(ubyte[] uncompressed, ubyte[] compressed) {
151             ushort inner_offset = 0;
152 
153             void indexRead(ubyte[] data) {
154                 auto read = BamRead(data);
155                 if (uncompressed.length > 0) {
156                     _end_vo = VirtualOffset(_bytes_written, inner_offset);
157                 } else {
158                     _end_vo = VirtualOffset(_bytes_written + compressed.length, 0);
159                 }
160                 auto read_block = BamReadBlock(_start_vo, _end_vo, read);
161                 _index_builder.put(read_block);
162             }
163 
164             if (_pending_read_data !is null) {
165                 if (uncompressed.length < _len) {
166                     appendReadData(uncompressed);
167                     _len -= uncompressed.length;
168                     uncompressed = null;
169                 } else {
170                     appendReadData(uncompressed[0 .. _len]);
171                     uncompressed = uncompressed[_len .. $];
172                     inner_offset = cast(ushort)_len;
173                     indexRead(_pending_read_data);
174                     _pending_read_data = null;
175                 }
176             }
177 
178             while (uncompressed.length > 0) {
179                 _len = *cast(int*)(uncompressed.ptr); // assume LE...
180                 _start_vo = VirtualOffset(_bytes_written, inner_offset);
181                 if (_len + int.sizeof <= uncompressed.length) {
182                     _pending_read_data = null;
183                     auto read_data = uncompressed[int.sizeof .. int.sizeof + _len];
184                     uncompressed = uncompressed[int.sizeof + _len .. $];
185                     inner_offset += _len + int.sizeof;
186                     indexRead(read_data);
187                 } else { // read spans multiple BGZF blocks
188                     appendReadData(uncompressed[int.sizeof .. $]);
189                     _len -= _pending_read_data.length;
190                     break;
191                 }
192             }
193             _bytes_written += compressed.length;
194         }
195     }
196 
197     /// Writes SAM header. Should be called after construction.
198     void writeSamHeader(bio.std.hts.sam.header.SamHeader header) {
199         writeSamHeader(header.text);
200     }
201 
202     /// ditto
203     void writeSamHeader(string header_text) {
204         _create_index = !_disable_index_creation &&
205             !header_text.find("SO:coordinate").empty &&
206             _filename.length >= 4 &&
207             _filename[$ - 4 .. $] == ".bam";
208         writeInteger(cast(int)header_text.length);
209         writeString(header_text);
210     }
211 
212     /// Writes reference sequence information. Should be called after
213     /// dumping SAM header. Writer will store this array to use later for
214     /// resolving read reference IDs to names.
215     ///
216     /// Flushes current BGZF block.
217     void writeReferenceSequenceInfo(const(bio.std.hts.bam.referenceinfo.ReferenceSequenceInfo)[] reference_sequences)
218     {
219         _reference_sequences = reference_sequences;
220 
221         auto n_refs = cast(int)reference_sequences.length;
222         writeInteger(n_refs);
223         foreach (sequence; reference_sequences) {
224             writeInteger(cast(int)(sequence.name.length + 1));
225             writeString(sequence.name);
226             writeInteger(cast(ubyte)'\0');
227             writeInteger(cast(int)sequence.length);
228         }
229 
230         if (_create_index) {
231             auto index = new bio.core.utils.stream.File(_filename ~ ".bai", "wb+");
232             _index_builder = IndexBuilder(index, n_refs);
233             _index_builder.check_bins = true;
234         }
235 
236         _stream.flushCurrentBlock();
237     }
238 
239     private void indexingWriteHandler(ubyte[] uncomp, ubyte[] comp) {
240         indexBlock(uncomp, comp);
241     }
242 
243     /// Writes BAM read. Throws exception if read reference ID is out of range.
244     void writeRecord(R)(R read) {
245         enforce(read.ref_id == -1 || read.ref_id < _reference_sequences.length,
246                 "Read reference ID is out of range");
247 
248         if (!_record_writing_mode) {
249             if (_create_index) {
250                 _record_writing_mode = true;
251                 _stream.setWriteHandler(&indexingWriteHandler);
252             } else {
253                 _stream.setWriteHandler(null);
254             }
255         }
256 
257         read._recalculate_bin();
258 
259         auto read_size = read.size_in_bytes;
260         if (read_size + _current_size > BGZF_BLOCK_SIZE) {
261             _stream.flushCurrentBlock();
262             read.write(this);
263             _current_size = read_size;
264         } else {
265             read.write(this);
266             _current_size += read_size;
267         }
268     }
269 
270     /// Flushes current BGZF block.
271     void flush() {
272         _stream.flush();
273     }
274 
275     /// Flushes buffer and closes output stream. Adds BAM EOF block automatically.
276     void finish() {
277         _stream.close();
278         if (_create_index)
279             _index_builder.finish();
280     }
281 
282     private {
283         BgzfOutputStream _stream;
284         const(ReferenceSequenceInfo)[] _reference_sequences;
285         size_t _current_size; // number of bytes written to the current BGZF block
286     }
287 }