/*
    This file is part of BioD.

    Copyright (C) 2018 Pjotr Prins <pjotr.prins@thebird.nl>
*/

module bio.std.decompress;

import std.algorithm;
// import std.concurrency;
import std.conv;
import std.exception;
import std.file;
import std.parallelism;
import std.stdio: File;
import std.zlib: UnCompress;

import std.mmfile;
import core.thread;

/**
   Splits `chunk` on '\n' and hands every completed line to `dg`.

   `carry` holds the unterminated remainder of the previous chunk. It is
   prepended to the first line completed by `chunk` and, on return, holds
   the unterminated remainder of `chunk` (appended to whatever was already
   carried when `chunk` contains no terminator at all). Lines are passed
   to `dg` including their '\n' terminator, each as a freshly allocated
   array, and are numbered with `lineno`, which is incremented per line.

   Iterative on purpose: a decompressed chunk can hold thousands of short
   lines and one stack frame per line is not acceptable.

   Returns: 0 when the whole chunk was consumed, otherwise the first
   non-zero value returned by `dg` (the foreach body asked to stop).
*/
private int emitLines(R)(ref R carry, R chunk, ref int lineno,
                         scope int delegate(int line, R) dg) {
  for (;;) {
    auto parts = findSplitAfter(chunk, "\n");
    if (parts[0].length == 0) break;   // no terminator left in this chunk
    immutable rc = dg(lineno++, carry ~ parts[0]);
    carry = R.init;
    chunk = parts[1];
    if (rc != 0) return rc;
  }
  // carry is either empty or an array allocated here, so appending in
  // place cannot alias anything that was handed to the delegate.
  carry ~= chunk;
  return 0;
}

/**
   Streaming line reader which can be used for gzipped files. Note the
   current edition (still) uses the garbage collector. It may help to
   switch it off or to use the BioD decompressor used by bgzf.

   For a comparison with gzip a 2GB file decompressed with

     real    0m53.701s
     user    0m53.820s
     sys     0m0.572s

   while gzip took

     real    0m11.528s
     user    0m10.288s
     sys     0m0.936s

   So, that is something to aim for.

   Conversion can happen between different encodings, provided the
   line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
   ByLineImpl and readln function from std.stdio.

   Usage: `foreach (lineno, R s; GzipbyLine!R(filename)) { ... }`.
   `lineno` starts at 0 and `s` includes the trailing '\n' (a final line
   without terminator is passed as is). `break` and `return` inside the
   foreach body stop the iteration.
*/
struct GzipbyLine(R) {

  File f;
  UnCompress decompress;
  R line;
  uint _bufsize;

  /**
     Opens `gzipfn` for reading.

     Params:
       gzipfn  = path of the compressed file; must be a regular file
       bufsize = number of compressed bytes read per chunk, must be > 0
  */
  this(string gzipfn, uint bufsize=0x4000) {
    enforce(gzipfn.isFile);
    enforce(bufsize > 0, "bufsize must be larger than 0");
    f = File(gzipfn,"rb"); // binary mode: compressed data must not be translated
    decompress = new UnCompress();
    _bufsize = bufsize;
  }

  @disable this(this); // disable copy semantics;

  /**
     Decompresses the file chunk by chunk and calls `dg` for every line.

     Returns: 0 after the last line, or the non-zero value returned by
     `dg` when the foreach body left the loop early.
  */
  int opApply(scope int delegate(int line, R) dg) {
    int line = 0;
    R tail; // unterminated remainder of the previous chunk

    foreach (ubyte[] buffer; f.byChunk(_bufsize)) {
      auto buf = cast(R)decompress.uncompress(buffer);
      if (auto rc = emitLines!R(tail, buf, line, dg)) return rc;
    }
    // UnCompress may still hold output; flush() hands it over and has to
    // be appended to what uncompress() returned.
    auto last = cast(R)decompress.flush();
    if (auto rc = emitLines!R(tail, last, line, dg)) return rc;

    if (tail.length > 0) return dg(line++, tail);
    return 0;
  }
}


