/*
    This file is part of BioD.
    Copyright (C) 2012-2014    Artem Tarasov <lomereiter@gmail.com>

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.

*/
module bio.std.hts.utils.samheadermerger;

import bio.std.hts.sam.header;
import bio.std.hts.bam.validation.samheader;

import std.array;
import std.range;
import std.algorithm;
import std.conv;
import std.typecons;
import std.exception;

import bio.std.hts.utils.graph;

/// Class encapsulating the functionality of merging several SAM headers
/// into one. (In fact, its role is just to group several variables,
/// so it could be replaced by a function returning a struct.)
class SamHeaderMerger {

    enum Strategy {
        simple,
        usingIndex
    }

    Strategy strategy;

    /// Takes an array of SAM headers as input.
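    ///
    /// Example (a minimal usage sketch; $(D h1) and $(D h2) stand for
    /// headers prepared as in the unittests at the bottom of this module):
    /// ---
    /// auto merger = new SamHeaderMerger([h1, h2]);
    /// SamHeader merged = merger.merged_header;
    /// auto new_ref_id = merger.ref_id_map[0][0]; // old refID 0 of h1 -> new refID
    /// ---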
    this(SamHeader[] headers, bool validate_headers=false) {
        _headers = headers;
        _len = _headers.length;

        merged_header = new SamHeader();
        ref_id_map = new size_t[size_t][_len];
        ref_id_reverse_map = new size_t[size_t][_len];
        program_id_map = new string[string][_len];
        readgroup_id_map = new string[string][_len];

        if (validate_headers) {
            // TODO: make a custom validator for producing better error messages
            foreach (size_t i, header; _headers) {
                if (!isValid(header)) {
                    throw new Exception("header #" ~ to!string(i) ~ " is invalid, can't merge");
                }
            }
        }

        auto expected = _headers[0].sorting_order;
        if (expected != SortingOrder.coordinate && expected != SortingOrder.queryname) {
            throw new Exception("file headers indicate that some files are not sorted");
        }
        foreach (header; _headers) {
            if (header.sorting_order != expected) {
                throw new Exception("sorting orders of files don't agree, can't merge");
            }
        }
        merged_header.sorting_order = expected;

        mergeSequenceDictionaries();
        mergeReadGroups();
        mergeProgramRecords();
        mergeComments();
    }

    /// The main result of merging -- the new SamHeader
    SamHeader merged_header;

    /// Map: index of SamHeader in input array of headers -> old refID -> new refID
    size_t[size_t][] ref_id_map;

    /// the same for read group identifiers
    string[string][] readgroup_id_map;

    /// the same for program record identifiers
    string[string][] program_id_map;

    /// Map: index of SamHeader in input array of headers -> new refID -> old refID
    size_t[size_t][] ref_id_reverse_map;

private:

    // NOTE FOR DEVELOPER:
    // for more info on what's going on here, read the comments in sambamba/sambamba/merge.d

    SamHeader[] _headers;
    size_t _len; // number of headers

    static void addVerticeToDict(ref SqLine[string] dict, ref SqLine line) {
        if (line.name in dict) {
            if (line.length != dict[line.name].length) {
                // two @SQ lines are highly unlikely to refer to the same
                // reference sequence if their lengths differ
                throw new Exception("can't merge SAM headers: one of references with " ~
                                    "name " ~ line.name ~ " has length " ~
                                    to!string(dict[line.name].length) ~
                                    " while another one with the same name has length " ~
                                    to!string(line.length));
            }
            // TODO: merge individual tags?
        } else {
            dict[line.name] = line;
        }
    }
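
    // A worked example of the approach used in mergeSequenceDictionaries
    // below (mirroring the unittest at the bottom of this module): the
    // dictionaries
    //     h1: A, B, C     h2: D, B, E     h3: A, E, C
    // yield the edges A->B, B->C, D->B, B->E, A->E, E->C, and a topological
    // sort of that graph gives the merged order A, D, B, E, C.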
    void mergeSequenceDictionaries() {
        // make a directed graph out of reference sequences and do a topological sort

        SqLine[string] dict;

        // create a graph
        auto g = new DirectedGraph();
        foreach (header; _headers) {
            auto sequences = header.sequences.values;
            auto prev = sequences.front;
            addVerticeToDict(dict, prev);
            g.addNode(prev.name);
            sequences.popFront();
            while (!sequences.empty) {
                auto cur = sequences.front;
                addVerticeToDict(dict, cur);
                g.addEdge(prev.name, cur.name);
                prev = cur;
                sequences.popFront();
            }
        }

        // get topologically sorted nodes
        try {
            foreach (v; g.topologicalSort()) {
                merged_header.sequences.add(dict[v]);
            }
            strategy = Strategy.simple;
        } catch (Exception e) {
            // sorting failed; fall back to the strategy which requires index files
            foreach (sq_line; sort!((a, b) => a.name < b.name)(dict.values)) {
                merged_header.sequences.add(sq_line);
            }
            strategy = Strategy.usingIndex;
        }

        // make mappings
        foreach (size_t i, header; _headers) {
            foreach (size_t j, SqLine sq; header.sequences) {
                auto new_index = merged_header.sequences.getSequenceIndex(sq.name);
                if (new_index < 0) {
                    import std.stdio;
                    stderr.writeln("merged header sequence dictionary: \n",
                                   merged_header.sequences.values);
                    throw new Exception("BUG: " ~ sq.name ~ " is not in merged header dictionary");
                }
                ref_id_map[i][j] = to!size_t(new_index);
                ref_id_reverse_map[i][to!size_t(new_index)] = j;
            }
        }
    }

    // The reason to pass by reference is that when merging program records,
    // this function is called in a loop, and we need to keep some structures
    // between calls.
    //
    // $(D dict) is a collection of Line structs which will finally be part
    // of the header;
    // $(D record_id_map) is an array of mappings (one for each header) where
    // an old record identifier is mapped to a new one.
    static void mergeHeaderLines(Line, R)(R records_with_file_ids, size_t file_count,
                                          ref HeaderLineDictionary!Line dict,
                                          ref string[string][] record_id_map)
        if (is(typeof(Line.identifier) == string) &&
            is(ElementType!R == Tuple!(Line, size_t)) &&
            (is(Line == RgLine) || is(Line == PgLine)))
    {
        // Map: record identifier -> record -> list of files
        size_t[][Line][string] id_to_record;

        foreach (record_and_file; records_with_file_ids) {
            auto rec = record_and_file[0];
            auto file_id = record_and_file[1];
            id_to_record[rec.identifier][rec] ~= file_id;
        }

        // Loop through all identifiers
        foreach (record_id, records_with_same_id; id_to_record) {

            // Several read groups/program records can share a
            // common identifier, and each one of them can be
            // present in several files.
            //
            // If read groups/program records are equal
            // (i.e. all fields are equal), they are treated
            // as a single read group/program record.
            //
            // Here we iterate over those read groups/program records
            // and the files where they were seen, renaming identifiers
            // where necessary in order to avoid collisions.
            foreach (rec, file_ids; records_with_same_id) {
                string new_id = record_id;
                if (record_id in dict) {
                    // if an already used ID is encountered,
                    // find an unused ID by appending ".N" to the old ID
                    for (int i = 1; ; ++i) {
                        new_id = record_id ~ "." ~ to!string(i);
                        if (new_id !in dict) {
                            break;
                        }
                    }
                }

                // save mapping
                foreach (file_id; file_ids) {
                    record_id_map[file_id][record_id] = new_id;
                }

                // update merged header
                rec.identifier = new_id;
                dict.add(rec);
            }
        }
    }
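
    // Renaming example for mergeHeaderLines above (cf. the unittest at the
    // bottom of this module): if @RG ID:A occurs in two files with different
    // fields, the second occurrence becomes "A.1", while identical records
    // sharing an ID are collapsed into a single record.
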
    void mergeReadGroups() {
        Tuple!(RgLine, size_t)[] readgroups_with_file_ids;
        for (size_t i = 0; i < _len; i++)
            foreach (rg; _headers[i].read_groups.values)
                readgroups_with_file_ids ~= tuple(rg, i);

        auto dict = new RgLineDictionary();

        mergeHeaderLines(readgroups_with_file_ids, _len,
                         dict, readgroup_id_map);

        merged_header.read_groups = dict;
    }

    void mergeProgramRecords() {
        Tuple!(PgLine, size_t)[] programs_with_file_ids;
        for (size_t i = 0; i < _len; i++)
            foreach (pg; _headers[i].programs.values)
                programs_with_file_ids ~= tuple(pg, i);

        auto vertices = partition!"a[0].previous_program !is null"(programs_with_file_ids);
        programs_with_file_ids = programs_with_file_ids[0 .. $ - vertices.length];

        auto dict = new PgLineDictionary();

        while (!vertices.empty) {
            // populates dict and program_id_map
            mergeHeaderLines!PgLine(vertices, _len, dict, program_id_map);

            // find children of current vertices
            auto old_ids = map!"tuple(a[0].identifier, a[1])"(vertices);
            vertices = partition!((Tuple!(PgLine, size_t) a) {
                                      return !canFind(old_ids, tuple(a[0].previous_program, a[1]));
                                  })(programs_with_file_ids);
            programs_with_file_ids = programs_with_file_ids[0 .. $ - vertices.length];

            // update PP tags in children
            foreach (ref pg_with_file_id; vertices) {
                auto pg = pg_with_file_id[0];
                auto file_id = pg_with_file_id[1];

                if (pg.previous_program !is null &&
                    pg.previous_program in program_id_map[file_id])
                {
                    auto new_id = program_id_map[file_id][pg.previous_program];
                    if (new_id != pg.previous_program) {
                        pg.previous_program = new_id;
                    }
                }

                pg_with_file_id = tuple(pg, file_id);
            }
        }

        merged_header.programs = dict;
    }
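
    // Chain example for mergeProgramRecords above (cf. the unittest below):
    // the @PG chains are merged level by level starting from the roots
    // (records without a PP tag), so clashing descendants are renamed
    // (e.g. B.1, C.1) and their PP tags are updated to the renamed parents.
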
    void mergeComments() {
        merged_header.comments = join(map!"a.comments"(_headers));
    }
}

unittest {
    import std.stdio;
    import std.algorithm;

    // stderr.writeln("Testing SAM header merging...");
    auto h1 = new SamHeader();
    auto h2 = new SamHeader();
    auto h3 = new SamHeader();
    h1.sorting_order = SortingOrder.coordinate;
    h2.sorting_order = SortingOrder.coordinate;
    h3.sorting_order = SortingOrder.coordinate;

    // ---- fill reference sequence dictionaries -------------------------------

    h1.sequences.add(SqLine("A", 100));
    h1.sequences.add(SqLine("B", 200));
    h1.sequences.add(SqLine("C", 300));

    h2.sequences.add(SqLine("D", 100));
    h2.sequences.add(SqLine("B", 200));
    h2.sequences.add(SqLine("E", 300));

    h3.sequences.add(SqLine("A", 100));
    h3.sequences.add(SqLine("E", 300));
    h3.sequences.add(SqLine("C", 300));

    // expected: A B C
    //            D E

    // ---- add a few read group records ---------------------------------------

    h1.read_groups.add(RgLine("A", "CN1"));
    h1.read_groups.add(RgLine("C", "CN3"));

    h2.read_groups.add(RgLine("B", "CN2"));
    h2.read_groups.add(RgLine("C", "CN4"));

    h3.read_groups.add(RgLine("B", "CN2"));
    h3.read_groups.add(RgLine("A", "CN4"));

    // ---- add some program records with a lot of name clashes ----------------

    h1.programs.add(PgLine("A", "X"));          //          .> C
    h1.programs.add(PgLine("B", "Y", "", "A")); //         /
    h1.programs.add(PgLine("C", "Z", "", "B")); // A -> B -> D
    h1.programs.add(PgLine("D", "T", "", "B")); //

    h2.programs.add(PgLine("B", "Z"));          // B -> A -> C
    h2.programs.add(PgLine("A", "Y", "", "B"));
    h2.programs.add(PgLine("C", "T", "", "A"));

    h3.programs.add(PgLine("D", "Y"));          // D -> C -> B
    h3.programs.add(PgLine("C", "T", "", "D"));
    h3.programs.add(PgLine("B", "X", "", "C"));

    // expected result:
    //
    //            .> C.1
    //           /
    //   A -> B.1 -> D.1
    //
    //   B -> A.1 -> C.2
    //
    //   D -> C -> B.2

    // ---- add some comments - just for the sake of completeness --------------

    h1.comments ~= "abc";
    h2.comments ~= ["def", "ghi"];

    // ------------------ merge these three headers ----------------------------

    {
        auto merger = new SamHeaderMerger([h1, h2, h3]);
        auto h = merger.merged_header;

        assert(h.sorting_order == SortingOrder.coordinate);

        assert(equal(h.sequences.values,
                     [SqLine("A", 100), SqLine("D", 100), SqLine("B", 200),
                      SqLine("E", 300), SqLine("C", 300)]));

        assert(h.comments == ["abc", "def", "ghi"]);

        assert(equal(sort(array(map!"a.identifier"(h.programs.values))),
                     ["A", "A.1", "B", "B.1", "B.2", "C", "C.1", "C.2", "D", "D.1"]));

        assert(equal(sort(array(map!"a.identifier"(h.read_groups.values))),
                     ["A", "A.1", "B", "C", "C.1"]));
    }

    // sambamba issue 110
    {
        auto h0 = new SamHeader();
        h0.sorting_order = SortingOrder.coordinate;
        h0.sequences.add(SqLine("A", 100));

        auto merger = new SamHeaderMerger([h0]);
        auto h = merger.merged_header;
        assert(equal(h.sequences.values, h0.sequences.values));
    }
}
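
// A supplementary check of the refID mappings (a sketch; the two dictionaries
// below admit only one topological order, A, B, C, so the asserted indices
// follow directly from the documentation of ref_id_map above).
unittest {
    import std.algorithm;

    auto h1 = new SamHeader();
    auto h2 = new SamHeader();
    h1.sorting_order = SortingOrder.coordinate;
    h2.sorting_order = SortingOrder.coordinate;

    h1.sequences.add(SqLine("A", 100));
    h1.sequences.add(SqLine("B", 200));

    h2.sequences.add(SqLine("B", 200));
    h2.sequences.add(SqLine("C", 300));

    auto merger = new SamHeaderMerger([h1, h2]);

    // the only topological order of A->B, B->C is A, B, C
    assert(equal(merger.merged_header.sequences.values,
                 [SqLine("A", 100), SqLine("B", 200), SqLine("C", 300)]));

    // old refID -> new refID, per input header
    assert(merger.ref_id_map[0][0] == 0); // A: 0 -> 0
    assert(merger.ref_id_map[0][1] == 1); // B: 1 -> 1
    assert(merger.ref_id_map[1][0] == 1); // B: 0 -> 1
    assert(merger.ref_id_map[1][1] == 2); // C: 1 -> 2

    // reverse direction: new refID -> old refID
    assert(merger.ref_id_reverse_map[1][2] == 1);
}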