1 /*
2     This file is part of BioD.
3 
4     Copyright (C) 2018 Pjotr Prins <pjotr.prins@thebird.nl>
5 */
6 
7 module bio.std.decompress;
8 
9 /**
10    Streaming line reader which can be used for gzipped files. Note the
11    current edition (still) uses the garbage collector. It may help to
12    switch it off or to use the BioD decompressor used by bgzf.
13 
   For comparison with gzip: decompressing a 2GB file with this reader took
15 
16    real    0m53.701s
17    user    0m53.820s
18    sys     0m0.572s
19 
20    while gzip took
21 
22    real    0m11.528s
23    user    0m10.288s
24    sys     0m0.936s
25 
26    So, that is something to aim for.
27 
28    Conversion can happen between different encodings, provided the
29    line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
30    ByLineImpl and readln function from std.stdio.
31 */
32 
33 import std.algorithm;
34 // import std.concurrency;
35 import std.conv;
36 import std.exception;
37 import std.file;
38 import std.parallelism;
39 import std.stdio: File;
40 import std.zlib: UnCompress;
41 
/**
   Line-by-line reader over a gzip-compressed file.

   The file is read in chunks of `bufsize` bytes, every chunk is inflated
   with std.zlib.UnCompress and the inflated data is split on '\n'.
   Use it as `foreach (lineno, R line; GzipbyLine!R(filename))`. Every
   line handed to the loop body keeps its trailing '\n'; a final line
   without terminator is passed as is.
*/
struct GzipbyLine(R) {

  File f;
  UnCompress decompress;
  R line;
  uint _bufsize;

  /**
     Open `gzipfn` for streaming decompression.

     Params:
       gzipfn  = path of an existing regular file
       bufsize = number of compressed bytes read per chunk

     Throws: an exception when `gzipfn` is not a regular file or
     cannot be opened.
  */
  this(string gzipfn, uint bufsize=0x4000) {
    enforce(gzipfn.isFile);
    f = File(gzipfn,"rb"); // binary mode: the payload is compressed bytes
    decompress = new UnCompress();
    _bufsize = bufsize;
  }

  @disable this(this); // disable copy semantics;

  /**
     foreach support. Calls `dg(lineno, line)` for every line, with
     `lineno` counting from 0.

     Returns: 0 when every line was visited, otherwise the non-zero
     value returned by `dg` (the loop body was left with break, return
     or goto), in which case iteration stops immediately.
  */
  int opApply(scope int delegate(int line, R) dg) {

    int line = 0;
    int result = 0; // last value returned by dg; non-zero means "stop"

    // chunk_byLine hands every complete line found in `rest` to dg; the
    // first one is prefixed with `head`, the unterminated remainder of
    // the previous chunk. Returns the new unterminated remainder.
    // Iterative so that a chunk with many lines does not deepen the stack.
    R chunk_byLine(R head, R rest) {
      for (;;) {
        auto split = findSplitAfter(rest,"\n");
        auto left = split[0]; // includes eol splitter
        if (left.length == 0)
          return head ~ split[1]; // no terminator: carry everything over
        result = dg(line++, head ~ left);
        if (result) return R.init; // loop body asked to stop
        head = R.init;
        rest = split[1];
      }
    }

    R tail; // tail of previous buffer
    foreach (ubyte[] buffer; f.byChunk(_bufsize))
    {
      auto buf = cast(R)decompress.uncompress(buffer);
      tail = chunk_byLine(tail,buf);
      if (result) return result;
    }
    // A last line without '\n' is still a line.
    if (tail.length > 0) result = dg(line++, tail);
    return result;
  }
}
85 
86 
unittest {

  import std.algorithm.comparison : equal;

  // Pin down the findSplitAfter behaviour the line splitter relies on:
  // the left part ends with the needle, and is empty when nothing matches.
  int[] haystack = [ 1, 2, 4, 7, 7, 2, 4, 7, 3, 5];
  auto first = findSplitAfter(haystack, [7]);
  assert(equal(first[0],[1, 2, 4, 7]));
  assert(equal(first[1],[7, 2, 4, 7, 3, 5]));

  auto second = findSplitAfter(first[1], [7]);
  assert(equal(second[0],[7]));
  assert(equal(second[1],[2, 4, 7, 3, 5]));

  auto nomatch = findSplitAfter([2, 4, 3], [7]);
  assert(equal(nomatch[0],cast(ubyte[])[]));
  assert(equal(nomatch[1],[2,4,3]));

  // Walk the whole test file; it contains 7320 lines and 4707218
  // characters.
  int last_index = 0;
  uint total_chars = 0;
  foreach(idx, ubyte[] content; GzipbyLine!(ubyte[])("test/data/BXD_geno.txt.gz")) {
    last_index = idx;
    total_chars += content.length;
  }
  // The count checks are disabled: they fail on recent versions of ldc
  // assert(last_index == 7319,"genotype lines " ~ to!string(last_index+1)); // fails with ldc2 < 1.10!
  // assert(total_chars == 4707218,"chars " ~ to!string(total_chars));
}
115 
116 /**
117    Mmfile threaded version of streaming line reader which can be used
118    for gzipped files. Note the current edition is slower than
119    GzipbyLine above and (still) uses the garbage collector. It may
120    help to switch it off or to use the BioD decompressor used by bgzf.
121 
122    Conversion can happen between different encodings, provided the
123    line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
124    ByLineImpl and readln function from std.stdio.
125 */
126 
127 import std.mmfile;
128 import core.thread;
129 
/**
   Line-by-line reader over a gzip-compressed file that memory-maps the
   file and inflates the next chunk in a separate thread while the
   lines of the current chunk are handed to the loop body.

   Use it as `foreach (lineno, R line; GzipbyLineThreaded!R(filename))`.
   Every line keeps its trailing '\n'; a final line without terminator
   is passed as is.
*/
struct GzipbyLineThreaded(R) {

