1 /*
2 This file is part of Sambamba.
3 Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>
4
5 Sambamba is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2 of the License,
8 or (at your option) any later version.
9
10 Sambamba is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
18 02111-1307 USA
19
20 */
21 module bio.std.experimental.hts.pileup;
22
23 import std.conv;
24 import std.exception;
25 import std.stdio;
26 import std.traits;
27 import std.typecons;
28
29 import std.experimental.logger;
30
31 import bio.std.experimental.hts.constants;
32 import bio.core.utils.exception;
33
34 immutable ulong DEFAULT_BUFFER_SIZE = 1_000_000;
35
36 /**
37 Cyclic buffer or ringbuffer based on Artem's original. Uses copy
38 semantics to copy a read in a pre-allocated buffer. New items get
39 added to the tail, and used items get popped from the head
40 (FIFO). Basically it is empty when pointers align and full when
41 head - tail equals length. Not that the pointers are of size_t
42 which puts a theoretical limit on the number of items that can be
43 pushed.
44 */
45
46 import core.stdc..string : memcpy;
47
48 // alias ulong RingBufferIndex;
49
50 struct RingBufferIndex {
51 alias Representation = ulong;
52 private ulong value = 0;
53
54 this(ulong v) {
55 value = v;
56 }
57
58 // @disable this(this); // disable copy semantics;
59
60 auto get() inout {
61 return value;
62 }
63
64 auto max() @property {
65 return value.max;
66 }
67
68 void opAssign(U)(U rhs) if (is(typeof(Checked!(T, Hook)(rhs)))) {
69 value = rhs;
70 }
71
72 bool opEquals(U, this _)(U rhs) {
73 return value == rhs;
74 }
75
76 auto opCmp(U, this _)(const U rhs) {
77 return value < rhs ? -1 : value > rhs;
78 }
79
80 ulong opUnary(string s)() if (s == "++") {
81 return ++value;
82 }
83
84 }
85
86
87 struct RingBuffer(T) {
88
89 T[] _items;
90 RingBufferIndex _head;
91 RingBufferIndex _tail;
92 size_t max_size = 0;
93
94 /** initializes round buffer of size $(D n) */
95 this(size_t n) {
96 _items = new T[n];
97 // _items.reserve(n);
98 }
99
100 @disable this(this); // disable copy semantics;
101
102 /*
103 Does not work because data is no longer available!
104 ~this() {
105 // assert(is_empty); // make sure all items have been popped
106 }
107 */
108
109 bool empty() @property @nogc nothrow const {
110 return _tail == _head;
111 }
112
113 alias empty is_empty;
114
115 auto ref front() @property {
116 enforce(!is_empty, "ringbuffer is empty");
117 return _items[_head.get() % $];
118 }
119 alias back last;
120
121 auto ref back() @property {
122 enforce(!is_empty, "ringbuffer is empty");
123 return _items[(_tail.get() - 1) % $];
124 }
125
126 alias front first;
127
128 bool is_tail(RingBufferIndex idx) {
129 return idx == _tail.get()-1;
130 }
131
132 ref T get_at(RingBufferIndex idx) {
133 enforce(!is_empty, "ringbuffer is empty");
134 enforce(idx >= _head, "ringbuffer range error (idx before front)");
135 enforce(idx != _tail, "ringbuffer range error (idx at end)");
136 enforce(idx < _tail, "ringbuffer range error (idx after end)");
137 return _items[idx.get() % $];
138 }
139
140 bool is_valid(RingBufferIndex idx) {
141 enforce(!is_empty, "ringbuffer is empty");
142 enforce(idx >= _head, "ringbuffer range error (idx before front)");
143 enforce(idx != _tail, "ringbuffer range error (idx at end)");
144 enforce(idx < _tail, "ringbuffer range error (idx after end)");
145 return true;
146 }
147
148 // This function is a hack.
149 void update_at(RingBufferIndex idx, T item) {
150 is_valid(idx);
151 _items[idx.get() % $] = item; // uses copy semantics
152 }
153
154 RingBufferIndex popFront() {
155 enforce(!is_empty, "ringbuffer is empty");
156 static if (__traits(compiles, _items[0].cleanupx)) {
157 // write("x");
158 _items[_head.get() % $].cleanup();
159 }
160 ++_head.value;
161 return _head;
162 }
163
164 /// Puts item on the stack and returns the index
165 RingBufferIndex put(T item) {
166 enforce(!is_full, "ringbuffer is full - you need to expand buffer");
167 enforce(_tail < _tail.max, "ringbuffer overflow");
168 max_size = length > max_size ? length : max_size;
169 _items[_tail.get() % $] = item; // uses copy semantics
170 auto prev = _tail;
171 ++_tail.value;
172 return prev;
173 }
174
175 ulong length() @property const {
176 // writeln(_tail.get(),":",_head.get(),"= len ",_tail.get()-_head.get());
177 return _tail.get() - _head.get();
178 }
179
180 bool is_full() @property const {
181 return _items.length == length();
182 }
183
184 bool in_range(RingBufferIndex idx) @property const {
185 return idx >= _head && idx < _tail;
186 }
187
188 ulong pushed() @property const {
189 return _tail.value;
190 }
191 ulong popped() @property const {
192 return _head.value;
193 }
194
195 @property void cleanup() {
196 _head = RingBufferIndex();
197 _tail = RingBufferIndex();
198 }
199
200 string toString() {
201 string res = "ring ";
202 for(RingBufferIndex i = _head; i<_tail; i++)
203 res ~= to!string(get_at(i));
204 return res;
205 }
206
207 @property string stats() {
208 return "Ringbuffer pushed " ~ to!string(pushed) ~ " popped " ~ to!string(popped) ~ " max-size " ~
209 to!string(max_size) // , "/", (pileup.ring.max_size+1)/pileup.ring.length);
210 ;
211 }
212 }
213
214 unittest {
215 auto buf = RingBuffer!int(4);
216 assert(buf.is_empty);
217
218 buf.put(1);
219 buf.put(2);
220 assert(buf.length == 2);
221 assert(buf.front == 1);
222 buf.popFront(); // 1
223 buf.popFront(); // 2
224 buf.put(2);
225 buf.put(1);
226 buf.put(0);
227 buf.put(3);
228 assert(buf.is_full);
229 assert(buf.front == 2);
230 buf.popFront();
231 assert(buf.front == 1);
232 buf.put(4);
233 buf.popFront();
234 assert(buf.front == 0);
235 buf.popFront();
236 assert(buf.front == 3);
237 buf.popFront();
238 assert(buf.front == 4);
239 buf.popFront();
240 assert(buf.is_empty);
241 }
242
243 /**
244 Represent a pileup of reads in a buffer.
245 */
246
247 /*
248 Read RingBuffer with current pointer, so you have three states
249 (first, current, last).
250 */
251
252 class PileUp(R) {
253 RingBuffer!R ring;
254 Nullable!RingBufferIndex current;
255
256 this(ulong bufsize=DEFAULT_BUFFER_SIZE) {
257 ring = RingBuffer!R(bufsize);
258 set_current_to_head;
259 }
260
261 RingBufferIndex push(R r) { return ring.put(r); }
262 bool empty() @property const { return ring.is_empty();}
263 bool is_full() @property const { return ring.is_full();}
264 RingBufferIndex popFront() { return ring.popFront(); }
265 ref R front() { return ring.front(); }
266 alias front leftmost;
267 ref R rightmost() { return ring.back(); }
268 ref R read(RingBufferIndex idx) {
269 enforce(ring.in_range(idx), "idx should be set for PileUp.read");
270 return ring.get_at(idx);
271 }
272 ref R read_current() {
273 enforce(!current.isNull, "current should be set for PileUp.read_current");
274 return read(current);
275 }
276 bool is_at_end(RingBufferIndex idx) { return ring.is_tail(idx); }
277
278 @property void current_inc() {
279 asserte(!empty);
280 asserte(!ring.is_tail(current));
281 ++current;
282 }
283
284 @property void set_current_to_head() {
285 current = ring._head; // note pileup can be empty
286 }
287
288 void current_reset() {
289 current = RingBufferIndex();
290 }
291
292 @property bool current_is_tail() {
293 return ring.is_tail(current);
294 }
295
296 void each(void delegate(R) dg) {
297 auto idx = ring._head;
298 while(!ring.is_tail(idx)) {
299 auto r = read(idx);
300 dg(r);
301 idx++;
302 }
303 }
304
305 void each_left_of_current(void delegate(RingBufferIndex, R) dg) {
306 R cur = read_current;
307 if (cur.is_unmapped) return;
308 auto idx = ring._head;
309 while(!ring.is_tail(idx)) {
310 auto r = read(idx);
311 if (r.is_mapped && r.end_pos >= cur.start_pos)
312 return;
313 dg(idx,r);
314 idx++;
315 }
316 }
317
318 void purge_while(bool delegate(R) dg) {
319 while(!empty) {
320 if (!dg(front))
321 return; // skip the rest
322 popFront();
323 }
324 }
325
326 void purge(void delegate(R) dg) {
327 while(!empty) {
328 dg(front);
329 popFront();
330 }
331 set_current_to_head();
332 }
333
334 @property string stats() {
335 return ring.stats();
336 }
337
338 override string toString() {
339 return stats;
340 }
341 }