1 /*
2     This file is part of BioD.
3     Copyright (C) 2012    Artem Tarasov <lomereiter@gmail.com>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a
6     copy of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom the
10     Software is furnished to do so, subject to the following conditions:
11     
12     The above copyright notice and this permission notice shall be included in
13     all copies or substantial portions of the Software.
14     
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21     DEALINGS IN THE SOFTWARE.
22 
23 */
24 module bio.core.utils.range;
25 
26 import bio.core.utils.roundbuf;
27 
28 import std.range;
29 import std.exception;
30 import std.algorithm;
31 import std.parallelism;
32 import std.functional;
33 import std.array;
34 
/// Decorates an input range with a cyclic look-ahead buffer
/// holding up to $(D amount) elements.
///
/// The buffer is filled at construction. After that, each popFront()
/// drops the oldest buffered element and, unless the original range
/// is exhausted, fetches the next element from it.
///
/// The function is useful when, for instance, a range of Tasks
/// is being decorated, because it allows keeping a certain amount
/// of them being executed simultaneously, utilizing all
/// CPU cores.
///
/// Params:
///     r      = original input range
///     amount = capacity of the look-ahead buffer, must be positive
///
/// Throws: Exception if $(D amount) is zero.
auto prefetch(Range)(Range r, size_t amount) {

    enforce(amount > 0, "Amount of elements to prefetch must be positive");

    struct Result {
        alias ElementType!Range E;

        /// Fills the buffer with up to $(D amount) elements of $(D range).
        this(Range range, size_t amount) {
            _roundbuf = RoundBuf!E(amount);
            _range = range;
            foreach (i; 0 .. amount) {
                if (_range.empty) {
                    break;
                }
                _roundbuf.put(_range.front);
                _range.popFront();
            }
        }

        /// True when both the original range and the buffer are drained.
        bool empty() @property {
            return _range.empty && _roundbuf.empty;
        }

        /// Oldest element still held in the buffer.
        auto ref front() @property {
            return _roundbuf.front;
        }

        /// Drops the oldest buffered element and refills the freed slot
        /// from the original range, if it still has elements.
        // Not a @property: this is a mutating operation, not an attribute.
        void popFront() {
            assert(!_roundbuf.empty);

            _roundbuf.popFront();

            if (!_range.empty) {
                _roundbuf.put(_range.front);
                _range.popFront();
            }
        }
    private:
        Range _range;
        RoundBuf!E _roundbuf;
    }

    return Result(r, amount);
}
91 
/// Input range adaptor which stores a copy of the current element
/// of the wrapped range, so that $(D front) returns the stored copy
/// instead of evaluating the front of the wrapped range again.
///
/// The wrapped range is always one element ahead: its first element
/// is fetched in the constructor, and each popFront() fetches the next one.
struct Cached(R) {
    private {
        alias ElementType!R E;
        R _range;       // wrapped range, positioned after the stored element
        E _front;       // copy of the current element
        bool _empty;    // set once popFront() finds the wrapped range empty
    }

    /// Wraps $(D range) and fetches its first element, if there is one.
    this(R range) {
        _range = range;
        popFront();
    }

    /// Stored copy of the current element.
    // @property, like the other ranges in this module, so that
    // typeof(r.front) is the element type and not a function type.
    auto front() @property { return _front; }

    /// True once the wrapped range ran out of elements.
    bool empty() @property { return _empty; }

