1 /* 2 This file is part of BioD. 3 Copyright (C) 2012 Artem Tarasov <lomereiter@gmail.com> 4 5 Permission is hereby granted, free of charge, to any person obtaining a 6 copy of this software and associated documentation files (the "Software"), 7 to deal in the Software without restriction, including without limitation 8 the rights to use, copy, modify, merge, publish, distribute, sublicense, 9 and/or sell copies of the Software, and to permit persons to whom the 10 Software is furnished to do so, subject to the following conditions: 11 12 The above copyright notice and this permission notice shall be included in 13 all copies or substantial portions of the Software. 14 15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 21 DEALINGS IN THE SOFTWARE. 22 23 */ 24 module bio.core.utils.range; 25 26 import bio.core.utils.roundbuf; 27 28 import std.range; 29 import std.exception; 30 import std.algorithm; 31 import std.parallelism; 32 import std.functional; 33 import std.array; 34 35 /// Keeps a cyclic buffer of size $(D amount) 36 /// which is filled at the construction. 37 /// After that, each popFront() is accompanied 38 /// by fetching next element from the original range. 39 /// 40 /// The function is useful when, for instance, range of Tasks 41 /// is being decorated, because it allows to keep a certain amount 42 /// of them being executed simultaneously, utilizing all 43 /// CPU cores. 
///
/// Params:
///     r      = input range to decorate
///     amount = number of elements kept in the buffer; must be positive
///
/// Returns: an input range yielding the elements of $(D r) in the same order.
///
/// Throws: $(D Exception) (via $(D enforce)) if $(D amount) is zero.
auto prefetch(Range)(Range r, size_t amount) {

    enforce(amount > 0, "Amount of elements to prefetch must be positive");

    struct Result {
        alias ElementType!Range E;

        /// Moves up to $(D amount) leading elements of $(D range)
        /// into the buffer; stops early if the range runs out.
        this(Range range, size_t amount) {
            _roundbuf = RoundBuf!E(amount);
            _range = range;
            foreach (i; 0 .. amount) {
                if (_range.empty) {
                    break;
                }
                _roundbuf.put(_range.front);
                _range.popFront();
            }
        }

        /// True once both the source range and the buffer are drained.
        bool empty() @property {
            return _range.empty && _roundbuf.empty;
        }

        /// Oldest buffered element.
        auto ref front() @property {
            return _roundbuf.front;
        }

        /// Drops the oldest buffered element and, if the source range
        /// still has elements, fetches exactly one more into the buffer.
        void popFront() {
            assert(!_roundbuf.empty);

            // Free a slot first, then refill it from the source.
            _roundbuf.popFront();
            if (!_range.empty) {
                _roundbuf.put(_range.front);
                _range.popFront();
            }
        }
    private:
        Range _range;
        RoundBuf!E _roundbuf;
    }

    return Result(r, amount);
}

/// Input range adaptor that stores a copy of the current front element
/// of the wrapped range. The element is read from the wrapped range
/// exactly once, in the constructor or in $(D popFront), so the wrapped
/// range is always one element ahead of this one.
struct Cached(R) {
    private {
        alias ElementType!R E;
        R _range;
        E _front;
        bool _empty;
    }

    /// Takes the range to wrap and immediately caches its first element.
    this(R range) {
        _range = range;
        popFront();
    }

    /// The cached element.
    // Marked @property (like the other ranges in this module) so that
    // typeof(c.front) is the element type rather than a function type.
    auto front() @property { return _front; }

    /// True once popFront() has been called on an exhausted wrapped range.
    bool empty() @property { return _empty; }

    /// Caches the next element of the wrapped range, or marks this range
    /// as empty if the wrapped range has none left.
    void popFront() {
        if (_range.empty) {
            _empty = true;
        } else {
            _front = _range.front;
            _range.popFront();
        }
    }
}

/// Caches front element.
auto cached(R)(R range) {
    return Cached!R(range);
}

unittest {
    import std.algorithm;

    ubyte[] emptyrange = [];
    assert(equal(emptyrange, prefetch(emptyrange, 42)));

    auto range = [1, 2, 3, 4, 5];
    assert(equal(range, prefetch(range, 1)));
    assert(equal(range, prefetch(range, 3)));
    assert(equal(range, prefetch(range, 5)));
    assert(equal(range, prefetch(range, 7)));

    assert(equal(range, cached(range)));

    // front/empty are properties, so typeof yields the value types
    auto c = cached(range);
    static assert(is(typeof(c.front) == int));
    static assert(is(typeof(c.empty) == bool));
}

