1 /*
2     This file is part of Sambamba.
3     Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>
4 
5     Sambamba is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published
7     by the Free Software Foundation; either version 2 of the License,
8     or (at your option) any later version.
9 
10     Sambamba is distributed in the hope that it will be useful, but
11     WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13     General Public License for more details.
14 
15     You should have received a copy of the GNU General Public License
16     along with this program; if not, write to the Free Software
17     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
18     02111-1307 USA
19 
20 */
21 module bio.std.experimental.hts.pileup;
22 
23 import std.conv;
24 import std.exception;
25 import std.stdio;
26 import std.traits;
27 import std.typecons;
28 
29 import std.experimental.logger;
30 
31 import bio.std.experimental.hts.constants;
32 import bio.core.utils.exception;
33 
34 immutable ulong DEFAULT_BUFFER_SIZE = 1_000_000;
35 
36 /**
37    Cyclic buffer or ringbuffer based on Artem's original. Uses copy
38    semantics to copy a read in a pre-allocated buffer. New items get
39    added to the tail, and used items get popped from the head
40    (FIFO). Basically it is empty when pointers align and full when
41    head - tail equals length. Not that the pointers are of size_t
42    which puts a theoretical limit on the number of items that can be
43    pushed.
44 */
45 
46 import core.stdc..string : memcpy;
47 
48 // alias ulong RingBufferIndex;
49 
50 struct RingBufferIndex {
51   alias Representation = ulong;
52   private ulong value = 0;
53 
54   this(ulong v) {
55     value = v;
56   }
57 
58   // @disable this(this); // disable copy semantics;
59 
60   auto get() inout {
61     return value;
62   }
63 
64   auto max() @property {
65     return value.max;
66   }
67 
68   void opAssign(U)(U rhs) if (is(typeof(Checked!(T, Hook)(rhs)))) {
69     value = rhs;
70   }
71 
72   bool opEquals(U, this _)(U rhs) {
73     return value == rhs;
74   }
75 
76   auto opCmp(U, this _)(const U rhs) {
77     return value < rhs ? -1 : value > rhs;
78   }
79 
80   ulong opUnary(string s)() if (s == "++") {
81     return ++value;
82   }
83 
84 }
85 
86 
87 struct RingBuffer(T) {
88 
89   T[] _items;
90   RingBufferIndex _head;
91   RingBufferIndex _tail;
92   size_t max_size = 0;
93 
94   /** initializes round buffer of size $(D n) */
95   this(size_t n) {
96     _items = new T[n];
97     // _items.reserve(n);
98   }
99 
100   @disable this(this); // disable copy semantics;
101 
102   /*
103   Does not work because data is no longer available!
104   ~this() {
105     // assert(is_empty); // make sure all items have been popped
106   }
107   */
108 
109   bool empty() @property @nogc nothrow const {
110     return _tail == _head;
111   }
112 
113   alias empty is_empty;
114 
115   auto ref front() @property {
116     enforce(!is_empty, "ringbuffer is empty");
117     return _items[_head.get() % $];
118   }
119   alias back last;
120 
121   auto ref back() @property {
122     enforce(!is_empty, "ringbuffer is empty");
123     return _items[(_tail.get() - 1) % $];
124   }
125 
126   alias front first;
127 
128   bool is_tail(RingBufferIndex idx) {
129     return idx == _tail.get()-1;
130   }
131 
132   ref T get_at(RingBufferIndex idx) {
133     enforce(!is_empty, "ringbuffer is empty");
134     enforce(idx >= _head, "ringbuffer range error (idx before front)");
135     enforce(idx != _tail, "ringbuffer range error (idx at end)");
136     enforce(idx < _tail, "ringbuffer range error (idx after end)");
137     return _items[idx.get() % $];
138   }
139 
140   bool is_valid(RingBufferIndex idx) {
141     enforce(!is_empty, "ringbuffer is empty");
142     enforce(idx >= _head, "ringbuffer range error (idx before front)");
143     enforce(idx != _tail, "ringbuffer range error (idx at end)");
144     enforce(idx < _tail, "ringbuffer range error (idx after end)");
145     return true;
146   }
147 
148   // This function is a hack.
149   void update_at(RingBufferIndex idx, T item) {
150     is_valid(idx);
151     _items[idx.get() % $] = item; // uses copy semantics
152   }
153 
154   RingBufferIndex popFront() {
155     enforce(!is_empty, "ringbuffer is empty");
156     static if (__traits(compiles, _items[0].cleanupx)) {
157       // write("x");
158       _items[_head.get() % $].cleanup();
159     }
160     ++_head.value;
161     return _head;
162   }
163 
164   /// Puts item on the stack and returns the index
165   RingBufferIndex put(T item) {
166     enforce(!is_full, "ringbuffer is full - you need to expand buffer");
167     enforce(_tail < _tail.max, "ringbuffer overflow");
168     max_size = length > max_size ? length : max_size;
169     _items[_tail.get() % $] = item; // uses copy semantics
170     auto prev = _tail;
171     ++_tail.value;
172     return prev;
173   }
174 
175   ulong length() @property const {
176     // writeln(_tail.get(),":",_head.get(),"= len ",_tail.get()-_head.get());
177     return _tail.get() - _head.get();
178   }
179 
180   bool is_full() @property const {
181     return _items.length == length();
182   }
183 
184   bool in_range(RingBufferIndex idx) @property const {
185     return idx >= _head && idx < _tail;
186   }
187 
188   ulong pushed() @property const {
189     return _tail.value;
190   }
191   ulong popped() @property const {
192     return _head.value;
193   }
194 
195   @property void cleanup() {
196     _head = RingBufferIndex();
197     _tail = RingBufferIndex();
198   }
199 
200   string toString() {
201     string res = "ring ";
202     for(RingBufferIndex i = _head; i<_tail; i++)
203       res ~= to!string(get_at(i));
204     return res;
205   }
206 
207   @property string stats() {
208     return "Ringbuffer pushed " ~ to!string(pushed) ~ " popped " ~ to!string(popped) ~ " max-size " ~
209       to!string(max_size) // , "/", (pileup.ring.max_size+1)/pileup.ring.length);
210       ;
211   }
212 }
213 
214 unittest {
215     auto buf = RingBuffer!int(4);
216     assert(buf.is_empty);
217 
218     buf.put(1);
219     buf.put(2);
220     assert(buf.length == 2);
221     assert(buf.front == 1);
222     buf.popFront(); // 1
223     buf.popFront(); // 2
224     buf.put(2);
225     buf.put(1);
226     buf.put(0);
227     buf.put(3);
228     assert(buf.is_full);
229     assert(buf.front == 2);
230     buf.popFront();
231     assert(buf.front == 1);
232     buf.put(4);
233     buf.popFront();
234     assert(buf.front == 0);
235     buf.popFront();
236     assert(buf.front == 3);
237     buf.popFront();
238     assert(buf.front == 4);
239     buf.popFront();
240     assert(buf.is_empty);
241 }
242 
243 /**
244    Represent a pileup of reads in a buffer.
245 */
246 
247 /*
248   Read RingBuffer with current pointer, so you have three states
249   (first, current, last).
250 */
251 
252 class PileUp(R) {
253   RingBuffer!R ring;
254   Nullable!RingBufferIndex current;
255 
256   this(ulong bufsize=DEFAULT_BUFFER_SIZE) {
257     ring = RingBuffer!R(bufsize);
258     set_current_to_head;
259   }
260 
261   RingBufferIndex push(R r) { return ring.put(r); }
262   bool empty() @property const { return ring.is_empty();}
263   bool is_full() @property const { return ring.is_full();}
264   RingBufferIndex popFront() { return ring.popFront(); }
265   ref R front() { return ring.front(); }
266   alias front leftmost;
267   ref R rightmost() { return ring.back(); }
268   ref R read(RingBufferIndex idx) {
269     enforce(ring.in_range(idx), "idx should be set for PileUp.read");
270     return ring.get_at(idx);
271   }
272   ref R read_current() {
273     enforce(!current.isNull, "current should be set for PileUp.read_current");
274     return read(current);
275   }
276   bool is_at_end(RingBufferIndex idx) { return ring.is_tail(idx); }
277 
278   @property void current_inc() {
279     asserte(!empty);
280     asserte(!ring.is_tail(current));
281     ++current;
282   }
283 
284   @property void set_current_to_head() {
285     current = ring._head; // note pileup can be empty
286   }
287 
288   void current_reset() {
289     current = RingBufferIndex();
290   }
291 
292   @property bool current_is_tail() {
293     return ring.is_tail(current);
294   }
295 
296   void each(void delegate(R) dg) {
297     auto idx = ring._head;
298     while(!ring.is_tail(idx)) {
299       auto r = read(idx);
300       dg(r);
301       idx++;
302     }
303   }
304 
305   void each_left_of_current(void delegate(RingBufferIndex, R) dg) {
306     R cur = read_current;
307     if (cur.is_unmapped) return;
308     auto idx = ring._head;
309     while(!ring.is_tail(idx)) {
310       auto r = read(idx);
311       if (r.is_mapped && r.end_pos >= cur.start_pos)
312         return;
313       dg(idx,r);
314       idx++;
315     }
316   }
317 
318   void purge_while(bool delegate(R) dg) {
319     while(!empty) {
320       if (!dg(front))
321         return; // skip the rest
322       popFront();
323     }
324   }
325 
326   void purge(void delegate(R) dg) {
327     while(!empty) {
328       dg(front);
329       popFront();
330     }
331     set_current_to_head();
332   }
333 
334   @property string stats() {
335     return ring.stats();
336   }
337 
338   override string toString() {
339     return stats;
340   }
341 }