Apply clang-format to folly/gen/ (template decls)
[folly.git] / folly / gen / ParallelMap-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_GEN_PARALLELMAP_H_
18 #error This file may only be included from folly/gen/ParallelMap.h
19 #endif
20
21 #include <atomic>
22 #include <cassert>
23 #include <thread>
24 #include <type_traits>
25 #include <utility>
26 #include <vector>
27
28 #include <folly/MPMCPipeline.h>
29 #include <folly/experimental/EventCount.h>
30
31 namespace folly {
32 namespace gen {
33 namespace detail {
34
35 /**
36  * PMap - Map in parallel (using threads). For producing a sequence of
37  * values by passing each value from a source collection through a
38  * predicate while running the predicate in parallel in different
39  * threads.
40  *
41  * This type is usually used through the 'pmap' helper function:
42  *
43  *   auto squares = seq(1, 10) | pmap(fibonacci, 4) | sum;
44  */
45 template <class Predicate>
46 class PMap : public Operator<PMap<Predicate>> {
47   Predicate pred_;
48   size_t nThreads_;
49  public:
50   PMap() = default;
51
52   PMap(Predicate pred, size_t nThreads)
53     : pred_(std::move(pred)),
54       nThreads_(nThreads) { }
55
56   template <
57       class Value,
58       class Source,
59       class Input = typename std::decay<Value>::type,
60       class Output = typename std::decay<
61           typename std::result_of<Predicate(Value)>::type>::type>
62   class Generator :
63     public GenImpl<Output, Generator<Value, Source, Input, Output>> {
64     Source source_;
65     Predicate pred_;
66     const size_t nThreads_;
67
68     class ExecutionPipeline {
69       std::vector<std::thread> workers_;
70       std::atomic<bool> done_{false};
71       const Predicate& pred_;
72       MPMCPipeline<Input, Output> pipeline_;
73       EventCount wake_;
74
75      public:
76       ExecutionPipeline(const Predicate& pred, size_t nThreads)
77         : pred_(pred),
78           pipeline_(nThreads, nThreads) {
79         workers_.reserve(nThreads);
80         for (size_t i = 0; i < nThreads; i++) {
81           workers_.push_back(std::thread([this] { this->predApplier(); }));
82         }
83       }
84
85       ~ExecutionPipeline() {
86         assert(pipeline_.sizeGuess() == 0);
87         assert(done_.load());
88         for (auto& w : workers_) { w.join(); }
89       }
90
91       void stop() {
92         // prevent workers from consuming more than we produce.
93         done_.store(true, std::memory_order_release);
94         wake_.notifyAll();
95       }
96
97       bool write(Value&& value) {
98         bool wrote = pipeline_.write(std::forward<Value>(value));
99         if (wrote) {
100           wake_.notify();
101         }
102         return wrote;
103       }
104
105       void blockingWrite(Value&& value) {
106         pipeline_.blockingWrite(std::forward<Value>(value));
107         wake_.notify();
108       }
109
110       bool read(Output& out) {
111         return pipeline_.read(out);
112       }
113
114       void blockingRead(Output& out) {
115         pipeline_.blockingRead(out);
116       }
117
118      private:
119       void predApplier() {
120         // Each thread takes a value from the pipeline_, runs the
121         // predicate and enqueues the result. The pipeline preserves
122         // ordering. NOTE: don't use blockingReadStage<0> to read from
123         // the pipeline_ as there may not be any: end-of-data is signaled
124         // separately using done_/wake_.
125         Input in;
126         for (;;) {
127           auto key = wake_.prepareWait();
128
129           typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
130           if (pipeline_.template readStage<0>(ticket, in)) {
131             wake_.cancelWait();
132             Output out = pred_(std::move(in));
133             pipeline_.template blockingWriteStage<0>(ticket,
134                                                      std::move(out));
135             continue;
136           }
137
138           if (done_.load(std::memory_order_acquire)) {
139             wake_.cancelWait();
140             break;
141           }
142
143           // Not done_, but no items in the queue.
144           wake_.wait(key);
145         }
146       }
147     };
148
149   public:
150     Generator(Source source, const Predicate& pred, size_t nThreads)
151       : source_(std::move(source)),
152         pred_(pred),
153         nThreads_(nThreads ? nThreads : sysconf(_SC_NPROCESSORS_ONLN)) {
154     }
155
156     template <class Body>
157     void foreach(Body&& body) const {
158       ExecutionPipeline pipeline(pred_, nThreads_);
159
160       size_t wrote = 0;
161       size_t read = 0;
162       source_.foreach([&](Value value) {
163         if (pipeline.write(std::forward<Value>(value))) {
164           // input queue not yet full, saturate it before we process
165           // anything downstream
166           ++wrote;
167           return;
168         }
169
170         // input queue full; drain ready items from the queue
171         Output out;
172         while (pipeline.read(out)) {
173           ++read;
174           body(std::move(out));
175         }
176
177         // write the value we were going to write before we made room.
178         pipeline.blockingWrite(std::forward<Value>(value));
179         ++wrote;
180       });
181
182       pipeline.stop();
183
184       // flush the output queue
185       while (read < wrote) {
186         Output out;
187         pipeline.blockingRead(out);
188         ++read;
189         body(std::move(out));
190       }
191     }
192
193     template <class Handler>
194     bool apply(Handler&& handler) const {
195       ExecutionPipeline pipeline(pred_, nThreads_);
196
197       size_t wrote = 0;
198       size_t read = 0;
199       bool more = true;
200       source_.apply([&](Value value) {
201         if (pipeline.write(std::forward<Value>(value))) {
202           // input queue not yet full, saturate it before we process
203           // anything downstream
204           ++wrote;
205           return true;
206         }
207
208         // input queue full; drain ready items from the queue
209         Output out;
210         while (pipeline.read(out)) {
211           ++read;
212           if (!handler(std::move(out))) {
213             more = false;
214             return false;
215           }
216         }
217
218         // write the value we were going to write before we made room.
219         pipeline.blockingWrite(std::forward<Value>(value));
220         ++wrote;
221         return true;
222       });
223
224       pipeline.stop();
225
226       // flush the output queue
227       while (read < wrote) {
228         Output out;
229         pipeline.blockingRead(out);
230         ++read;
231         if (more) {
232           more = more && handler(std::move(out));
233         }
234       }
235       return more;
236     }
237
238     static constexpr bool infinite = Source::infinite;
239   };
240
241   template <class Source, class Value, class Gen = Generator<Value, Source>>
242   Gen compose(GenImpl<Value, Source>&& source) const {
243     return Gen(std::move(source.self()), pred_, nThreads_);
244   }
245
246   template <class Source, class Value, class Gen = Generator<Value, Source>>
247   Gen compose(const GenImpl<Value, Source>& source) const {
248     return Gen(source.self(), pred_, nThreads_);
249   }
250 };
251 } // namespace detail
252 } // namespace gen
253 } // namespace folly