/// Takes arbitrary input range as an input and returns
/// another range which produces arrays of original elements
/// of size $(D chunk_size).
///
/// Useful for setting granularity in parallel applications.
/// $(D std.algorithm.joiner) composed with $(D chunked)
/// produces same elements as were in the original range.
///
/// The difference from $(D std.range.chunks) is that
/// any input range is allowed, no slicing or length is required.
/// The cost is memory allocations for chunks.
///
/// Every chunk is a separately allocated array, so a chunk obtained
/// from $(D front) stays intact after $(D popFront). The last chunk
/// may be shorter than $(D chunk_size).
///
/// Params:
///     range      = input range to split
///     chunk_size = maximum number of elements per chunk; must be positive
///
/// Throws: $(D Exception) (via $(D enforce)) if $(D chunk_size) is zero.
auto chunked(R)(R range, uint chunk_size) {

    alias ElementType!R E;

    struct Result {

        this(R range, uint chunk_size) {
            enforce(chunk_size > 0);
            this.range = range;
            this.chunk_size = chunk_size;
            fillBuffer();
        }

        /// True when the last refill produced no elements.
        bool empty() @property {
            return buffer.length == 0;
        }

        /// The current chunk.
        E[] front() @property {
            return buffer;
        }

        /// Builds the next chunk from the source range.
        void popFront() {
            fillBuffer();
        }

    private:
        R range;
        uint chunk_size;

        E[] buffer;

        /// Collects up to chunk_size elements of the source range into
        /// a newly allocated array and makes it the current chunk.
        void fillBuffer() {
            // Elements are appended rather than assigned into
            // uninitialized storage: appending constructs each element in
            // place, which is also valid for const/immutable element types
            // and for types with destructors or custom opAssign.
            E[] fresh;
            for (size_t filled = 0; filled < chunk_size && !range.empty; ++filled) {
                if (filled == 0) {
                    // One allocation per chunk, and none at all when the
                    // source range is already exhausted.
                    fresh.reserve(chunk_size);
                }
                fresh ~= range.front;
                range.popFront();
            }
            buffer = fresh;
        }
    }

    return Result(range, chunk_size);
}

unittest {
    import std.algorithm;

    assert(equal(chunked(iota(1, 6), 2), [[1, 2], [3, 4], [5]]));
    assert(equal(chunked(iota(1, 7), 2), [[1, 2], [3, 4], [5, 6]]));
    assert(equal(chunked([1], 10), [[1]]));
    assert(equal(chunked(iota(1, 10), 7), [[1, 2, 3, 4, 5, 6, 7], [8,9]]));

    auto r = iota(25);
    assert(equal(joiner(chunked(r, 7)), r));

    // empty input produces no chunks
    assert(chunked(iota(0), 3).empty);

    // element types that cannot be assigned to are supported
    immutable(int)[] frozen = [1, 2, 3];
    assert(equal(chunked(frozen, 2), [[1, 2], [3]]));
}

/// Version of parallel map using cyclic buffer with prefetching.
/// Uses combination of chunked, prefetch, joiner, and std.parallelism.
///
/// The analogue in Haskell is Control.Parallel.Strategies.parBuffer
///
/// Params:
///     func            = unary function applied to every element
///     r               = input range
///     chunk_size      = the maximum size of each chunk
///     prefetch_amount = how many chunks will be prefetched
///                       (zero is treated as one)
auto parallelTransform(alias func, Range)(Range r,
                                          uint chunk_size=1,
                                          uint prefetch_amount=totalCPUs-1)
{
    alias ElementType!Range Elem;

    // Turns one chunk into a task that maps func over it and collects
    // the results into an array, then hands the task to taskPool.
    static auto createTask(Elem[] chunk) {
        auto submitted = task!(pipe!(map!(unaryFun!func), array))(chunk);
        taskPool.put(submitted);
        return submitted;
    }

    // At least one chunk has to be in flight.
    const size_t depth = (prefetch_amount == 0) ? 1 : prefetch_amount;

    // chunks -> tasks -> buffer of pending tasks -> results, flattened
    return joiner(
        map!"a.yieldForce()"(
            prefetch(map!createTask(chunked(r, chunk_size)), depth)));
}

unittest {
    auto numbers = iota(100);
    assert(equal(parallelTransform!"a * a"(numbers), map!"a * a"(numbers)));
}

/// Input range of running totals over $(D S): the n-th element is the
/// sum of the first n elements of the underlying sequence.
struct PrefixSum(S) {
    private {
        S _source;
        ElementType!S _total;
    }

    /// Wraps $(D sequence); the running total starts at its first
    /// element when there is one.
    this(S sequence) {
        _source = sequence;
        if (_source.empty) {
            return;
        }
        _total = _source.front;
    }

    /// Empty exactly when the underlying sequence is.
    @property bool empty() {
        return _source.empty;
    }

    /// Sum of all elements consumed so far, including the current one.
    @property ElementType!S front() {
        return _total;
    }

    /// Advances the underlying sequence and adds its new front element,
    /// if any, to the running total.
    void popFront() {
        _source.popFront();
        if (_source.empty) {
            return;
        }
        _total += _source.front;
    }
}

/// Prefix sum.
PrefixSum!S prefixSum(S)(S sequence) {
    return typeof(return)(sequence);
}

unittest {
    auto numbers = iota(5);
    assert(equal(prefixSum(numbers), [0, 1, 3, 6, 10]));
}