Fix copyright lines
[folly.git] / folly / gen / Parallel-inl.h
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_GEN_PARALLEL_H_
18 #error This file may only be included from folly/gen/ParallelGen.h
19 #endif
20
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/experimental/EventCount.h>
24 #include <atomic>
25 #include <thread>
26 #include <vector>
27
28 namespace folly {
29 namespace gen {
30 namespace detail {
31
32 template <typename T>
33 class ClosableMPMCQueue {
34   MPMCQueue<T> queue_;
35   std::atomic<size_t> producers_{0};
36   std::atomic<size_t> consumers_{0};
37   folly::EventCount wakeProducer_;
38   folly::EventCount wakeConsumer_;
39
40  public:
41   explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
42
43   ~ClosableMPMCQueue() {
44     CHECK(!producers());
45     CHECK(!consumers());
46   }
47
48   void openProducer() { ++producers_; }
49   void openConsumer() { ++consumers_; }
50
51   void closeInputProducer() {
52     size_t producers = producers_--;
53     CHECK(producers);
54     if (producers == 1) { // last producer
55       wakeConsumer_.notifyAll();
56     }
57   }
58
59   void closeOutputConsumer() {
60     size_t consumers = consumers_--;
61     CHECK(consumers);
62     if (consumers == 1) { // last consumer
63       wakeProducer_.notifyAll();
64     }
65   }
66
67   size_t producers() const {
68     return producers_.load(std::memory_order_acquire);
69   }
70
71   size_t consumers() const {
72     return consumers_.load(std::memory_order_acquire);
73   }
74
75   template <typename... Args>
76   bool writeUnlessFull(Args&&... args) noexcept {
77     if (queue_.write(std::forward<Args>(args)...)) {
78       // wake consumers to pick up new value
79       wakeConsumer_.notify();
80       return true;
81     }
82     return false;
83   }
84
85   template <typename... Args>
86   bool writeUnlessClosed(Args&&... args) {
87     // write if there's room
88     while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
89       // if write fails, check if there are still consumers listening
90       auto key = wakeProducer_.prepareWait();
91       if (!consumers()) {
92         // no consumers left; bail out
93         wakeProducer_.cancelWait();
94         return false;
95       }
96       wakeProducer_.wait(key);
97     }
98     // wake consumers to pick up new value
99     wakeConsumer_.notify();
100     return true;
101   }
102
103   bool readUnlessEmpty(T& out) {
104     if (queue_.read(out)) {
105       // wake producers to fill empty space
106       wakeProducer_.notify();
107       return true;
108     }
109     return false;
110   }
111
112   bool readUnlessClosed(T& out) {
113     while (!queue_.readIfNotEmpty(out)) {
114       auto key = wakeConsumer_.prepareWait();
115       if (!producers()) {
116         // wake producers to fill empty space
117         wakeProducer_.notify();
118         return false;
119       }
120       wakeConsumer_.wait(key);
121     }
122     // wake writers blocked by full queue
123     wakeProducer_.notify();
124     return true;
125   }
126 };
127
128 template <class Sink>
129 class Sub : public Operator<Sub<Sink>> {
130   Sink sink_;
131
132  public:
133   explicit Sub(Sink sink) : sink_(sink) {}
134
135   template <
136       class Value,
137       class Source,
138       class Result =
139           decltype(std::declval<Sink>().compose(std::declval<Source>())),
140       class Just = SingleCopy<typename std::decay<Result>::type>>
141   Just compose(const GenImpl<Value, Source>& source) const {
142     return Just(source | sink_);
143   }
144 };
145
146 template <class Ops>
147 class Parallel : public Operator<Parallel<Ops>> {
148   Ops ops_;
149   size_t threads_;
150
151  public:
152   Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
153
154   template <
155       class Input,
156       class Source,
157       class InputDecayed = typename std::decay<Input>::type,
158       class Composed =
159           decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
160       class Output = typename Composed::ValueType,
161       class OutputDecayed = typename std::decay<Output>::type>
162   class Generator : public GenImpl<OutputDecayed&&,
163                                    Generator<Input,
164                                              Source,
165                                              InputDecayed,
166                                              Composed,
167                                              Output,
168                                              OutputDecayed>> {
169     const Source source_;
170     const Ops ops_;
171     const size_t threads_;
172     typedef ClosableMPMCQueue<InputDecayed> InQueue;
173     typedef ClosableMPMCQueue<OutputDecayed> OutQueue;
174
175     class Puller : public GenImpl<InputDecayed&&, Puller> {
176       InQueue* queue_;
177
178      public:
179       explicit Puller(InQueue* queue) : queue_(queue) {}
180
181       template <class Handler>
182       bool apply(Handler&& handler) const {
183         InputDecayed input;
184         while (queue_->readUnlessClosed(input)) {
185           if (!handler(std::move(input))) {
186             return false;
187           }
188         }
189         return true;
190       }
191
192       template <class Body>
193       void foreach(Body&& body) const {
194         InputDecayed input;
195         while (queue_->readUnlessClosed(input)) {
196           body(std::move(input));
197         }
198       }
199     };
200
201     template <bool all = false>
202     class Pusher : public Operator<Pusher<all>> {
203       OutQueue* queue_;
204
205      public:
206       explicit Pusher(OutQueue* queue) : queue_(queue) {}
207
208       template <class Value, class InnerSource>
209       void compose(const GenImpl<Value, InnerSource>& source) const {
210         if (all) {
211           source.self().foreach([&](Value value) {
212             queue_->writeUnlessClosed(std::forward<Value>(value));
213           });
214         } else {
215           source.self().apply([&](Value value) {
216             return queue_->writeUnlessClosed(std::forward<Value>(value));
217           });
218         }
219       }
220     };
221
222     template <bool all = false>
223     class Executor {
224       InQueue inQueue_;
225       OutQueue outQueue_;
226       Puller puller_;
227       Pusher<all> pusher_;
228       std::vector<std::thread> workers_;
229       const Ops* ops_;
230
231       void work() {
232         puller_ | *ops_ | pusher_;
233       }
234
235      public:
236       Executor(size_t threads, const Ops* ops)
237           : inQueue_(threads * 4),
238             outQueue_(threads * 4),
239             puller_(&inQueue_),
240             pusher_(&outQueue_),
241             ops_(ops) {
242         inQueue_.openProducer();
243         outQueue_.openConsumer();
244         for (size_t t = 0; t < threads; ++t) {
245           inQueue_.openConsumer();
246           outQueue_.openProducer();
247           workers_.emplace_back([this] {
248             SCOPE_EXIT {
249               inQueue_.closeOutputConsumer();
250               outQueue_.closeInputProducer();
251             };
252             this->work();
253           });
254         }
255       }
256
257       ~Executor() {
258         if (inQueue_.producers()) {
259           inQueue_.closeInputProducer();
260         }
261         if (outQueue_.consumers()) {
262           outQueue_.closeOutputConsumer();
263         }
264         while (!workers_.empty()) {
265           workers_.back().join();
266           workers_.pop_back();
267         }
268         CHECK(!inQueue_.consumers());
269         CHECK(!outQueue_.producers());
270       }
271
272       void closeInputProducer() { inQueue_.closeInputProducer(); }
273
274       void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }
275
276       bool writeUnlessClosed(Input&& input) {
277         return inQueue_.writeUnlessClosed(std::forward<Input>(input));
278       }
279
280       bool writeUnlessFull(Input&& input) {
281         return inQueue_.writeUnlessFull(std::forward<Input>(input));
282       }
283
284       bool readUnlessClosed(OutputDecayed& output) {
285         return outQueue_.readUnlessClosed(output);
286       }
287
288       bool readUnlessEmpty(OutputDecayed& output) {
289         return outQueue_.readUnlessEmpty(output);
290       }
291     };
292
293    public:
294     Generator(Source source, Ops ops, size_t threads)
295         : source_(std::move(source)),
296           ops_(std::move(ops)),
297           threads_(
298               threads
299                   ? threads
300                   : size_t(std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}
301
302     template <class Handler>
303     bool apply(Handler&& handler) const {
304       Executor<false> executor(threads_, &ops_);
305       bool more = true;
306       source_.apply([&](Input input) {
307         if (executor.writeUnlessFull(std::forward<Input>(input))) {
308           return true;
309         }
310         OutputDecayed output;
311         while (executor.readUnlessEmpty(output)) {
312           if (!handler(std::move(output))) {
313             more = false;
314             return false;
315           }
316         }
317         if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
318           return false;
319         }
320         return true;
321       });
322       executor.closeInputProducer();
323
324       if (more) {
325         OutputDecayed output;
326         while (executor.readUnlessClosed(output)) {
327           if (!handler(std::move(output))) {
328             more = false;
329             break;
330           }
331         }
332       }
333       executor.closeOutputConsumer();
334
335       return more;
336     }
337
338     template <class Body>
339     void foreach(Body&& body) const {
340       Executor<true> executor(threads_, &ops_);
341       source_.foreach([&](Input input) {
342         if (executor.writeUnlessFull(std::forward<Input>(input))) {
343           return;
344         }
345         OutputDecayed output;
346         while (executor.readUnlessEmpty(output)) {
347           body(std::move(output));
348         }
349         CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
350       });
351       executor.closeInputProducer();
352
353       OutputDecayed output;
354       while (executor.readUnlessClosed(output)) {
355         body(std::move(output));
356       }
357       executor.closeOutputConsumer();
358     }
359   };
360
361   template <class Value, class Source>
362   Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
363     return Generator<Value, Source>(source.self(), ops_, threads_);
364   }
365
366   template <class Value, class Source>
367   Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
368     return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
369   }
370 };
371
372 /**
373  * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
374  * maximum chunk size.
375  *
376  * Usually used through the 'chunked' helper, like:
377  *
378  *   int n
379  *     = chunked(values)
380  *     | parallel  // each thread processes a chunk
381  *     | concat   // but can still process values one at a time
382  *     | filter(isPrime)
383  *     | atomic_count;
384  */
385 template <class Iterator>
386 class ChunkedRangeSource
387     : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
388   int chunkSize_;
389   Range<Iterator> range_;
390
391  public:
392   ChunkedRangeSource() = default;
393   ChunkedRangeSource(int chunkSize, Range<Iterator> range)
394       : chunkSize_(chunkSize), range_(std::move(range)) {}
395
396   template <class Handler>
397   bool apply(Handler&& handler) const {
398     auto remaining = range_;
399     while (!remaining.empty()) {
400       auto chunk = remaining.subpiece(0, chunkSize_);
401       remaining.advance(chunk.size());
402       auto gen = RangeSource<Iterator>(chunk);
403       if (!handler(std::move(gen))) {
404         return false;
405       }
406     }
407     return true;
408   }
409 };
410
411 } // namespace detail
412
413 } // namespace gen
414 } // namespace folly