    /// Stores the next element of the wrapped range,
    /// or marks this range as empty if there is none left.
    void popFront() {
        if (_range.empty) {
            _empty = true;
        } else {
            _front = _range.front;
            _range.popFront();
        }
    }
}
117 
/// Wraps $(D range) into a $(D Cached) range, which keeps a copy
/// of the current front element.
auto cached(R)(R range) {
    auto wrapped = Cached!R(range);
    return wrapped;
}
122 
unittest {
    import std.algorithm;

    // Prefetching from an empty range gives an empty range.
    ubyte[] nothing = [];
    assert(equal(nothing, prefetch(nothing, 42)));

    // Elements and their order are preserved for buffers smaller than,
    // equal to, and larger than the source.
    auto numbers = [1, 2, 3, 4, 5];
    size_t[] buffer_sizes = [1, 3, 5, 7];
    foreach (amount; buffer_sizes) {
        assert(equal(numbers, prefetch(numbers, amount)));
    }

    // Caching the front element does not change the sequence either.
    assert(equal(numbers, cached(numbers)));
}
137 
/// Takes arbitrary input range as an input and returns
/// another range which produces arrays of original elements
/// of size $(D chunk_size). The last array may be shorter.
///
/// Useful for setting granularity in parallel applications.
/// $(D std.algorithm.joiner) composed with $(D chunked)
/// produces same elements as were in the original range.
///
/// The difference from $(D std.range.chunks) is that
/// any input range is allowed, no slicing or length is required.
/// The cost is memory allocations for chunks: every chunk is
/// a freshly allocated array, so chunks obtained earlier are
/// not overwritten by later ones.
///
/// Params:
///     range      = source input range
///     chunk_size = maximum number of elements per chunk, must be positive
///
/// Throws: Exception if $(D chunk_size) is zero.
auto chunked(R)(R range, uint chunk_size) {

    alias ElementType!R E;

    struct Result {

        /// Stores the source range and reads the first chunk from it.
        this(R range, uint chunk_size) {
            enforce(chunk_size > 0, "Chunk size must be positive");
            this.range = range;
            this.chunk_size = chunk_size;
            fillBuffer();
        }

        /// An empty chunk means that the source range is exhausted.
        bool empty() @property {
            return buffer.length == 0;
        }

        /// Current chunk.
        E[] front() @property {
            return buffer;
        }

        /// Reads the next chunk from the source range.
        void popFront() {
            fillBuffer();
        }

    private:
        R range;
        uint chunk_size;

        E[] buffer;

        /// Replaces $(D buffer) with a new array holding up to
        /// $(D chunk_size) next elements of the source range.
        void fillBuffer() {
            // Start from a new array: the previous chunk may still be
            // referenced by whoever received it from front().
            buffer = null;

            // Nothing left - don't allocate a chunk just to throw it away.
            if (range.empty) {
                return;
            }

            // Elements are appended into reserved capacity rather than
            // assigned into uninitialized memory: assignment would run
            // on garbage for types with elaborate assignment, and is not
            // available at all for const/immutable element types.
            buffer.reserve(chunk_size);
            foreach (i; 0 .. chunk_size) {
                if (range.empty) {
                    break;
                }
                buffer ~= range.front;
                range.popFront();
            }
        }
    }

    return Result(range, chunk_size);
}
195 
unittest {
    import std.algorithm;

    // Length not divisible by the chunk size: the last chunk is shorter.
    assert(equal(chunked(iota(1, 6), 2), [[1, 2], [3, 4], [5]]));

    // Length divisible by the chunk size: all chunks are full.
    assert(equal(chunked(iota(1, 7), 2), [[1, 2], [3, 4], [5, 6]]));

    // Chunk size larger than the whole input: a single short chunk.
    assert(equal(chunked([1], 10), [[1]]));

    assert(equal(chunked(iota(1, 10), 7), [[1, 2, 3, 4, 5, 6, 7], [8,9]]));

    // Joining the chunks back gives the original sequence.
    auto numbers = iota(25);
    auto rejoined = joiner(chunked(numbers, 7));
    assert(equal(rejoined, numbers));
}
207 
/// Parallel map built on a cyclic buffer with prefetching.
///
/// The input is split into chunks ($(D chunked)); for each chunk a task
/// applying $(D func) to all of its elements is put into $(D taskPool);
/// several such tasks are kept in a look-ahead buffer ($(D prefetch));
/// their results are forced in the order of the chunks and
/// concatenated ($(D joiner)).
///
/// The analogue in Haskell is Control.Parallel.Strategies.parBuffer
///
/// Params:
///     chunk_size      -   the maximum size of each chunk
///     prefetch_amount -   how many chunks will be prefetched;
///                         zero is treated as one
auto parallelTransform(alias func, Range)(Range r,
                                          uint chunk_size=1,
                                          uint prefetch_amount=totalCPUs-1)
{
    alias ElementType!Range E;

    // Creates a task which maps func over one chunk into a new array,
    // and hands it over to the task pool.
    static auto submitChunk(E[] elements) {
        auto job = task!(pipe!(map!(unaryFun!func), array))(elements);
        taskPool.put(job);
        return job;
    }

    // The look-ahead buffer needs room for at least one chunk
    // (the default amount evaluates to zero when totalCPUs is 1).
    immutable in_flight = prefetch_amount == 0 ? 1 : prefetch_amount;

    auto jobs = map!submitChunk(chunked(r, chunk_size));
    auto results = map!"a.yieldForce()"(prefetch(jobs, in_flight));
    return joiner(results);
}
237 
unittest {
    // The parallel version must give the same elements,
    // in the same order, as the sequential map.
    auto numbers = iota(100);
    auto expected = map!"a * a"(numbers);
    assert(equal(parallelTransform!"a * a"(numbers), expected));
}
242 
/// Range of running totals of the sequence $(D S): the n-th element
/// is the sum of the first n elements of the underlying sequence.
struct PrefixSum(S) {
    private {
        S _sequence;            // position of the last element added to _sum
        ElementType!S _sum;     // total of the elements up to that position
    }

    /// Starts the running total with the first element, if there is one.
    this(S sequence) {
        _sequence = sequence;
        if (_sequence.empty) {
            return;
        }
        _sum = _sequence.front;
    }

    /// Exhausted together with the underlying sequence.
    @property bool empty() {
        return _sequence.empty;
    }

    /// Total accumulated so far.
    @property ElementType!S front() {
        return _sum;
    }

    /// Advances the underlying sequence and adds its new front
    /// element, if any, to the total.
    void popFront() {
        _sequence.popFront();
        if (_sequence.empty) {
            return;
        }
        _sum += _sequence.front;
    }
}
271 
/// Prefix sum: wraps $(D sequence) into a $(D PrefixSum) range,
/// which produces the running totals of its elements.
PrefixSum!S prefixSum(S)(S sequence) {
    return typeof(return)(sequence);
}
276 
unittest {
    // Running totals of 0, 1, 2, 3, 4.
    auto totals = prefixSum(iota(5));
    assert(equal(totals, [0, 1, 3, 6, 10]));
}