2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
// Forward declaration of the accessor that yields a Timekeeper*.
// Future<T>::within() assigns its result to `tk` (as
// folly::detail::getTimekeeperSingleton()); the condition guarding that
// assignment is not visible in this extract.
31 Timekeeper* getTimekeeperSingleton();
// Move constructor: starts with a null core_ and then reuses move assignment
// to take over other's state.
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36 *this = std::move(other);
// Move assignment: exchanges core_ with other.core_, so `other` is left
// holding whatever core_ this Future had before the call.
// NOTE(review): the move constructor is declared noexcept but this operator
// is not — confirm whether that asymmetry is intentional.
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41 std::swap(core_, other.core_);
48 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
52 *this = p.getFuture();
58 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
61 p.setValue(std::forward<F>(val));
62 *this = p.getFuture();
67 typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
71 *this = p.getFuture();
76 Future<T>::~Future() {
// Tells the shared core that this Future handle is letting go of it by
// calling detachFuture() on core_.
// NOTE(review): the lines around the call are not visible in this extract;
// presumably it is guarded by a null check on core_ — confirm.
81 void Future<T>::detach() {
83 core_->detachFuture();
89 void Future<T>::throwIfInvalid() const {
// Installs `func` as the callback stored on core_.
// `func` is handed over with std::move, so the caller's object is left in a
// moved-from state afterwards.
// NOTE(review): if F is a deduced template parameter (the template header is
// not visible here), an lvalue argument would also be moved from — confirm
// that every caller passes an rvalue.
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::move(func));
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
//
// Enabled (via enable_if) when R::ReturnsFuture::value is false.
//  - `func` is taken by value; the callback may declare at most one
//    argument (enforced by the static_assert below).
//  - B is R::ReturnsFuture::Inner; a fresh Promise<B> and `func` are held in
//    MoveWrappers so that both can be captured by the continuation lambda.
//  - `f` is taken from the promise before the promise is captured, and is
//    given this Future's executor via setExecutor(getExecutor()).
//  - Inside the continuation: when isTry is false and the incoming Try holds
//    an exception, that exception is passed to the promise; `func` is
//    invoked with t.get<isTry, Args>()... .
// The statements that install the continuation and return `f` are on lines
// not visible in this extract.
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108 typedef typename R::ReturnsFuture::Inner B;
112 // wrap these so we can move them into the lambda
113 folly::MoveWrapper<Promise<B>> p;
114 folly::MoveWrapper<F> funcm(std::forward<F>(func));
116 // grab the Future now before we lose our handle on the Promise
117 auto f = p->getFuture();
119 f.setExecutor(getExecutor());
122 /* This is a bit tricky.
124 We can't just close over *this in case this Future gets moved. So we
125 make a new dummy Future. We could figure out something more
126 sophisticated that avoids making a new Future object when it can, as an
127 optimization. But this is correct.
129 core_ can't be moved, it is explicitly disallowed (as is copying). But
130 if there's ever a reason to allow it, this is one place that makes that
131 assumption and would need to be fixed. We use a standard shared pointer
132 for core_ (by copying it in), which means in essence obj holds a shared
133 pointer to itself. But this shouldn't leak because Promise will not
134 outlive the continuation, because Promise will setException() with a
135 broken Promise if it is destructed before completed. We could use a
136 weak pointer but it would have to be converted to a shared pointer when
137 func is executed (because the Future returned by func may possibly
138 persist beyond the callback, if it gets moved), and so it is an
139 optimization to just make it shared from the get-go.
141 We have to move in the Promise and func using the MoveWrapper
142 hack. (func could be copied but it's a big drag on perf).
144 Two subtle but important points about this design. detail::Core has no
145 back pointers to Future or Promise, so if Future or Promise get moved
146 (and they will be moved in performant code) we don't have to do
147 anything fancy. And because we store the continuation in the
148 detail::Core, not in the Future, we can execute the continuation even
149 after the Future has gone out of scope. This is an intentional design
150 decision. It is likely we will want to be able to cancel a continuation
151 in some circumstances, but I think it should be explicit not implicit
152 in the destruction of the Future used to create it.
155 [p, funcm](Try<T>&& t) mutable {
156 if (!isTry && t.hasException()) {
157 p->setException(std::move(t.exception()));
160 return (*funcm)(t.template get<isTry, Args>()...);
168 // Variant: returns a Future
169 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
//
// Enabled (via enable_if) when R::ReturnsFuture::value is true. The result
// of the Future produced by `func` is forwarded into a Promise<B> whose
// Future `f` is created up front.
171 template <typename F, typename R, bool isTry, typename... Args>
172 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
173 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
174 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
175 typedef typename R::ReturnsFuture::Inner B;
179 // wrap these so we can move them into the lambda
180 folly::MoveWrapper<Promise<B>> p;
181 folly::MoveWrapper<F> funcm(std::forward<F>(func));
183 // grab the Future now before we lose our handle on the Promise
184 auto f = p->getFuture();
// the new Future uses the same executor as this one
186 f.setExecutor(getExecutor());
190 [p, funcm](Try<T>&& t) mutable {
// when isTry is false and the upstream result is an exception, hand the
// exception to the promise
191 if (!isTry && t.hasException()) {
192 p->setException(std::move(t.exception()));
195 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
196 // that didn't throw, now we can steal p
// f2's eventual Try<B> is what fulfils the promise
197 f2.setCallback_([p](Try<B>&& b) mutable {
198 p->fulfilTry(std::move(b));
// an exception derived from std::exception is wrapped together with the
// caught reference
200 } catch (const std::exception& e) {
201 p->setException(exception_wrapper(std::current_exception(), e));
// this variant wraps only std::current_exception() (its catch clause is on a
// line not visible in this extract)
203 p->setException(exception_wrapper(std::current_exception()));
// then() overload taking a pointer-to-member-function and the object to call
// it on.
// FirstArg is the first parameter type of `func` with references and
// cv-qualifiers removed; isTry<FirstArg>::value selects how the Try is
// unpacked for the call.
// `instance` is captured as a raw pointer, so the pointed-to object must
// still be alive when the continuation runs.
211 template <typename T>
212 template <typename R, typename Caller, typename... Args>
213 Future<typename isFuture<R>::Inner>
214 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
215 typedef typename std::remove_cv<
216 typename std::remove_reference<
217 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
218 return then([instance, func](Try<T>&& t){
219 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains a continuation that accepts the Try and does
// nothing with it, yielding a Future<void>.
224 Future<void> Future<T>::then() {
225 return then([] (Try<T>&& t) {});
228 // onError where the callback returns T
//
// Exn is the callback's first argument type; the static_assert requires the
// callback's raw return type to be exactly T.
// A new promise/future pair is created; the promise and `func` are moved
// into MoveWrappers and captured by the callback installed on this Future.
231 typename std::enable_if<
232 !detail::Extract<F>::ReturnsFuture::value,
234 Future<T>::onError(F&& func) {
235 typedef typename detail::Extract<F>::FirstArg Exn;
237 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
238 "Return type of onError callback must be T or Future<T>");
241 auto f = p.getFuture();
242 auto pm = folly::makeMoveWrapper(std::move(p));
243 auto funcm = folly::makeMoveWrapper(std::move(func));
244 setCallback_([pm, funcm](Try<T>&& t) mutable {
// the inner lambda is only offered exceptions of type Exn; its body is on
// lines not visible in this extract
245 if (!t.template withException<Exn>([&] (Exn& e) {
// withException reported false: pass the original Try through untouched
250 pm->fulfilTry(std::move(t));
257 // onError where the callback returns Future<T>
//
// The static_assert requires the callback's return type to be exactly
// Future<T>; Exn is the callback's first argument type.
// The result of the Future returned by the callback is forwarded into a new
// promise whose Future `f` is created up front.
260 typename std::enable_if<
261 detail::Extract<F>::ReturnsFuture::value,
263 Future<T>::onError(F&& func) {
265 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
266 "Return type of onError callback must be T or Future<T>");
267 typedef typename detail::Extract<F>::FirstArg Exn;
270 auto f = p.getFuture();
271 auto pm = folly::makeMoveWrapper(std::move(p));
272 auto funcm = folly::makeMoveWrapper(std::move(func));
273 setCallback_([pm, funcm](Try<T>&& t) mutable {
274 if (!t.template withException<Exn>([&] (Exn& e) {
// run the handler, then chain its Future's result into the promise
276 auto f2 = (*funcm)(e);
277 f2.setCallback_([pm](Try<T>&& t2) mutable {
278 pm->fulfilTry(std::move(t2));
// exceptions escaping the handler are captured into the promise
280 } catch (const std::exception& e2) {
281 pm->setException(exception_wrapper(std::current_exception(), e2));
283 pm->setException(exception_wrapper(std::current_exception()));
// withException reported false: pass the original Try through untouched
286 pm->fulfilTry(std::move(t));
// Chains a continuation that captures `func` (moved into a MoveWrapper) and
// passes the incoming Try through unchanged via makeFuture(std::move(t)).
// NOTE(review): the statement that invokes the wrapped func is on a line not
// visible in this extract.
295 Future<T> Future<T>::ensure(F func) {
296 MoveWrapper<F> funcw(std::move(func));
297 return this->then([funcw](Try<T>&& t) {
299 return makeFuture(std::move(t));
// Combines within() and onError(): applies within(dur, tk) and, if the
// resulting Future carries a TimedOut exception, substitutes the result of
// calling `func` with no arguments.
305 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
306 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
307 return within(dur, tk)
308 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// Returns an lvalue reference to the value held in core_'s Try, via
// Try::value(). Any checks preceding the return are on lines not visible
// in this extract.
312 typename std::add_lvalue_reference<T>::type Future<T>::value() {
315 return core_->getTry().value();
// Const overload: returns a const lvalue reference to the value held in
// core_'s Try, via Try::value().
319 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
322 return core_->getTry().value();
// Returns a reference to the Try stored in core_.
326 Try<T>& Future<T>::getTry() {
329 return core_->getTry();
// Rvalue overload: sets the executor on this Future and returns it by move,
// so no new promise/future pair is created.
333 template <typename Executor>
334 inline Future<T> Future<T>::via(Executor* executor) && {
337 setExecutor(executor);
339 return std::move(*this);
// Lvalue overload: creates a fresh promise, chains a continuation on this
// Future that forwards its Try into that promise, and returns the promise's
// Future after applying the rvalue via(executor) to it.
343 template <typename Executor>
344 inline Future<T> Future<T>::via(Executor* executor) & {
347 MoveWrapper<Promise<T>> p;
348 auto f = p->getFuture();
349 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
350 return std::move(f).via(executor);
// Reports readiness by delegating to core_->ready().
354 bool Future<T>::isReady() const {
356 return core_->ready();
// Forwards `exception` (taken by value, then moved) to core_->raise().
360 void Future<T>::raise(exception_wrapper exception) {
361 core_->raise(std::move(exception));
// Builds a Future already holding `t`: a Promise of the decayed type is
// given the perfectly-forwarded value and its Future is returned.
367 Future<typename std::decay<T>::type> makeFuture(T&& t) {
368 Promise<typename std::decay<T>::type> p;
369 p.setValue(std::forward<T>(t));
370 return p.getFuture();
373 inline // for multiple translation units
374 Future<void> makeFuture() {
377 return p.getFuture();
383 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
384 -> Future<decltype(func())> {
385 Promise<decltype(func())> p;
390 return p.getFuture();
// Const-reference overload: delegates to another makeFutureTry overload,
// passing a moved `copy`. The declaration of `copy` is on a line not visible
// in this extract (presumably a copy of `func` — confirm).
394 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
396 return makeFutureTry(std::move(copy));
400 Future<T> makeFuture(std::exception_ptr const& e) {
403 return p.getFuture();
// Builds a Future that already holds the given exception_wrapper: the
// wrapper is moved into a promise (declared on a line not visible here) and
// that promise's Future is returned.
407 Future<T> makeFuture(exception_wrapper ew) {
409 p.setException(std::move(ew));
410 return p.getFuture();
// Overload enabled only when E derives from std::exception: wraps a copy of
// `e` with make_exception_wrapper<E> and returns a Future holding it.
413 template <class T, class E>
414 typename std::enable_if<std::is_base_of<std::exception, E>::value,
416 makeFuture(E const& e) {
418 p.setException(make_exception_wrapper<E>(e));
419 return p.getFuture();
// Builds a Future from a Try: the Try (value or exception) is moved into a
// promise with fulfilTry and the promise's Future is returned.
423 Future<T> makeFuture(Try<T>&& t) {
424 Promise<typename std::decay<T>::type> p;
425 p.fulfilTry(std::move(t));
426 return p.getFuture();
// Try<void> overload: when the Try holds an exception, returns a
// Future<void> carrying that exception. The non-exception path is on lines
// not visible in this extract.
430 inline Future<void> makeFuture(Try<void>&& t) {
431 if (t.hasException()) {
432 return makeFuture<void>(std::move(t.exception()));
// Free function: creates a Future<void> with makeFuture() and binds it to
// `executor` with Future::via().
439 template <typename Executor>
440 Future<void> via(Executor* executor) {
441 return makeFuture().via(executor);
// Variadic whenAll: allocates a detail::VariadicContext parameterised on the
// value types of the input futures, records how many futures there are in
// ctx->total, saves the context promise's Future, and hands the context and
// the futures to detail::whenAllVariadicHelper.
// NOTE(review): ctx is created with a raw `new` and no matching delete is
// visible in this extract — presumably the helper/context releases it once
// all inputs have completed; confirm.
// NOTE(review): each argument is forwarded as std::decay<Fs>::type, i.e. as
// an rvalue, so lvalue futures passed in appear to be moved from — confirm.
446 template <typename... Fs>
447 typename detail::VariadicContext<
448 typename std::decay<Fs>::type::value_type...>::type
452 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
453 ctx->total = sizeof...(fs);
454 auto f_saved = ctx->p.getFuture();
455 detail::whenAllVariadicHelper(ctx,
456 std::forward<typename std::decay<Fs>::type>(fs)...);
// Iterator-range whenAll: T is the value type of the futures in
// [first, last). The resulting Future carries a std::vector<Try<T>> with one
// slot per input, in input order.
// NOTE(review): ctx is created with a raw `new` and no matching delete is
// visible in this extract — confirm where it is released.
462 template <class InputIterator>
465 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
466 whenAll(InputIterator first, InputIterator last)
469 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// early exit with an already-completed empty result (the guarding condition
// is on a line not visible here; presumably an empty range)
472 return makeFuture(std::vector<Try<T>>());
474 size_t n = std::distance(first, last);
476 auto ctx = new detail::WhenAllContext<T>();
// pre-size the results so each callback can write to its own index
478 ctx->results.resize(n);
480 auto f_saved = ctx->p.getFuture();
482 for (size_t i = 0; first != last; ++first, ++i) {
485 f.setCallback_([ctx, i, n](Try<T>&& t) {
486 ctx->results[i] = std::move(t);
// the callback that brings the count up to n fulfils the promise
487 if (++ctx->count == n) {
488 ctx->p.setValue(std::move(ctx->results));
// whenAny: the resulting Future is fulfilled with a pair of (index, Try<T>)
// from whichever input's callback is first to flip ctx->done.
// NOTE(review): ctx is created with a raw `new` and no matching delete is
// visible in this extract — confirm where it is released.
497 template <class InputIterator>
502 std::iterator_traits<InputIterator>::value_type::value_type> > >
503 whenAny(InputIterator first, InputIterator last) {
505 typename std::iterator_traits<InputIterator>::value_type::value_type T;
507 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
508 auto f_saved = ctx->p.getFuture();
510 for (size_t i = 0; first != last; first++, i++) {
512 f.setCallback_([i, ctx](Try<T>&& t) {
// exchange(true) returns the previous value, so only the first callback to
// arrive sees false and sets the promise
513 if (!ctx->done.exchange(true)) {
514 ctx->p.setValue(std::make_pair(i, std::move(t)));
// whenN: the resulting Future carries a vector V of (index, Try<T>) pairs
// gathered from the input futures as they complete; `n` is the number of
// completions wanted. The shared context is held by std::make_shared and
// captured by value in every continuation.
523 template <class InputIterator>
524 Future<std::vector<std::pair<size_t, Try<typename
525 std::iterator_traits<InputIterator>::value_type::value_type>>>>
526 whenN(InputIterator first, InputIterator last, size_t n) {
528 std::iterator_traits<InputIterator>::value_type::value_type T;
529 typedef std::vector<std::pair<size_t, Try<T>>> V;
536 auto ctx = std::make_shared<ctx_t>();
539 // for each completed Future, increase count and add to vector, until we
540 // have n completed futures at which point we fulfil our Promise with the
545 it->then([ctx, n, i](Try<T>&& t) {
547 auto c = ++ctx->completed;
549 assert(ctx->v.size() < n);
550 v.push_back(std::make_pair(i, std::move(t)));
552 ctx->p.fulfilTry(Try<V>(std::move(v)));
// error path: the promise is failed with a runtime_error (the guarding
// condition is on a line not visible here)
562 ctx->p.setException(std::runtime_error("Not enough futures"));
565 return ctx->p.getFuture();
// Convenience overload: calls the three-argument within() with a
// default-constructed TimedOut as the exception.
569 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
570 return within(dur, TimedOut(), tk);
// within(dur, e, tk): returns the Future of a promise that is completed by
// whichever of two continuations flips the shared atomic `token` first:
//  - the timer-side continuation, which sets the timer's own exception if it
//    has one and otherwise the caller-supplied exception `e`;
//  - this Future's continuation, which forwards its Try.
// The Context (exception, promise, token) is shared through a
// std::shared_ptr captured by both continuations.
575 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
578 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
581 std::atomic<bool> token;
583 auto ctx = std::make_shared<Context>(std::move(e));
// fall back to the Timekeeper from getTimekeeperSingleton() (the guarding
// condition is on a line not visible here)
586 tk = folly::detail::getTimekeeperSingleton();
590 .then([ctx](Try<void> const& t) {
// exchange(true) returns the previous value: false means we won the race
591 if (ctx->token.exchange(true) == false) {
592 if (t.hasException()) {
// NOTE(review): `t` is a const reference, so this std::move likely results
// in a copy rather than a move — confirm.
593 ctx->promise.setException(std::move(t.exception()));
595 ctx->promise.setException(std::move(ctx->exception));
600 this->then([ctx](Try<T>&& t) {
601 if (ctx->token.exchange(true) == false) {
602 ctx->promise.fulfilTry(std::move(t));
606 return ctx->promise.getFuture();
// delayed(): combines this Future with futures::sleep(dur, tk) through
// whenAll, then returns a Future built from this Future's Try (element 0 of
// the resulting tuple); the sleep's Try<void> is ignored.
610 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
611 return whenAll(*this, futures::sleep(dur, tk))
612 .then([](std::tuple<Try<T>, Try<void>> tup) {
613 Try<T>& t = std::get<0>(tup);
614 return makeFuture<T>(std::move(t));
// Waits for `f` without a timeout. `f` is replaced by the Future produced by
// chaining a pass-through continuation; that continuation captures by
// reference ([&]). After the (not visible) blocking step, the function spins
// with std::this_thread::yield() until the replacement Future reports ready.
621 void waitImpl(Future<T>& f) {
622 // short-circuit if there's nothing to do
623 if (f.isReady()) return;
626 f = f.then([&](Try<T> t) {
628 return makeFuture(std::move(t));
632 // There's a race here between the return here and the actual finishing of
633 // the future. f is completed, but the setup may not have finished on done
634 // after the baton has posted.
635 while (!f.isReady()) {
636 std::this_thread::yield();
// Waits for `f` for at most `dur`. The Baton is held in a std::shared_ptr
// and captured by value in the continuation, so the continuation keeps it
// alive even after this function has returned (e.g. on timeout).
// NOTE(review): the deadline is computed from std::chrono::system_clock,
// which is not monotonic; a wall-clock adjustment during the wait could
// shorten or lengthen it — consider whether a steady clock is usable here.
641 void waitImpl(Future<T>& f, Duration dur) {
642 // short-circuit if there's nothing to do
643 if (f.isReady()) return;
645 auto baton = std::make_shared<Baton<>>();
646 f = f.then([baton](Try<T> t) {
648 return makeFuture(std::move(t));
651 // Let's preserve the invariant that if we did not timeout (timed_wait returns
652 // true), then the returned Future is complete when it is returned to the
653 // caller. We need to wait out the race for that Future to complete.
654 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
655 while (!f.isReady()) {
656 std::this_thread::yield();
662 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
663 while (!f.isReady()) {
// Lvalue overload: waits on this Future in place via detail::waitImpl.
671 Future<T>& Future<T>::wait() & {
672 detail::waitImpl(*this);
// Rvalue overload: waits via detail::waitImpl, then returns this Future as
// an rvalue reference so the call can be chained on a temporary.
677 Future<T>&& Future<T>::wait() && {
678 detail::waitImpl(*this);
679 return std::move(*this);
// Lvalue overload with timeout: delegates to detail::waitImpl(*this, dur).
683 Future<T>& Future<T>::wait(Duration dur) & {
684 detail::waitImpl(*this, dur);
// Rvalue overload with timeout: delegates to detail::waitImpl(*this, dur)
// and returns this Future as an rvalue reference.
689 Future<T>&& Future<T>::wait(Duration dur) && {
690 detail::waitImpl(*this, dur);
691 return std::move(*this);
// Lvalue overload: delegates to detail::waitViaImpl with the given
// DrivableExecutor.
695 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
696 detail::waitViaImpl(*this, e);
// Rvalue overload: delegates to detail::waitViaImpl and returns this Future
// as an rvalue reference.
701 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
702 detail::waitViaImpl(*this, e);
703 return std::move(*this);
708 return std::move(wait().value());
712 inline void Future<void>::get() {
// get() with a timeout: returns the stored value by moving it out of
// value(). The waiting and readiness handling that precede the return are on
// lines not visible in this extract.
717 T Future<T>::get(Duration dur) {
720 return std::move(value());
727 inline void Future<void>::get(Duration dur) {
// Waits with waitVia(e), then returns the stored value by moving it out.
737 T Future<T>::getVia(DrivableExecutor* e) {
738 return std::move(waitVia(e).value());
742 inline void Future<void>::getVia(DrivableExecutor* e) {
750 Future<Z> chainHelper(Future<Z> f) {
// Recursive step: chains the first callback `fn` onto `f` with then() and
// recurses on the remaining callbacks; the single-argument overload of
// chainHelper ends the recursion.
754 template <class Z, class F, class Fn, class... Callbacks>
755 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
756 return chainHelper<Z>(f.then(fn), fns...);
// Packages a sequence of callbacks into one std::function from Try<A> to
// Future<Z>. A Promise<A> is created and the callbacks are chained onto its
// Future up front; the returned function fulfils that promise with its
// argument and returns the end-of-chain Future.
// The promise and the end-of-chain Future live in MoveWrappers captured by
// the lambda, and the Future is moved out on invocation, so the returned
// function appears to be usable only once.
760 template <class A, class Z, class... Callbacks>
761 std::function<Future<Z>(Try<A>)>
762 chain(Callbacks... fns) {
763 MoveWrapper<Promise<A>> pw;
764 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
765 return [=](Try<A> t) mutable {
766 pw->fulfilTry(std::move(t));
767 return std::move(*fw);
775 // I haven't included a Future<T&> specialization because I don't foresee us
776 // using it, however it is not difficult to add when needed. Refer to
777 // Future<void> for guidance. std::future and boost::future code would also be