1 /* 2 This file is part of Sambamba. 3 Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl> 4 5 Sambamba is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License as published 7 by the Free Software Foundation; either version 2 of the License, 8 or (at your option) any later version. 9 10 Sambamba is distributed in the hope that it will be useful, but 11 WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with this program; if not, write to the Free Software 17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 18 02111-1307 USA 19 20 */ 21 module bio.std.experimental.hts.pileup; 22 23 import std.conv; 24 import std.exception; 25 import std.stdio; 26 import std.traits; 27 import std.typecons; 28 29 import std.experimental.logger; 30 31 import bio.std.experimental.hts.constants; 32 import bio.core.utils.exception; 33 34 immutable ulong DEFAULT_BUFFER_SIZE = 1_000_000; 35 36 /** 37 Cyclic buffer or ringbuffer based on Artem's original. Uses copy 38 semantics to copy a read in a pre-allocated buffer. New items get 39 added to the tail, and used items get popped from the head 40 (FIFO). Basically it is empty when pointers align and full when 41 head - tail equals length. Not that the pointers are of size_t 42 which puts a theoretical limit on the number of items that can be 43 pushed. 44 */ 45 46 import core.stdc..string : memcpy; 47 48 // alias ulong RingBufferIndex; 49 50 struct RingBufferIndex { 51 alias Representation = ulong; 52 private ulong value = 0; 53 54 this(ulong v) { 55 value = v; 56 } 57 58 // @disable this(this); // disable copy semantics; 59 60 auto get() inout { 61 return value; 62 } 63 64 auto max() @property { 65 return value.max; 66 } 67 68 void opAssign(U)(U rhs) if (is(typeof(Checked!(T, Hook)(rhs)))) { 69 value = rhs; 70 } 71 72 bool opEquals(U, this _)(U rhs) { 73 return value == rhs; 74 } 75 76 auto opCmp(U, this _)(const U rhs) { 77 return value < rhs ? -1 : value > rhs; 78 } 79 80 ulong opUnary(string s)() if (s == "++") { 81 return ++value; 82 } 83 84 } 85 86 87 struct RingBuffer(T) { 88 89 T[] _items; 90 RingBufferIndex _head; 91 RingBufferIndex _tail; 92 size_t max_size = 0; 93 94 /** initializes round buffer of size $(D n) */ 95 this(size_t n) { 96 _items = new T[n]; 97 // _items.reserve(n); 98 } 99 100 @disable this(this); // disable copy semantics; 101 102 /* 103 Does not work because data is no longer available! 104 ~this() { 105 // assert(is_empty); // make sure all items have been popped 106 } 107 */ 108 109 bool empty() @property @nogc nothrow const { 110 return _tail == _head; 111 } 112 113 alias empty is_empty; 114 115 auto ref front() @property { 116 enforce(!is_empty, "ringbuffer is empty"); 117 return _items[_head.get() % $]; 118 } 119 alias back last; 120 121 auto ref back() @property { 122 enforce(!is_empty, "ringbuffer is empty"); 123 return _items[(_tail.get() - 1) % $]; 124 } 125 126 alias front first; 127 128 bool is_tail(RingBufferIndex idx) { 129 return idx == _tail.get()-1; 130 } 131 132 ref T get_at(RingBufferIndex idx) { 133 enforce(!is_empty, "ringbuffer is empty"); 134 enforce(idx >= _head, "ringbuffer range error (idx before front)"); 135 enforce(idx != _tail, "ringbuffer range error (idx at end)"); 136 enforce(idx < _tail, "ringbuffer range error (idx after end)"); 137 return _items[idx.get() % $]; 138 } 139 140 bool is_valid(RingBufferIndex idx) { 141 enforce(!is_empty, "ringbuffer is empty"); 142 enforce(idx >= _head, "ringbuffer range error (idx before front)"); 143 enforce(idx != _tail, "ringbuffer range error (idx at end)"); 144 enforce(idx < _tail, "ringbuffer range error (idx after end)"); 145 return true; 146 } 147 148 // This function is a hack. 149 void update_at(RingBufferIndex idx, T item) { 150 is_valid(idx); 151 _items[idx.get() % $] = item; // uses copy semantics 152 } 153 154 RingBufferIndex popFront() { 155 enforce(!is_empty, "ringbuffer is empty"); 156 static if (__traits(compiles, _items[0].cleanupx)) { 157 // write("x"); 158 _items[_head.get() % $].cleanup(); 159 } 160 ++_head.value; 161 return _head; 162 } 163 164 /// Puts item on the stack and returns the index 165 RingBufferIndex put(T item) { 166 enforce(!is_full, "ringbuffer is full - you need to expand buffer"); 167 enforce(_tail < _tail.max, "ringbuffer overflow"); 168 max_size = length > max_size ? length : max_size; 169 _items[_tail.get() % $] = item; // uses copy semantics 170 auto prev = _tail; 171 ++_tail.value; 172 return prev; 173 } 174 175 ulong length() @property const { 176 // writeln(_tail.get(),":",_head.get(),"= len ",_tail.get()-_head.get()); 177 return _tail.get() - _head.get(); 178 } 179 180 bool is_full() @property const { 181 return _items.length == length(); 182 } 183 184 bool in_range(RingBufferIndex idx) @property const { 185 return idx >= _head && idx < _tail; 186 } 187 188 ulong pushed() @property const { 189 return _tail.value; 190 } 191 ulong popped() @property const { 192 return _head.value; 193 } 194 195 @property void cleanup() { 196 _head = RingBufferIndex(); 197 _tail = RingBufferIndex(); 198 } 199 200 string toString() { 201 string res = "ring "; 202 for(RingBufferIndex i = _head; i<_tail; i++) 203 res ~= to!string(get_at(i)); 204 return res; 205 } 206 207 @property string stats() { 208 return "Ringbuffer pushed " ~ to!string(pushed) ~ " popped " ~ to!string(popped) ~ " max-size " ~ 209 to!string(max_size) // , "/", (pileup.ring.max_size+1)/pileup.ring.length); 210 ; 211 } 212 } 213 214 unittest { 215 auto buf = RingBuffer!int(4); 216 assert(buf.is_empty); 217 218 buf.put(1); 219 buf.put(2); 220 assert(buf.length == 2); 221 assert(buf.front == 1); 222 buf.popFront(); // 1 223 buf.popFront(); // 2 224 buf.put(2); 225 buf.put(1); 226 buf.put(0); 227 buf.put(3); 228 assert(buf.is_full); 229 assert(buf.front == 2); 230 buf.popFront(); 231 assert(buf.front == 1); 232 buf.put(4); 233 buf.popFront(); 234 assert(buf.front == 0); 235 buf.popFront(); 236 assert(buf.front == 3); 237 buf.popFront(); 238 assert(buf.front == 4); 239 buf.popFront(); 240 assert(buf.is_empty); 241 } 242 243 /** 244 Represent a pileup of reads in a buffer. 245 */ 246 247 /* 248 Read RingBuffer with current pointer, so you have three states 249 (first, current, last). 250 */ 251 252 class PileUp(R) { 253 RingBuffer!R ring; 254 Nullable!RingBufferIndex current; 255 256 this(ulong bufsize=DEFAULT_BUFFER_SIZE) { 257 ring = RingBuffer!R(bufsize); 258 set_current_to_head; 259 } 260 261 RingBufferIndex push(R r) { return ring.put(r); } 262 bool empty() @property const { return ring.is_empty();} 263 bool is_full() @property const { return ring.is_full();} 264 RingBufferIndex popFront() { return ring.popFront(); } 265 ref R front() { return ring.front(); } 266 alias front leftmost; 267 ref R rightmost() { return ring.back(); } 268 ref R read(RingBufferIndex idx) { 269 enforce(ring.in_range(idx), "idx should be set for PileUp.read"); 270 return ring.get_at(idx); 271 } 272 ref R read_current() { 273 enforce(!current.isNull, "current should be set for PileUp.read_current"); 274 return read(current); 275 } 276 bool is_at_end(RingBufferIndex idx) { return ring.is_tail(idx); } 277 278 @property void current_inc() { 279 asserte(!empty); 280 asserte(!ring.is_tail(current)); 281 ++current; 282 } 283 284 @property void set_current_to_head() { 285 current = ring._head; // note pileup can be empty 286 } 287 288 void current_reset() { 289 current = RingBufferIndex(); 290 } 291 292 @property bool current_is_tail() { 293 return ring.is_tail(current); 294 } 295 296 void each(void delegate(R) dg) { 297 auto idx = ring._head; 298 while(!ring.is_tail(idx)) { 299 auto r = read(idx); 300 dg(r); 301 idx++; 302 } 303 } 304 305 void each_left_of_current(void delegate(RingBufferIndex, R) dg) { 306 R cur = read_current; 307 if (cur.is_unmapped) return; 308 auto idx = ring._head; 309 while(!ring.is_tail(idx)) { 310 auto r = read(idx); 311 if (r.is_mapped && r.end_pos >= cur.start_pos) 312 return; 313 dg(idx,r); 314 idx++; 315 } 316 } 317 318 void purge_while(bool delegate(R) dg) { 319 while(!empty) { 320 if (!dg(front)) 321 return; // skip the rest 322 popFront(); 323 } 324 } 325 326 void purge(void delegate(R) dg) { 327 while(!empty) { 328 dg(front); 329 popFront(); 330 } 331 set_current_to_head(); 332 } 333 334 @property string stats() { 335 return ring.stats(); 336 } 337 338 override string toString() { 339 return stats; 340 } 341 }