1 /*
2     This file is part of BioD.
3     Copyright (C) 2012-2014    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11     
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14     
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 module bio.std.hts.utils.samheadermerger;
25 
26 import bio.std.hts.sam.header;
27 import bio.std.hts.bam.validation.samheader;
28 
29 import std.array;
30 import std.range;
31 import std.algorithm;
32 import std.conv;
33 import std.typecons;
34 import std.exception;
35 
36 import bio.std.hts.utils.graph;
37 
38 /// Class encapsulating functionality of merging several SAM headers
39 /// into one. (In fact, its role is to just group several variables,
40 /// so it could be replaced by a function returning a struct.)
41 class SamHeaderMerger {
42 
43     enum Strategy {
44         simple,
45         usingIndex
46     }
47 
48     Strategy strategy;
49 
50     /// Takes array of SAM headers as an input.
51     this(SamHeader[] headers, bool validate_headers=false) {
52         _headers = headers;
53         _len = _headers.length;
54 
55         merged_header = new SamHeader();
56         ref_id_map = new size_t[size_t][_len];
57         ref_id_reverse_map = new size_t[size_t][_len];
58         program_id_map = new string[string][_len];
59         readgroup_id_map = new string[string][_len];
60 
61         if (validate_headers) {
62             // TODO: make custom validator for producing better error message
63             foreach (size_t i, header; _headers) {
64                 if (!isValid(header)) {
65                     throw new Exception("header #" ~ to!string(i) ~ " is invalid, can't merge");
66                 }
67             }
68         }
69 
70         auto expected = _headers[0].sorting_order;
71         if (expected != SortingOrder.coordinate && expected != SortingOrder.queryname) {
72             throw new Exception("file headers indicate that some files are not sorted");
73         }
74         foreach (header; _headers) {
75             if (header.sorting_order != expected) {
76                 throw new Exception("sorting orders of files don't agree, can't merge");
77             }
78         }
79         merged_header.sorting_order = expected;
80 
81         mergeSequenceDictionaries();
82         mergeReadGroups();
83         mergeProgramRecords();
84         mergeComments();
85     }
86 
87     /// The main result of merging -- new SamHeader
88     SamHeader merged_header;
89 
90     /// Map: index of SamHeader in input array of headers -> old refID -> new refID
91     size_t[size_t][] ref_id_map;
92 
93     /// the same for read group identifiers
94     string[string][] readgroup_id_map;
95 
96     /// the same for program record identifiers
97     string[string][] program_id_map;
98 
99     /// Map: index of SamHeader in input array of headers -> new refID -> old refID
100     size_t[size_t][] ref_id_reverse_map;
101 
102 private:
103 
104     // NOTE FOR DEVELOPER:
105     // for more info on what's going on here, read comments in sambamba/sambamba/merge.d
106 
107     SamHeader[] _headers;
108     size_t _len; // number of headers
109 
110     static void addVerticeToDict(ref SqLine[string] dict, ref SqLine line) {
111         if (line.name in dict) {
112             if (line.length != dict[line.name].length) {
113                 // those two @SQ lines are highly unlikely to refer to the same
114                 // reference sequence if lengths are different
115                 throw new Exception("can't merge SAM headers: one of references with " ~
116                         "name " ~ line.name ~ " has length " ~ 
117                         to!string(dict[line.name].length) ~ 
118                         " while another one with the same name has length " ~ 
119                         to!string(line.length));
120             }
121             // TODO: merge individual tags?
122         } else {
123             dict[line.name] = line;
124         }
125     }
126 
127     void mergeSequenceDictionaries() {
128         // make a directed graph out of reference sequences and do a topological sort
129        
130         SqLine[string] dict;
131 
132         // create a graph
133         auto g = new DirectedGraph();
134         foreach (header; _headers) {
135             auto sequences = header.sequences.values;
136             auto prev = sequences.front;
137             addVerticeToDict(dict, prev);
138             g.addNode(prev.name);
139             sequences.popFront();
140             while (!sequences.empty) {
141                 auto cur = sequences.front;
142                 addVerticeToDict(dict, cur);
143                 g.addEdge(prev.name, cur.name);
144                 prev = cur;
145                 sequences.popFront();
146             }
147         }
148 
149         // get topologically sorted nodes
150         try {
151             foreach (v; g.topologicalSort()) {
152                 merged_header.sequences.add(dict[v]);
153             }
154             strategy = Strategy.simple;
155         } catch (Exception e) {
156             // failed, try another strategy which requires index files
157             foreach (sq_line; sort!((a, b) => a.name < b.name)(dict.values)) {
158                 merged_header.sequences.add(sq_line);
159             }
160             strategy = Strategy.usingIndex;
161         }
162 
163         // make mappings
164         foreach (size_t i, header; _headers) {
165             foreach (size_t j, SqLine sq; header.sequences) {
166                 auto new_index = merged_header.sequences.getSequenceIndex(sq.name);
167                 if (new_index < 0) {
168                   import std.stdio;
169                   stderr.writeln("merged header sequence dictionary: \n",
170                       merged_header.sequences.values);
171                   throw new Exception("BUG: " ~ sq.name ~ " is not in merged header dictionary");
172                 }
173                 ref_id_map[i][j] = to!size_t(new_index);
174                 ref_id_reverse_map[i][to!size_t(new_index)] = j;
175             }
176         }
177     }
178 
179     // The reason to pass by reference is that when merging program records,
180     // this function is called in a loop, and we need to keep some structures between calls.
181     //
182     // $(D dict) is a collection of Line structs, which will finally be part of the header;
183     // $(D record_id_map) is an array of mappings (for each header) where old record identifier
184     // is mapped into a new one;
185     static void mergeHeaderLines(Line, R)(R records_with_file_ids, size_t file_count,
186                                           ref HeaderLineDictionary!Line dict,
187                                           ref string[string][] record_id_map)
188         if (is(typeof(Line.identifier) == string) &&
189             is(ElementType!R == Tuple!(Line, size_t)) &&
190             (is(Line == RgLine) || is(Line == PgLine)))
191     {
192         // Map: record identifier -> record -> list of files
193         size_t[][Line][string] id_to_record;
194 
195         foreach (record_and_file; records_with_file_ids) {
196             auto rec = record_and_file[0];
197             auto file_id = record_and_file[1];
198             id_to_record[rec.identifier][rec] ~= file_id;
199         }
200 
201         // Loop through all identifiers
202         foreach (record_id, records_with_same_id; id_to_record) {
203 
204             // Several read groups/program records can share the 
205             // common identifier, and each one of them can be 
206             // presented in several files.
207             // 
208             // If read groups/program records are equal 
209             // (i.e. all fields are equal) then they are treated 
210             // as a single read group/program record
211             // 
212             // Here we iterate over those read groups/program records
213             // and files where they were seen, renaming identifiers
214             // in order to avoid collisions where necessary.
215             foreach (rec, file_ids; records_with_same_id) {
216                 string new_id = record_id;
217                 if (record_id in dict) {
218                     // if already used ID is encountered,
219                     // find unused ID by adding ".N" to the old ID
220                     for (int i = 1; ; ++i) {
221                         new_id = record_id ~ "." ~ to!string(i);
222                         if (new_id !in dict) {
223                             break;
224                         }
225                     }
226                 }
227 
228                 // save mapping
229                 foreach (file_id; file_ids) {
230                     record_id_map[file_id][record_id] = new_id;
231                 }
232 
233                 // update merged header
234                 rec.identifier = new_id;
235                 dict.add(rec);
236             }
237         }
238     }
239 
240     void mergeReadGroups() {
241         Tuple!(RgLine, size_t)[] readgroups_with_file_ids;
242         for (size_t i = 0; i < _len; i++)
243             foreach (rg; _headers[i].read_groups.values)
244                 readgroups_with_file_ids ~= tuple(rg, i);
245 
246         auto dict = new RgLineDictionary();
247 
248         mergeHeaderLines(readgroups_with_file_ids, _len,
249                          dict, readgroup_id_map);
250 
251         merged_header.read_groups = dict;
252     }
253 
254     void mergeProgramRecords() {
255         Tuple!(PgLine, size_t)[] programs_with_file_ids;
256         for (size_t i = 0; i < _len; i++)
257             foreach (pg; _headers[i].programs.values)
258                 programs_with_file_ids ~= tuple(pg, i);
259 
260         auto vertices = partition!"a[0].previous_program !is null"(programs_with_file_ids);
261         programs_with_file_ids = programs_with_file_ids[0 .. $ - vertices.length];
262 
263         auto dict = new PgLineDictionary();
264 
265         while (!vertices.empty) {
266             // populates dict and program_id_map
267             mergeHeaderLines!PgLine(vertices, _len, dict, program_id_map); 
268 
269             // find children of current vertices
270             auto old_ids = map!"tuple(a[0].identifier, a[1])"(vertices);
271             vertices = partition!((Tuple!(PgLine, size_t) a) {
272                                     return !canFind(old_ids, tuple(a[0].previous_program, a[1]));
273                                   })(programs_with_file_ids);
274             programs_with_file_ids = programs_with_file_ids[0 .. $ - vertices.length];
275 
276             // update PP tags in children
277             
278             foreach (ref pg_with_file_id; vertices) {
279                 auto pg = pg_with_file_id[0];
280                 auto file_id = pg_with_file_id[1];
281                
282                 if (pg.previous_program !is null && 
283                     pg.previous_program in program_id_map[file_id]) 
284                 {
285                     auto new_id = program_id_map[file_id][pg.previous_program];
286                     if (new_id != pg.previous_program) {
287                         pg.previous_program = new_id;
288                     }
289                 }
290                 
291                 pg_with_file_id = tuple(pg, file_id);
292             }
293         }
294 
295         merged_header.programs = dict;
296     }
297 
298     void mergeComments() {
299         merged_header.comments = join(map!"a.comments"(_headers));
300     }
301 }
302 
303 unittest {
304     import std.stdio;
305     import std.algorithm;
306 
307     // stderr.writeln("Testing SAM header merging...");
308     auto h1 = new SamHeader();
309     auto h2 = new SamHeader();
310     auto h3 = new SamHeader();
311     h1.sorting_order = SortingOrder.coordinate;
312     h2.sorting_order = SortingOrder.coordinate;
313     h3.sorting_order = SortingOrder.coordinate;
314  
315     // ---- fill reference sequence dictionaries -------------------------------
316 
317     h1.sequences.add(SqLine("A", 100));
318     h1.sequences.add(SqLine("B", 200));
319     h1.sequences.add(SqLine("C", 300));
320 
321     h2.sequences.add(SqLine("D", 100));
322     h2.sequences.add(SqLine("B", 200));
323     h2.sequences.add(SqLine("E", 300));
324 
325     h3.sequences.add(SqLine("A", 100));
326     h3.sequences.add(SqLine("E", 300));
327     h3.sequences.add(SqLine("C", 300));
328 
329     // expected:        A       B       C
330     //                      D       E
331 
332     // ---- add a few read group records ---------------------------------------
333 
334     h1.read_groups.add(RgLine("A", "CN1"));
335     h1.read_groups.add(RgLine("C", "CN3"));
336 
337     h2.read_groups.add(RgLine("B", "CN2"));
338     h2.read_groups.add(RgLine("C", "CN4"));
339 
340     h3.read_groups.add(RgLine("B", "CN2"));
341     h3.read_groups.add(RgLine("A", "CN4"));
342 
343     // ---- add some program records with a lot of name clashes ----------------
344 
345     h1.programs.add(PgLine("A", "X"));             //        .> C
346     h1.programs.add(PgLine("B", "Y", "", "A"));    //       /
347     h1.programs.add(PgLine("C", "Z", "", "B"));    // A -> B -> D
348     h1.programs.add(PgLine("D", "T", "", "B"));    //
349 
350     h2.programs.add(PgLine("B", "Z"));             // B -> A -> C
351     h2.programs.add(PgLine("A", "Y", "", "B"));
352     h2.programs.add(PgLine("C", "T", "", "A"));
353 
354     h3.programs.add(PgLine("D", "Y"));             // D -> C -> B
355     h3.programs.add(PgLine("C", "T", "", "D"));
356     h3.programs.add(PgLine("B", "X", "", "C"));
357 
358     // expected result:
359     //
360     //            .> C.1
361     //           /
362     //  A -> B.1  -> D.1
363     //
364     //  B -> A.1 -> C.2
365     //
366     //  D -> C -> B.2
367 
368     // ---- add some comments - just for the sake of completeness --------------
369 
370     h1.comments ~= "abc";
371     h2.comments ~= ["def", "ghi"];
372     
373     // ------------------ merge these three headers ----------------------------
374 
375     {
376     auto merger = new SamHeaderMerger([h1, h2, h3]);
377     auto h = merger.merged_header;
378 
379     assert(h.sorting_order == SortingOrder.coordinate);
380 
381     assert(equal(h.sequences.values, 
382                  [SqLine("A", 100), SqLine("D", 100), SqLine("B", 200),
383                   SqLine("E", 300), SqLine("C", 300)]));
384 
385     assert(h.comments == ["abc", "def", "ghi"]);
386 
387     assert(equal(sort(array(map!"a.identifier"(h.programs.values))),
388                  ["A", "A.1", "B", "B.1", "B.2", "C", "C.1", "C.2", "D", "D.1"]));
389 
390     assert(equal(sort(array(map!"a.identifier"(h.read_groups.values))),
391                  ["A", "A.1", "B", "C", "C.1"]));
392     }
393 
394     // sambamba issue 110
395     {
396     auto h0 = new SamHeader();
397     h0.sorting_order = SortingOrder.coordinate;
398     h0.sequences.add(SqLine("A", 100));
399 
400     auto merger = new SamHeaderMerger([h0]);
401     auto h = merger.merged_header;
402     assert(equal(h.sequences.values, h0.sequences.values));
403     }
404 }