unittest {

  import std.algorithm.comparison : equal;

  // writeln("Testing GzipbyLine");
  int[] a = [ 1, 2, 4, 7, 7, 2, 4, 7, 3, 5];
  auto b = findSplitAfter(a, [7]);
  assert(equal(b[0],[1, 2, 4, 7]));
  assert(equal(b[1],[7, 2, 4, 7, 3, 5]));
  auto b1 = findSplitAfter(b[1], [7]);
  assert(equal(b1[0],[7]));
  assert(equal(b1[1],[2, 4, 7, 3, 5]));
  auto b2 = findSplitAfter([2, 4, 3], [7]);
  assert(equal(b2[0],cast(ubyte[])[]));
  assert(equal(b2[1],[2,4,3]));

  uint chars = 0;
  int lines = 0;
  foreach(line, ubyte[] s; GzipbyLine!(ubyte[])("test/data/BXD_geno.txt.gz")) {
    // test file contains 7320 lines 4707218 characters
    // write(cast(string)s);
    chars += s.length;
    lines = line;
  }
  // These fail on recent versions of ldc
  // assert(lines == 7319,"genotype lines " ~ to!string(lines+1)); // fails with ldc2 < 1.10!
  // assert(chars == 4707218,"chars " ~ to!string(chars));
}

/**
   Mmfile threaded version of streaming line reader which can be used
   for gzipped files. Note the current edition is slower than
   GzipbyLine above and (still) uses the garbage collector. It may
   help to switch it off or to use the BioD decompressor used by bgzf.

   Conversion can happen between different encodings, provided the
   line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
   ByLineImpl and readln function from std.stdio.

   While the caller's foreach body works on the lines of one chunk, the
   next chunk is decompressed by a std.parallelism task. Only one
   decompression task is outstanding at any time, so the UnCompress
   object is never used from two threads at once.
*/
struct GzipbyLineThreaded(R) {

  string fn;
  UnCompress decompress;
  R line;
  // Nullable!ubyte[] uncompressed_buf;
  uint _bufsize;

  /**
     Remembers `gzipfn`; the file is memory mapped when iteration starts.

     Params:
       gzipfn  = path of the compressed file; must be a regular file
       bufsize = number of compressed bytes handed to each decompression
                 step, must be > 0
  */
  this(string gzipfn, uint bufsize=0x4000) {
    enforce(gzipfn.isFile);
    enforce(bufsize > 0, "bufsize must be larger than 0");
    fn = gzipfn;
    decompress = new UnCompress();
    _bufsize = bufsize;
  }

  @disable this(this); // disable copy semantics;

  /**
     Decompresses the memory mapped file chunk by chunk and calls `dg`
     for every line.

     Returns: 0 after the last line, or the non-zero value returned by
     `dg` when the foreach body left the loop early.
  */
  int opApply(scope int delegate(int line, R) dg) {
    int line = 0;
    R tail; // unterminated remainder of the previous chunk

    R decompressor(ubyte[] buffer) {
      return cast(R)decompress.uncompress(buffer);
    }

    // Nothing to map and nothing to decompress.
    if (getSize(fn) == 0) return 0;

    auto mmf = new MmFile(fn);
    immutable ulong total = mmf.length();

    // Decompress the first chunk; clamp it for files smaller than _bufsize.
    ulong pos = min(cast(ulong)_bufsize, total);
    auto buf = decompressor(cast(ubyte[])mmf[0 .. pos]);

    while (pos < total) {
      // Get the next chunk. Offsets are ulong so files larger than 4GB
      // do not overflow.
      immutable ulong end = min(pos + _bufsize, total);
      auto next = cast(ubyte[])mmf[pos .. end];
      pos = end;
      // Set up decompressing the next chunk in the background
      auto t = task(&decompressor, next);
      taskPool.put(t);
      // now invoke the delegate on the chunk that is already decompressed
      immutable rc = emitLines!R(tail, buf, line, dg);
      // Always wait for the task, also when stopping early: it reads from
      // the mapping and uses the decompressor.
      buf = t.yieldForce();
      if (rc != 0) return rc;
    }
    if (auto rc = emitLines!R(tail, buf, line, dg)) return rc;

    // UnCompress may still hold output; flush() hands it over and has to
    // be appended to what uncompress() returned.
    auto last = cast(R)decompress.flush();
    if (auto rc = emitLines!R(tail, last, line, dg)) return rc;

    if (tail.length > 0) return dg(line++, tail);
    return 0;
  }
}

unittest {
  // Smoke test for the threaded reader: stream the bundled genotype file,
  // add up the bytes of all lines and remember the index of the last line.
  int lastIndex = 0;
  uint byteCount = 0;
  foreach (idx, ubyte[] row; GzipbyLineThreaded!(ubyte[])("test/data/BXD_geno.txt.gz")) {
    // test file contains 7320 lines 4707218 characters
    // write(cast(string)row);
    byteCount += row.length;
    lastIndex = idx;
  }
  /*
    These fail on recent versions of ldc
  assert(lastIndex == 7319,"genotype lines " ~ to!string(lastIndex+1));
  assert(byteCount == 4707218,"chars " ~ to!string(byteCount));
  */
}