2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Optional.h>
25 #include <folly/SmallLocks.h>
27 #include <folly/wangle/Try.h>
28 #include <folly/wangle/Promise.h>
29 #include <folly/wangle/Future.h>
30 #include <folly/wangle/Executor.h>
31 #include <folly/wangle/detail/FSM.h>
33 namespace folly { namespace wangle { namespace detail {
35 // As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
36 // to functions; using a helper avoids a call to malloc.
//
// No-op callback: accepts a Try<T> rvalue and does nothing with it.
// Core::detachFuture() passes empty_callback<T> to setCallback().
// NOTE(review): T is not declared on any line visible in this listing; the
// template header presumably sits on a line that is not shown - confirm.
38 void empty_callback(Try<T>&&) { }
47 /**
 * The shared state object for Future and Promise.
 *
 * Core derives (protected) from FSM<State> and is constructed in
 * State::Waiting. The lines visible here also use State::Interruptible,
 * State::Interrupted and State::Done. Alongside the state it stores the result
 * (result_), the continuation (callback_), an Executor pointer (executor_,
 * initially nullptr), and the interrupt exception plus its handler.
 *
 * NOTE(review): this listing shows only some of the class's lines (the
 * declaration of T, several method headers, case labels and closing braces are
 * absent), so the comments below describe the visible statements only.
 */
49 class Core : protected FSM<State> {
51 // This must be heap-constructed. There's probably a way to enforce that in
52 // code but since this is just internal detail code and I don't know how
53 // off-hand, I'm punting.
// Starts the state machine in State::Waiting.
54 Core() : FSM<State>(State::Waiting) {}
// Asserts that detached_ has reached 2.
// NOTE(review): the function enclosing this assert is not visible in this
// listing (presumably the destructor) - confirm.
57 assert(detached_ == 2);
// Not copyable.
61 Core(Core const&) = delete;
62 Core& operator=(Core const&) = delete;
64 // not movable (see comment in the implementation of Future::then)
65 Core(Core&&) noexcept = delete;
66 Core& operator=(Core&&) = delete;
// Throws FutureNotReady.
// NOTE(review): the enclosing accessor and the condition guarding this throw
// are not visible here; presumably reached when no result is available yet.
72 throw FutureNotReady();
// Installs `func` as the callback.
//
// The local lambda setCallback_ captures by reference; the visible lines
// following it throw std::logic_error("setCallback called twice") (guarding
// condition not shown) and move func into callback_. setCallback_ is then
// handed to the FSM macros:
//   - under the Interruptible / Interrupted labels (other labels may be on
//     lines not shown): FSM_UPDATE(state, setCallback_);
//   - a separate FSM_UPDATE2 targets State::Done and its last argument calls
//     maybeCallback().
// NOTE(review): F and `state` are not declared on visible lines, and
// FSM_UPDATE / FSM_UPDATE2 are defined outside this listing (FSM.h is
// included above) - check their exact semantics there.
77 void setCallback(F func) {
78 auto setCallback_ = [&]{
80 throw std::logic_error("setCallback called twice");
83 callback_ = std::move(func);
88 case State::Interruptible:
89 case State::Interrupted:
90 FSM_UPDATE(state, setCallback_);
94 FSM_UPDATE2(State::Done,
96 [&]{ maybeCallback(); });
// Stores the result `t`.
//
// Under the Interruptible / Interrupted labels (other labels may be on lines
// not shown) FSM_UPDATE2 targets State::Done with two lambdas: the first moves
// t into result_, the second calls maybeCallback().
// A std::logic_error("setResult called twice") is thrown on a path whose case
// label is not visible (presumably State::Done - confirm).
101 void setResult(Try<T>&& t) {
104 case State::Interruptible:
105 case State::Interrupted:
106 FSM_UPDATE2(State::Done,
107 [&]{ result_ = std::move(t); },
108 [&]{ maybeCallback(); });
112 throw std::logic_error("setResult called twice");
// Yields true exactly when getState() equals State::Done.
// NOTE(review): the signature of the enclosing function is not visible.
117 return getState() == State::Done;
120 // Called by a destructing Future
// Installs the no-op empty_callback<T> via setCallback().
// NOTE(review): any condition guarding that call, and the rest of the body,
// are on lines not shown.
121 void detachFuture() {
123 setCallback(empty_callback<T>);
129 // Called by a destructing Promise
// Calls setResult() with a Try<T> built from an exception_ptr to a
// BrokenPromise.
// NOTE(review): any condition guarding that call, and the rest of the body,
// are on lines not shown.
130 void detachPromise() {
132 setResult(Try<T>(std::make_exception_ptr(BrokenPromise())));
// Returns the current value of active_ (initialized to true).
148 bool isActive() { return active_; }
// NOTE(review): the body is not visible; presumably stores x in executor_ -
// confirm.
150 void setExecutor(Executor* x) {
// Records the interrupt exception `e`.
//   - Interruptible: FSM_UPDATE2 targets State::Interrupted; the first lambda
//     copies e into interrupt_, the second calls
//     interruptHandler_(interrupt_).
//   - Interrupted (other labels may precede it on lines not shown): FSM_UPDATE
//     targets State::Interrupted and only copies e into interrupt_.
154 void raise(std::exception_ptr const& e) {
156 case State::Interruptible:
157 FSM_UPDATE2(State::Interrupted,
158 [&]{ interrupt_ = e; },
159 [&]{ interruptHandler_(interrupt_); });
163 case State::Interrupted:
164 FSM_UPDATE(State::Interrupted,
165 [&]{ interrupt_ = e; });
// Installs `fn` as the interrupt handler.
//   - Interruptible (other labels may precede it on lines not shown):
//     FSM_UPDATE targets State::Interruptible and moves fn into
//     interruptHandler_.
//   - Interrupted: the handling is on lines not shown.
173 void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
176 case State::Interruptible:
177 FSM_UPDATE(State::Interruptible,
178 [&]{ interruptHandler_ = std::move(fn); });
181 case State::Interrupted:
// Runs the callback with the result when it has not run yet (calledBack_ is
// false), the core is active, and a callback is set.
//
// Two invocation forms are visible:
//   - callback_ and result_ are moved into MoveWrappers, which are captured by
//     a lambda handed to x->add(); that lambda invokes the callback with the
//     moved-out Try;
//   - callback_ is invoked directly with std::move(*result_).
// NOTE(review): the branch selecting between the two forms and the write to
// calledBack_ are on lines not shown (presumably the first form is used when
// x is non-null) - confirm.
191 void maybeCallback() {
193 if (!calledBack_ && isActive() && callback_) {
194 // TODO(5306911) we should probably try/catch
// Read executor_ once into a local.
196 Executor* x = executor_;
198 MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
199 MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
200 x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
202 callback_(std::move(*result_));
// Pre-increments detached_ and keeps the incremented value in d.
// NOTE(review): the enclosing function header and what is done with d are on
// lines not shown.
208 auto d = ++detached_;
212 // we should have already executed the callback with the value
// Result slot; assigned in setResult(), consumed in maybeCallback().
218 folly::Optional<Try<T>> result_;
// Continuation; assigned in setCallback(), invoked from maybeCallback().
219 std::function<void(Try<T>&&)> callback_;
// Tested in maybeCallback() so the callback is not run again.
220 std::atomic<bool> calledBack_ {false};
// Detach counter; incremented at `++detached_` and asserted to equal 2 above.
221 std::atomic<unsigned char> detached_ {0};
// Value returned by isActive(); starts out true.
222 std::atomic<bool> active_ {true};
// Executor read by maybeCallback(); starts out nullptr.
223 std::atomic<Executor*> executor_ {nullptr};
// Exception stored by raise().
224 std::exception_ptr interrupt_;
// Handler stored by setInterruptHandler() and invoked by raise().
225 std::function<void(std::exception_ptr const&)> interruptHandler_;
// Context object used by whenAllVariadicHelper() below: each callback writes
// one slot of `results`, increments `count`, and fulfills `p` with the moved
// `results` once `count` equals `total`.
// NOTE(review): the declaration of `total` (initialized by the constructor)
// and the end of the struct are on lines not shown in this listing.
228 template <typename... Ts>
229 struct VariadicContext {
// Both counters start at 0.
230 VariadicContext() : total(0), count(0) {}
// Promise fulfilled with the tuple of per-input Try values.
231 Promise<std::tuple<Try<Ts>... > > p;
// One Try slot per input type in Ts.
232 std::tuple<Try<Ts>... > results;
// Number of callbacks that have stored their slot so far.
234 std::atomic<size_t> count;
// Future type matching the promise `p`.
235 typedef Future<std::tuple<Try<Ts>...>> type;
// Overload selected when the tail pack Fs is empty (enable_if on
// sizeof...(Fs) == 0), i.e. `head` is the last remaining input.
//
// Installs a callback on `head` (via head.setCallback_) that:
//   1. moves the delivered Try into slot sizeof...(Ts) - sizeof...(Fs) - 1 of
//      ctx->results - the last slot here, since Fs is empty;
//   2. pre-increments ctx->count and, when it equals ctx->total, fulfills
//      ctx->p with the moved ctx->results.
// The lambda captures the raw ctx pointer by value.
// NOTE(review): the remainder of the body is on lines not shown, so who owns
// or frees ctx cannot be determined from this listing.
238 template <typename... Ts, typename THead, typename... Fs>
239 typename std::enable_if<sizeof...(Fs) == 0, void>::type
240 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
241 head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
242 std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
243 if (++ctx->count == ctx->total) {
244 ctx->p.setValue(std::move(ctx->results));
// Overload selected when the tail pack Fs is non-empty (enable_if on
// sizeof...(Fs) != 0).
//
// Installs the same kind of callback on `head` as the empty-tail overload:
// the delivered Try is moved into slot sizeof...(Ts) - sizeof...(Fs) - 1 of
// ctx->results, ctx->count is pre-incremented, and ctx->p is fulfilled with
// the moved results when count equals ctx->total. It then calls
// whenAllVariadicHelper again with the remaining inputs in `tail`.
// NOTE(review): the closing lines of the lambda and of the function are not
// shown in this listing.
250 template <typename... Ts, typename THead, typename... Fs>
251 typename std::enable_if<sizeof...(Fs) != 0, void>::type
252 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
253 head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
254 std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
255 if (++ctx->count == ctx->total) {
256 ctx->p.setValue(std::move(ctx->results));
260 // template tail-recursion
// Each call peels one input off the front of the pack.
261 whenAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
// Context holding a promise for a vector of Try<T>, the vector itself, and an
// atomic counter.
// NOTE(review): the declaration of `total` (initialized by the constructor)
// and the end of the struct are on lines not shown; the code that uses this
// context is also outside this listing. Presumably it mirrors VariadicContext
// for a homogeneous collection of inputs - confirm against the caller.
264 template <typename T>
265 struct WhenAllContext {
// Both counters start at 0.
266 explicit WhenAllContext() : count(0), total(0) {}
// Promise for the full vector of results.
267 Promise<std::vector<Try<T> > > p;
// Storage for the individual Try<T> values.
268 std::vector<Try<T> > results;
// Atomic counter, initialized to 0 by the constructor.
269 std::atomic<size_t> count;
// Context holding a promise for an (index, Try<T>) pair, a `done` flag and a
// reference count that starts at n.
// NOTE(review): the code that uses this context is outside this listing;
// presumably `done` marks that the promise has already been fulfilled and
// the size_t in the pair is the index of the input that produced it - confirm
// against the caller.
273 template <typename T>
274 struct WhenAnyContext {
// Starts with done == false and ref_count == n.
// NOTE(review): the `;` after the constructor body is redundant (harmless).
275 explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
// Promise for the (index, result) pair.
276 Promise<std::pair<size_t, Try<T>>> p;
// Atomic flag, initially false.
277 std::atomic<bool> done;
// Atomic reference count, initially n.
278 std::atomic<size_t> ref_count;
// Pre-decrements ref_count and tests for zero.
// NOTE(review): the header of the enclosing member function and the action
// taken at zero are on lines not shown.
280 if (--ref_count == 0) {
286 template <typename T>
287 struct WhenAllLaterContext {
288 explicit WhenAllLaterContext() : count(0), total(0) {}
289 std::function<void(std::vector<Try<T>>&&)> fn;
290 std::vector<Try<T> > results;
291 std::atomic<size_t> count;