Codemod: use #include angle brackets in folly and thrift
[folly.git] / folly / gen / Parallel-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_GEN_PARALLEL_H_
18 #error This file may only be included from folly/gen/ParallelGen.h
19 #endif
20
21 #include <folly/MPMCQueue.h>
22 #include <folly/ScopeGuard.h>
23 #include <folly/experimental/EventCount.h>
24 #include <atomic>
25 #include <thread>
26 #include <vector>
27
28 namespace folly {
29 namespace gen {
30 namespace detail {
31
32 template <typename T>
33 class ClosableMPMCQueue {
34   MPMCQueue<T> queue_;
35   std::atomic<size_t> producers_{0};
36   std::atomic<size_t> consumers_{0};
37   folly::EventCount wakeProducer_;
38   folly::EventCount wakeConsumer_;
39
40  public:
41   explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
42
43   ~ClosableMPMCQueue() {
44     CHECK(!producers());
45     CHECK(!consumers());
46   }
47
48   void openProducer() { ++producers_; }
49   void openConsumer() { ++consumers_; }
50
51   void closeInputProducer() {
52     int64_t producers = producers_--;
53     CHECK(producers);
54     if (producers == 1) { // last producer
55       wakeConsumer_.notifyAll();
56     }
57   }
58
59   void closeOutputConsumer() {
60     int64_t consumers = consumers_--;
61     CHECK(consumers);
62     if (consumers == 1) { // last consumer
63       wakeProducer_.notifyAll();
64     }
65   }
66
67   size_t producers() const {
68     return producers_.load(std::memory_order_acquire);
69   }
70
71   size_t consumers() const {
72     return consumers_.load(std::memory_order_acquire);
73   }
74
75   template <typename... Args>
76   bool writeUnlessFull(Args&&... args) noexcept {
77     if (queue_.write(std::forward<Args>(args)...)) {
78       // wake consumers to pick up new value
79       wakeConsumer_.notify();
80       return true;
81     }
82     return false;
83   }
84
85   template <typename... Args>
86   bool writeUnlessClosed(Args&&... args) {
87     // write if there's room
88     while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
89       // if write fails, check if there are still consumers listening
90       auto key = wakeProducer_.prepareWait();
91       if (!consumers()) {
92         // no consumers left; bail out
93         wakeProducer_.cancelWait();
94         return false;
95       }
96       wakeProducer_.wait(key);
97     }
98     // wake consumers to pick up new value
99     wakeConsumer_.notify();
100     return true;
101   }
102
103   bool readUnlessEmpty(T& out) {
104     if (queue_.read(out)) {
105       // wake producers to fill empty space
106       wakeProducer_.notify();
107       return true;
108     }
109     return false;
110   }
111
112   bool readUnlessClosed(T& out) {
113     while (!queue_.readIfNotEmpty(out)) {
114       auto key = wakeConsumer_.prepareWait();
115       if (!producers()) {
116         // wake producers to fill empty space
117         wakeProducer_.notify();
118         return false;
119       }
120       wakeConsumer_.wait(key);
121     }
122     // wake writers blocked by full queue
123     wakeProducer_.notify();
124     return true;
125   }
126 };
127
128 template <class Sink>
129 class Sub : public Operator<Sub<Sink>> {
130   Sink sink_;
131
132  public:
133   explicit Sub(Sink sink) : sink_(sink) {}
134
135   template <class Value,
136             class Source,
137             class Result =
138                 decltype(std::declval<Sink>().compose(std::declval<Source>())),
139             class Just = Just<typename std::decay<Result>::type>>
140   Just compose(const GenImpl<Value, Source>& source) const {
141     return Just(source | sink_);
142   }
143 };
144
145 template <class Ops>
146 class Parallel : public Operator<Parallel<Ops>> {
147   Ops ops_;
148   size_t threads_;
149
150  public:
151   Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
152
153   template <class Input,
154             class Source,
155             class InputDecayed = typename std::decay<Input>::type,
156             class Composed =
157                 decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
158             class Output = typename Composed::ValueType,
159             class OutputDecayed = typename std::decay<Output>::type>
160   class Generator : public GenImpl<OutputDecayed&&,
161                                    Generator<Input,
162                                              Source,
163                                              InputDecayed,
164                                              Composed,
165                                              Output,
166                                              OutputDecayed>> {
167     const Source source_;
168     const Ops ops_;
169     const size_t threads_;
170     typedef ClosableMPMCQueue<InputDecayed> InQueue;
171     typedef ClosableMPMCQueue<OutputDecayed> OutQueue;
172
173     class Puller : public GenImpl<InputDecayed&&, Puller> {
174       InQueue* queue_;
175
176      public:
177       explicit Puller(InQueue* queue) : queue_(queue) {}
178
179       template <class Handler>
180       bool apply(Handler&& handler) const {
181         InputDecayed input;
182         while (queue_->readUnlessClosed(input)) {
183           if (!handler(std::move(input))) {
184             return false;
185           }
186         }
187         return true;
188       }
189
190       template <class Body>
191       void foreach(Body&& body) const {
192         InputDecayed input;
193         while (queue_->readUnlessClosed(input)) {
194           body(std::move(input));
195         }
196       }
197     };
198
199     template <bool all = false>
200     class Pusher : public Operator<Pusher<all>> {
201       OutQueue* queue_;
202
203      public:
204       explicit Pusher(OutQueue* queue) : queue_(queue) {}
205
206       template <class Value, class InnerSource>
207       void compose(const GenImpl<Value, InnerSource>& source) const {
208         if (all) {
209           source.self().foreach([&](Value value) {
210             queue_->writeUnlessClosed(std::forward<Value>(value));
211           });
212         } else {
213           source.self().apply([&](Value value) {
214             return queue_->writeUnlessClosed(std::forward<Value>(value));
215           });
216         }
217       }
218     };
219
220     template <bool all = false>
221     class Executor {
222       InQueue inQueue_;
223       OutQueue outQueue_;
224       Puller puller_;
225       Pusher<all> pusher_;
226       std::vector<std::thread> workers_;
227       const Ops* ops_;
228
229       void work() {
230         puller_ | *ops_ | pusher_;
231       };
232
233      public:
234       Executor(size_t threads, const Ops* ops)
235           : inQueue_(threads * 4),
236             outQueue_(threads * 4),
237             puller_(&inQueue_),
238             pusher_(&outQueue_),
239             ops_(ops) {
240         inQueue_.openProducer();
241         outQueue_.openConsumer();
242         for (int t = 0; t < threads; ++t) {
243           inQueue_.openConsumer();
244           outQueue_.openProducer();
245           workers_.emplace_back([this] {
246             SCOPE_EXIT {
247               inQueue_.closeOutputConsumer();
248               outQueue_.closeInputProducer();
249             };
250             this->work();
251           });
252         }
253       }
254
255       ~Executor() {
256         if (inQueue_.producers()) {
257           inQueue_.closeInputProducer();
258         }
259         if (outQueue_.consumers()) {
260           outQueue_.closeOutputConsumer();
261         }
262         while (!workers_.empty()) {
263           workers_.back().join();
264           workers_.pop_back();
265         }
266         CHECK(!inQueue_.consumers());
267         CHECK(!outQueue_.producers());
268       }
269
270       void closeInputProducer() { inQueue_.closeInputProducer(); }
271
272       void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }
273
274       bool writeUnlessClosed(Input&& input) {
275         return inQueue_.writeUnlessClosed(std::forward<Input>(input));
276       }
277
278       bool writeUnlessFull(Input&& input) {
279         return inQueue_.writeUnlessFull(std::forward<Input>(input));
280       }
281
282       bool readUnlessClosed(OutputDecayed& output) {
283         return outQueue_.readUnlessClosed(output);
284       }
285
286       bool readUnlessEmpty(OutputDecayed& output) {
287         return outQueue_.readUnlessEmpty(output);
288       }
289     };
290
291    public:
292     Generator(Source source, Ops ops, size_t threads)
293         : source_(std::move(source)),
294           ops_(std::move(ops)),
295           threads_(threads
296                        ?: std::max<size_t>(1, sysconf(_SC_NPROCESSORS_CONF))) {}
297
298     template <class Handler>
299     bool apply(Handler&& handler) const {
300       Executor<false> executor(threads_, &ops_);
301       bool more = true;
302       source_.apply([&](Input input) {
303         if (executor.writeUnlessFull(std::forward<Input>(input))) {
304           return true;
305         }
306         OutputDecayed output;
307         while (executor.readUnlessEmpty(output)) {
308           if (!handler(std::move(output))) {
309             more = false;
310             return false;
311           }
312         }
313         if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
314           return false;
315         }
316         return true;
317       });
318       executor.closeInputProducer();
319
320       if (more) {
321         OutputDecayed output;
322         while (executor.readUnlessClosed(output)) {
323           if (!handler(std::move(output))) {
324             more = false;
325             break;
326           }
327         }
328       }
329       executor.closeOutputConsumer();
330
331       return more;
332     }
333
334     template <class Body>
335     void foreach(Body&& body) const {
336       Executor<true> executor(threads_, &ops_);
337       source_.foreach([&](Input input) {
338         if (executor.writeUnlessFull(std::forward<Input>(input))) {
339           return;
340         }
341         OutputDecayed output;
342         while (executor.readUnlessEmpty(output)) {
343           body(std::move(output));
344         }
345         CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
346       });
347       executor.closeInputProducer();
348
349       OutputDecayed output;
350       while (executor.readUnlessClosed(output)) {
351         body(std::move(output));
352       }
353       executor.closeOutputConsumer();
354     }
355   };
356
357   template <class Value, class Source>
358   Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
359     return Generator<Value, Source>(source.self(), ops_, threads_);
360   }
361
362   template <class Value, class Source>
363   Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
364     return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
365   }
366 };
367
368 /**
369  * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
370  * maximum chunk size.
371  *
372  * Usually used through the 'chunked' helper, like:
373  *
374  *   int n
375  *     = chunked(values)
376  *     | parallel  // each thread processes a chunk
377  *     | concat   // but can still process values one at a time
378  *     | filter(isPrime)
379  *     | atomic_count;
380  */
381 template <class Iterator>
382 class ChunkedRangeSource
383     : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
384   int chunkSize_;
385   Range<Iterator> range_;
386
387  public:
388   ChunkedRangeSource() {}
389   ChunkedRangeSource(int chunkSize, Range<Iterator> range)
390       : chunkSize_(chunkSize), range_(std::move(range)) {}
391
392   template <class Handler>
393   bool apply(Handler&& handler) const {
394     auto remaining = range_;
395     while (!remaining.empty()) {
396       auto chunk = remaining.subpiece(0, chunkSize_);
397       remaining.advance(chunk.size());
398       auto gen = RangeSource<Iterator>(chunk);
399       if (!handler(std::move(gen))) {
400         return false;
401       }
402     }
403     return true;
404   }
405 };
406
407 } // namespace detail
408
409 } // namespace gen
410 } // namespace folly