Make most implicit integer truncations and sign conversions explicit
[folly.git] / folly / gen / Parallel-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_GEN_PARALLEL_H_
18 #error This file may only be included from folly/gen/ParallelGen.h
19 #endif
20
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/experimental/EventCount.h>
24 #include <atomic>
25 #include <thread>
26 #include <vector>
27
28 namespace folly {
29 namespace gen {
30 namespace detail {
31
32 template <typename T>
33 class ClosableMPMCQueue {
34   MPMCQueue<T> queue_;
35   std::atomic<size_t> producers_{0};
36   std::atomic<size_t> consumers_{0};
37   folly::EventCount wakeProducer_;
38   folly::EventCount wakeConsumer_;
39
40  public:
41   explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
42
43   ~ClosableMPMCQueue() {
44     CHECK(!producers());
45     CHECK(!consumers());
46   }
47
48   void openProducer() { ++producers_; }
49   void openConsumer() { ++consumers_; }
50
51   void closeInputProducer() {
52     int64_t producers = producers_--;
53     CHECK(producers);
54     if (producers == 1) { // last producer
55       wakeConsumer_.notifyAll();
56     }
57   }
58
59   void closeOutputConsumer() {
60     int64_t consumers = consumers_--;
61     CHECK(consumers);
62     if (consumers == 1) { // last consumer
63       wakeProducer_.notifyAll();
64     }
65   }
66
67   size_t producers() const {
68     return producers_.load(std::memory_order_acquire);
69   }
70
71   size_t consumers() const {
72     return consumers_.load(std::memory_order_acquire);
73   }
74
75   template <typename... Args>
76   bool writeUnlessFull(Args&&... args) noexcept {
77     if (queue_.write(std::forward<Args>(args)...)) {
78       // wake consumers to pick up new value
79       wakeConsumer_.notify();
80       return true;
81     }
82     return false;
83   }
84
85   template <typename... Args>
86   bool writeUnlessClosed(Args&&... args) {
87     // write if there's room
88     while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
89       // if write fails, check if there are still consumers listening
90       auto key = wakeProducer_.prepareWait();
91       if (!consumers()) {
92         // no consumers left; bail out
93         wakeProducer_.cancelWait();
94         return false;
95       }
96       wakeProducer_.wait(key);
97     }
98     // wake consumers to pick up new value
99     wakeConsumer_.notify();
100     return true;
101   }
102
103   bool readUnlessEmpty(T& out) {
104     if (queue_.read(out)) {
105       // wake producers to fill empty space
106       wakeProducer_.notify();
107       return true;
108     }
109     return false;
110   }
111
112   bool readUnlessClosed(T& out) {
113     while (!queue_.readIfNotEmpty(out)) {
114       auto key = wakeConsumer_.prepareWait();
115       if (!producers()) {
116         // wake producers to fill empty space
117         wakeProducer_.notify();
118         return false;
119       }
120       wakeConsumer_.wait(key);
121     }
122     // wake writers blocked by full queue
123     wakeProducer_.notify();
124     return true;
125   }
126 };
127
128 template <class Sink>
129 class Sub : public Operator<Sub<Sink>> {
130   Sink sink_;
131
132  public:
133   explicit Sub(Sink sink) : sink_(sink) {}
134
135   template <class Value,
136             class Source,
137             class Result =
138                 decltype(std::declval<Sink>().compose(std::declval<Source>())),
139             class Just = SingleCopy<typename std::decay<Result>::type>>
140   Just compose(const GenImpl<Value, Source>& source) const {
141     return Just(source | sink_);
142   }
143 };
144
145 template <class Ops>
146 class Parallel : public Operator<Parallel<Ops>> {
147   Ops ops_;
148   size_t threads_;
149
150  public:
151   Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
152
153   template <class Input,
154             class Source,
155             class InputDecayed = typename std::decay<Input>::type,
156             class Composed =
157                 decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
158             class Output = typename Composed::ValueType,
159             class OutputDecayed = typename std::decay<Output>::type>
160   class Generator : public GenImpl<OutputDecayed&&,
161                                    Generator<Input,
162                                              Source,
163                                              InputDecayed,
164                                              Composed,
165                                              Output,
166                                              OutputDecayed>> {
167     const Source source_;
168     const Ops ops_;
169     const size_t threads_;
170     typedef ClosableMPMCQueue<InputDecayed> InQueue;
171     typedef ClosableMPMCQueue<OutputDecayed> OutQueue;
172
173     class Puller : public GenImpl<InputDecayed&&, Puller> {
174       InQueue* queue_;
175
176      public:
177       explicit Puller(InQueue* queue) : queue_(queue) {}
178
179       template <class Handler>
180       bool apply(Handler&& handler) const {
181         InputDecayed input;
182         while (queue_->readUnlessClosed(input)) {
183           if (!handler(std::move(input))) {
184             return false;
185           }
186         }
187         return true;
188       }
189
190       template <class Body>
191       void foreach(Body&& body) const {
192         InputDecayed input;
193         while (queue_->readUnlessClosed(input)) {
194           body(std::move(input));
195         }
196       }
197     };
198
199     template <bool all = false>
200     class Pusher : public Operator<Pusher<all>> {
201       OutQueue* queue_;
202
203      public:
204       explicit Pusher(OutQueue* queue) : queue_(queue) {}
205
206       template <class Value, class InnerSource>
207       void compose(const GenImpl<Value, InnerSource>& source) const {
208         if (all) {
209           source.self().foreach([&](Value value) {
210             queue_->writeUnlessClosed(std::forward<Value>(value));
211           });
212         } else {
213           source.self().apply([&](Value value) {
214             return queue_->writeUnlessClosed(std::forward<Value>(value));
215           });
216         }
217       }
218     };
219
220     template <bool all = false>
221     class Executor {
222       InQueue inQueue_;
223       OutQueue outQueue_;
224       Puller puller_;
225       Pusher<all> pusher_;
226       std::vector<std::thread> workers_;
227       const Ops* ops_;
228
229       void work() {
230         puller_ | *ops_ | pusher_;
231       }
232
233      public:
234       Executor(size_t threads, const Ops* ops)
235           : inQueue_(threads * 4),
236             outQueue_(threads * 4),
237             puller_(&inQueue_),
238             pusher_(&outQueue_),
239             ops_(ops) {
240         inQueue_.openProducer();
241         outQueue_.openConsumer();
242         for (size_t t = 0; t < threads; ++t) {
243           inQueue_.openConsumer();
244           outQueue_.openProducer();
245           workers_.emplace_back([this] {
246             SCOPE_EXIT {
247               inQueue_.closeOutputConsumer();
248               outQueue_.closeInputProducer();
249             };
250             this->work();
251           });
252         }
253       }
254
255       ~Executor() {
256         if (inQueue_.producers()) {
257           inQueue_.closeInputProducer();
258         }
259         if (outQueue_.consumers()) {
260           outQueue_.closeOutputConsumer();
261         }
262         while (!workers_.empty()) {
263           workers_.back().join();
264           workers_.pop_back();
265         }
266         CHECK(!inQueue_.consumers());
267         CHECK(!outQueue_.producers());
268       }
269
270       void closeInputProducer() { inQueue_.closeInputProducer(); }
271
272       void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }
273
274       bool writeUnlessClosed(Input&& input) {
275         return inQueue_.writeUnlessClosed(std::forward<Input>(input));
276       }
277
278       bool writeUnlessFull(Input&& input) {
279         return inQueue_.writeUnlessFull(std::forward<Input>(input));
280       }
281
282       bool readUnlessClosed(OutputDecayed& output) {
283         return outQueue_.readUnlessClosed(output);
284       }
285
286       bool readUnlessEmpty(OutputDecayed& output) {
287         return outQueue_.readUnlessEmpty(output);
288       }
289     };
290
291    public:
292     Generator(Source source, Ops ops, size_t threads)
293         : source_(std::move(source)),
294           ops_(std::move(ops)),
295           threads_(
296               threads
297                   ? threads
298                   : size_t(std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}
299
300     template <class Handler>
301     bool apply(Handler&& handler) const {
302       Executor<false> executor(threads_, &ops_);
303       bool more = true;
304       source_.apply([&](Input input) {
305         if (executor.writeUnlessFull(std::forward<Input>(input))) {
306           return true;
307         }
308         OutputDecayed output;
309         while (executor.readUnlessEmpty(output)) {
310           if (!handler(std::move(output))) {
311             more = false;
312             return false;
313           }
314         }
315         if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
316           return false;
317         }
318         return true;
319       });
320       executor.closeInputProducer();
321
322       if (more) {
323         OutputDecayed output;
324         while (executor.readUnlessClosed(output)) {
325           if (!handler(std::move(output))) {
326             more = false;
327             break;
328           }
329         }
330       }
331       executor.closeOutputConsumer();
332
333       return more;
334     }
335
336     template <class Body>
337     void foreach(Body&& body) const {
338       Executor<true> executor(threads_, &ops_);
339       source_.foreach([&](Input input) {
340         if (executor.writeUnlessFull(std::forward<Input>(input))) {
341           return;
342         }
343         OutputDecayed output;
344         while (executor.readUnlessEmpty(output)) {
345           body(std::move(output));
346         }
347         CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
348       });
349       executor.closeInputProducer();
350
351       OutputDecayed output;
352       while (executor.readUnlessClosed(output)) {
353         body(std::move(output));
354       }
355       executor.closeOutputConsumer();
356     }
357   };
358
359   template <class Value, class Source>
360   Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
361     return Generator<Value, Source>(source.self(), ops_, threads_);
362   }
363
364   template <class Value, class Source>
365   Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
366     return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
367   }
368 };
369
370 /**
371  * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
372  * maximum chunk size.
373  *
374  * Usually used through the 'chunked' helper, like:
375  *
376  *   int n
377  *     = chunked(values)
378  *     | parallel  // each thread processes a chunk
379  *     | concat   // but can still process values one at a time
380  *     | filter(isPrime)
381  *     | atomic_count;
382  */
383 template <class Iterator>
384 class ChunkedRangeSource
385     : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
386   int chunkSize_;
387   Range<Iterator> range_;
388
389  public:
390   ChunkedRangeSource() = default;
391   ChunkedRangeSource(int chunkSize, Range<Iterator> range)
392       : chunkSize_(chunkSize), range_(std::move(range)) {}
393
394   template <class Handler>
395   bool apply(Handler&& handler) const {
396     auto remaining = range_;
397     while (!remaining.empty()) {
398       auto chunk = remaining.subpiece(0, chunkSize_);
399       remaining.advance(chunk.size());
400       auto gen = RangeSource<Iterator>(chunk);
401       if (!handler(std::move(gen))) {
402         return false;
403       }
404     }
405     return true;
406   }
407 };
408
409 } // namespace detail
410
411 } // namespace gen
412 } // namespace folly