  string fn;
  UnCompress decompress;
  R line;
  // Nullable!ubyte[] uncompressed_buf;
  uint _bufsize;

  /**
     Remember `gzipfn` for later iteration.

     Params:
       gzipfn  = path of an existing regular file
       bufsize = number of compressed bytes inflated per chunk; 0 means
                 the whole file is inflated as one chunk

     Throws: an exception when `gzipfn` is not a regular file.
  */
  this(string gzipfn, uint bufsize=0x4000) {
    enforce(gzipfn.isFile);
    fn = gzipfn;
    decompress = new UnCompress();
    _bufsize = bufsize;
  }

  @disable this(this); // disable copy semantics;

  /**
     foreach support. Calls `dg(lineno, line)` for every line, with
     `lineno` counting from 0.

     Returns: 0 when every line was visited, otherwise the non-zero
     value returned by `dg` (the loop body was left with break, return
     or goto), in which case iteration stops.
  */
  int opApply(scope int delegate(int line, R) dg) {

    int line = 0;
    int result = 0; // last value returned by dg; non-zero means "stop"

    // chunk_byLine hands every complete line found in `rest` to dg; the
    // first one is prefixed with `head`, the unterminated remainder of
    // the previous chunk. Returns the new unterminated remainder.
    R chunk_byLine(R head, R rest) {
      for (;;) {
        auto split = findSplitAfter(rest,"\n");
        auto left = split[0]; // includes eol splitter
        if (left.length == 0)
          return head ~ split[1]; // no terminator: carry everything over
        result = dg(line++, head ~ left);
        if (result) return R.init; // loop body asked to stop
        head = R.init;
        rest = split[1];
      }
    }

    R decompressor(ubyte[] buffer) {
      return cast(R)decompress.uncompress(buffer);
    }

    // An empty file holds no lines (and cannot be memory-mapped).
    if (fn.getSize == 0) return 0;

    auto mmf = new MmFile(fn);
    immutable ulong mmf_length = mmf.length();
    // 64-bit arithmetic throughout, so offsets stay valid past 4 GiB.
    immutable ulong step = _bufsize ? _bufsize : mmf_length;
    R tail; // tail of previous buffer

    // Decompress the first chunk; it may be shorter than a full step.
    ulong pos = min(step, mmf_length);
    auto buf = decompressor(cast(ubyte[])mmf[0..pos]);

    while (pos < mmf_length) {
      // Get the next chunk, clipped to the end of the file
      immutable ulong next = min(pos + step, mmf_length);
      auto buffer2 = cast(ubyte[])mmf[pos..next];
      // Set up decompressing the next chunk
      auto t = task(&decompressor, buffer2);
      t.executeInNewThread();
      // now invoke the delegate on the lines of the current chunk
      tail = chunk_byLine(tail,buf);
      // Always join the worker before looking at result, so no thread
      // is left running when we return early.
      buf = t.yieldForce();
      if (result) return result;
      pos = next;
    }
    tail = chunk_byLine(tail,buf);
    if (result) return result;
    // A last line without '\n' is still a line.
    if (tail.length > 0) result = dg(line++, tail);
    return result;
  }
}
199 
unittest {
  // Walk the whole test file with the threaded reader; it contains
  // 7320 lines and 4707218 characters.
  uint total_chars = 0;
  int last_index = 0;
  foreach(idx, ubyte[] content; GzipbyLineThreaded!(ubyte[])("test/data/BXD_geno.txt.gz")) {
    last_index = idx;
    total_chars += content.length;
  }
  /*
  The count checks are disabled: they fail on recent versions of ldc
  assert(last_index == 7319,"genotype lines " ~ to!string(last_index+1));
  assert(total_chars == 4707218,"chars " ~ to!string(total_chars));
  */
}