f3aa6d24416d247d00e5bd6c1f7bd8ce067973dd
[folly.git] / folly / wangle / detail / Core.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Optional.h>
25 #include <folly/SmallLocks.h>
26
27 #include <folly/wangle/Try.h>
28 #include <folly/wangle/Promise.h>
29 #include <folly/wangle/Future.h>
30 #include <folly/wangle/Executor.h>
31 #include <folly/wangle/detail/FSM.h>
32
33 namespace folly { namespace wangle { namespace detail {
34
35 // As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
36 // to functions, using a helper avoids a call to malloc.
37 template<typename T>
38 void empty_callback(Try<T>&&) { }
39
40 enum class State {
41   Waiting,
42   Interruptible,
43   Interrupted,
44   Done,
45 };
46
47 /** The shared state object for Future and Promise. */
48 template<typename T>
49 class Core : protected FSM<State> {
50  public:
51   // This must be heap-constructed. There's probably a way to enforce that in
52   // code but since this is just internal detail code and I don't know how
53   // off-hand, I'm punting.
54   Core() : FSM<State>(State::Waiting) {}
55   ~Core() {
56     assert(calledBack_);
57     assert(detached_ == 2);
58   }
59
60   // not copyable
61   Core(Core const&) = delete;
62   Core& operator=(Core const&) = delete;
63
64   // not movable (see comment in the implementation of Future::then)
65   Core(Core&&) noexcept = delete;
66   Core& operator=(Core&&) = delete;
67
68   Try<T>& getTry() {
69     if (ready()) {
70       return *result_;
71     } else {
72       throw FutureNotReady();
73     }
74   }
75
76   template <typename F>
77   void setCallback(F func) {
78     auto setCallback_ = [&]{
79       if (callback_) {
80         throw std::logic_error("setCallback called twice");
81       }
82
83       callback_ = std::move(func);
84     };
85
86     FSM_START
87       case State::Waiting:
88       case State::Interruptible:
89       case State::Interrupted:
90         FSM_UPDATE(state, setCallback_);
91         break;
92
93       case State::Done:
94         FSM_UPDATE2(State::Done,
95           setCallback_,
96           [&]{ maybeCallback(); });
97         break;
98     FSM_END
99   }
100
101   void setResult(Try<T>&& t) {
102     FSM_START
103       case State::Waiting:
104       case State::Interruptible:
105       case State::Interrupted:
106         FSM_UPDATE2(State::Done,
107           [&]{ result_ = std::move(t); },
108           [&]{ maybeCallback(); });
109         break;
110
111       case State::Done:
112         throw std::logic_error("setResult called twice");
113     FSM_END
114   }
115
116   bool ready() const {
117     return getState() == State::Done;
118   }
119
120   // Called by a destructing Future
121   void detachFuture() {
122     if (!callback_) {
123       setCallback(empty_callback<T>);
124     }
125     activate();
126     detachOne();
127   }
128
129   // Called by a destructing Promise
130   void detachPromise() {
131     if (!ready()) {
132       setResult(Try<T>(std::make_exception_ptr(BrokenPromise())));
133     }
134     detachOne();
135   }
136
137   void deactivate() {
138     active_ = false;
139   }
140
141   void activate() {
142     active_ = true;
143     if (ready()) {
144       maybeCallback();
145     }
146   }
147
148   bool isActive() { return active_; }
149
150   void setExecutor(Executor* x) {
151     executor_ = x;
152   }
153
154   void raise(std::exception_ptr const& e) {
155     FSM_START
156       case State::Interruptible:
157         FSM_UPDATE2(State::Interrupted,
158           [&]{ interrupt_ = e; },
159           [&]{ interruptHandler_(interrupt_); });
160         break;
161
162       case State::Waiting:
163       case State::Interrupted:
164         FSM_UPDATE(State::Interrupted,
165           [&]{ interrupt_ = e; });
166         break;
167
168       case State::Done:
169         FSM_BREAK
170     FSM_END
171   }
172
173   void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
174     FSM_START
175       case State::Waiting:
176       case State::Interruptible:
177         FSM_UPDATE(State::Interruptible,
178           [&]{ interruptHandler_ = std::move(fn); });
179         break;
180
181       case State::Interrupted:
182         fn(interrupt_);
183         FSM_BREAK
184
185       case State::Done:
186         FSM_BREAK
187     FSM_END
188   }
189
190  private:
191   void maybeCallback() {
192     assert(ready());
193     if (!calledBack_ && isActive() && callback_) {
194       // TODO(5306911) we should probably try/catch
195       calledBack_ = true;
196       Executor* x = executor_;
197       if (x) {
198         MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
199         MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
200         x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
201       } else {
202         callback_(std::move(*result_));
203       }
204     }
205   }
206
207   void detachOne() {
208     auto d = ++detached_;
209     assert(d >= 1);
210     assert(d <= 2);
211     if (d == 2) {
212       // we should have already executed the callback with the value
213       assert(calledBack_);
214       delete this;
215     }
216   }
217
218   folly::Optional<Try<T>> result_;
219   std::function<void(Try<T>&&)> callback_;
220   std::atomic<bool> calledBack_ {false};
221   std::atomic<unsigned char> detached_ {0};
222   std::atomic<bool> active_ {true};
223   std::atomic<Executor*> executor_ {nullptr};
224   std::exception_ptr interrupt_;
225   std::function<void(std::exception_ptr const&)> interruptHandler_;
226 };
227
228 template <typename... Ts>
229 struct VariadicContext {
230   VariadicContext() : total(0), count(0) {}
231   Promise<std::tuple<Try<Ts>... > > p;
232   std::tuple<Try<Ts>... > results;
233   size_t total;
234   std::atomic<size_t> count;
235   typedef Future<std::tuple<Try<Ts>...>> type;
236 };
237
238 template <typename... Ts, typename THead, typename... Fs>
239 typename std::enable_if<sizeof...(Fs) == 0, void>::type
240 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
241   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
242     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
243     if (++ctx->count == ctx->total) {
244       ctx->p.setValue(std::move(ctx->results));
245       delete ctx;
246     }
247   });
248 }
249
250 template <typename... Ts, typename THead, typename... Fs>
251 typename std::enable_if<sizeof...(Fs) != 0, void>::type
252 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
253   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
254     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
255     if (++ctx->count == ctx->total) {
256       ctx->p.setValue(std::move(ctx->results));
257       delete ctx;
258     }
259   });
260   // template tail-recursion
261   whenAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
262 }
263
264 template <typename T>
265 struct WhenAllContext {
266   explicit WhenAllContext() : count(0), total(0) {}
267   Promise<std::vector<Try<T> > > p;
268   std::vector<Try<T> > results;
269   std::atomic<size_t> count;
270   size_t total;
271 };
272
273 template <typename T>
274 struct WhenAnyContext {
275   explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
276   Promise<std::pair<size_t, Try<T>>> p;
277   std::atomic<bool> done;
278   std::atomic<size_t> ref_count;
279   void decref() {
280     if (--ref_count == 0) {
281       delete this;
282     }
283   }
284 };
285
286 template <typename T>
287 struct WhenAllLaterContext {
288   explicit WhenAllLaterContext() : count(0), total(0) {}
289   std::function<void(std::vector<Try<T>>&&)> fn;
290   std::vector<Try<T> > results;
291   std::atomic<size_t> count;
292   size_t total;
293 };
294
295 }}} // namespace