2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
53 *this = p.getFuture();
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
77 Future<T>::~Future() {
82 void Future<T>::detach() {
84 core_->detachFuture();
90 void Future<T>::throwIfInvalid() const {
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
106 typename std::enable_if<isFuture<F>::value,
107 Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109 return then([](Future<typename isFuture<T>::Inner> internal_future) {
110 return internal_future;
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123 typedef typename R::ReturnsFuture::Inner B;
127 // wrap these so we can move them into the lambda
128 folly::MoveWrapper<Promise<B>> p;
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
134 f.setExecutor(getExecutor());
137 /* This is a bit tricky.
139 We can't just close over *this in case this Future gets moved. So we
140 make a new dummy Future. We could figure out something more
141 sophisticated that avoids making a new Future object when it can, as an
142 optimization. But this is correct.
144 core_ can't be moved, it is explicitly disallowed (as is copying). But
145 if there's ever a reason to allow it, this is one place that makes that
146 assumption and would need to be fixed. We use a standard shared pointer
147 for core_ (by copying it in), which means in essence obj holds a shared
148 pointer to itself. But this shouldn't leak because Promise will not
149 outlive the continuation, because Promise will setException() with a
150 broken Promise if it is destructed before completed. We could use a
151 weak pointer but it would have to be converted to a shared pointer when
152 func is executed (because the Future returned by func may possibly
153 persist beyond the callback, if it gets moved), and so it is an
154 optimization to just make it shared from the get-go.
156 We have to move in the Promise and func using the MoveWrapper
157 hack. (func could be copied but it's a big drag on perf).
159 Two subtle but important points about this design. detail::Core has no
160 back pointers to Future or Promise, so if Future or Promise get moved
161 (and they will be moved in performant code) we don't have to do
162 anything fancy. And because we store the continuation in the
163 detail::Core, not in the Future, we can execute the continuation even
164 after the Future has gone out of scope. This is an intentional design
165 decision. It is likely we will want to be able to cancel a continuation
166 in some circumstances, but I think it should be explicit not implicit
167 in the destruction of the Future used to create it.
170 [p, funcm](Try<T>&& t) mutable {
171 if (!isTry && t.hasException()) {
172 p->setException(std::move(t.exception()));
175 return (*funcm)(t.template get<isTry, Args>()...);
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190 typedef typename R::ReturnsFuture::Inner B;
194 // wrap these so we can move them into the lambda
195 folly::MoveWrapper<Promise<B>> p;
196 folly::MoveWrapper<F> funcm(std::forward<F>(func));
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p->getFuture();
201 f.setExecutor(getExecutor());
205 [p, funcm](Try<T>&& t) mutable {
206 if (!isTry && t.hasException()) {
207 p->setException(std::move(t.exception()));
210 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211 // that didn't throw, now we can steal p
212 f2.setCallback_([p](Try<B>&& b) mutable {
213 p->fulfilTry(std::move(b));
215 } catch (const std::exception& e) {
216 p->setException(exception_wrapper(std::current_exception(), e));
218 p->setException(exception_wrapper(std::current_exception()));
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228 Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230 typedef typename std::remove_cv<
231 typename std::remove_reference<
232 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233 return then([instance, func](Try<T>&& t){
234 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
239 Future<void> Future<T>::then() {
240 return then([] (Try<T>&& t) {});
243 // onError where the callback returns T
246 typename std::enable_if<
247 !detail::Extract<F>::ReturnsFuture::value,
249 Future<T>::onError(F&& func) {
250 typedef typename detail::Extract<F>::FirstArg Exn;
252 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
253 "Return type of onError callback must be T or Future<T>");
256 auto f = p.getFuture();
257 auto pm = folly::makeMoveWrapper(std::move(p));
258 auto funcm = folly::makeMoveWrapper(std::move(func));
259 setCallback_([pm, funcm](Try<T>&& t) mutable {
260 if (!t.template withException<Exn>([&] (Exn& e) {
265 pm->fulfilTry(std::move(t));
272 // onError where the callback returns Future<T>
275 typename std::enable_if<
276 detail::Extract<F>::ReturnsFuture::value,
278 Future<T>::onError(F&& func) {
280 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
281 "Return type of onError callback must be T or Future<T>");
282 typedef typename detail::Extract<F>::FirstArg Exn;
285 auto f = p.getFuture();
286 auto pm = folly::makeMoveWrapper(std::move(p));
287 auto funcm = folly::makeMoveWrapper(std::move(func));
288 setCallback_([pm, funcm](Try<T>&& t) mutable {
289 if (!t.template withException<Exn>([&] (Exn& e) {
291 auto f2 = (*funcm)(e);
292 f2.setCallback_([pm](Try<T>&& t2) mutable {
293 pm->fulfilTry(std::move(t2));
295 } catch (const std::exception& e2) {
296 pm->setException(exception_wrapper(std::current_exception(), e2));
298 pm->setException(exception_wrapper(std::current_exception()));
301 pm->fulfilTry(std::move(t));
310 Future<T> Future<T>::ensure(F func) {
311 MoveWrapper<F> funcw(std::move(func));
312 return this->then([funcw](Try<T>&& t) {
314 return makeFuture(std::move(t));
320 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
321 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
322 return within(dur, tk)
323 .onError([funcw](TimedOut const&) { return (*funcw)(); });
327 typename std::add_lvalue_reference<T>::type Future<T>::value() {
330 return core_->getTry().value();
334 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
337 return core_->getTry().value();
341 Try<T>& Future<T>::getTry() {
344 return core_->getTry();
348 Optional<Try<T>> Future<T>::poll() {
350 if (core_->ready()) {
351 o = std::move(core_->getTry());
357 template <typename Executor>
358 inline Future<T> Future<T>::via(Executor* executor) && {
361 setExecutor(executor);
363 return std::move(*this);
367 template <typename Executor>
368 inline Future<T> Future<T>::via(Executor* executor) & {
371 MoveWrapper<Promise<T>> p;
372 auto f = p->getFuture();
373 then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
374 return std::move(f).via(executor);
378 bool Future<T>::isReady() const {
380 return core_->ready();
384 void Future<T>::raise(exception_wrapper exception) {
385 core_->raise(std::move(exception));
391 Future<typename std::decay<T>::type> makeFuture(T&& t) {
392 Promise<typename std::decay<T>::type> p;
393 p.setValue(std::forward<T>(t));
394 return p.getFuture();
397 inline // for multiple translation units
398 Future<void> makeFuture() {
401 return p.getFuture();
407 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
408 -> Future<decltype(func())> {
409 Promise<decltype(func())> p;
414 return p.getFuture();
418 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
420 return makeFutureTry(std::move(copy));
424 Future<T> makeFuture(std::exception_ptr const& e) {
427 return p.getFuture();
431 Future<T> makeFuture(exception_wrapper ew) {
433 p.setException(std::move(ew));
434 return p.getFuture();
437 template <class T, class E>
438 typename std::enable_if<std::is_base_of<std::exception, E>::value,
440 makeFuture(E const& e) {
442 p.setException(make_exception_wrapper<E>(e));
443 return p.getFuture();
447 Future<T> makeFuture(Try<T>&& t) {
448 Promise<typename std::decay<T>::type> p;
449 p.fulfilTry(std::move(t));
450 return p.getFuture();
454 inline Future<void> makeFuture(Try<void>&& t) {
455 if (t.hasException()) {
456 return makeFuture<void>(std::move(t.exception()));
463 template <typename Executor>
464 Future<void> via(Executor* executor) {
465 return makeFuture().via(executor);
470 template <typename... Fs>
471 typename detail::VariadicContext<
472 typename std::decay<Fs>::type::value_type...>::type
476 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
477 ctx->total = sizeof...(fs);
478 auto f_saved = ctx->p.getFuture();
479 detail::whenAllVariadicHelper(ctx,
480 std::forward<typename std::decay<Fs>::type>(fs)...);
486 template <class InputIterator>
489 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
490 whenAll(InputIterator first, InputIterator last)
493 typename std::iterator_traits<InputIterator>::value_type::value_type T;
496 return makeFuture(std::vector<Try<T>>());
498 size_t n = std::distance(first, last);
500 auto ctx = new detail::WhenAllContext<T>();
502 ctx->results.resize(n);
504 auto f_saved = ctx->p.getFuture();
506 for (size_t i = 0; first != last; ++first, ++i) {
509 f.setCallback_([ctx, i, n](Try<T>&& t) {
510 ctx->results[i] = std::move(t);
511 if (++ctx->count == n) {
512 ctx->p.setValue(std::move(ctx->results));
521 template <class InputIterator>
526 std::iterator_traits<InputIterator>::value_type::value_type> > >
527 whenAny(InputIterator first, InputIterator last) {
529 typename std::iterator_traits<InputIterator>::value_type::value_type T;
531 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
532 auto f_saved = ctx->p.getFuture();
534 for (size_t i = 0; first != last; first++, i++) {
536 f.setCallback_([i, ctx](Try<T>&& t) {
537 if (!ctx->done.exchange(true)) {
538 ctx->p.setValue(std::make_pair(i, std::move(t)));
547 template <class InputIterator>
548 Future<std::vector<std::pair<size_t, Try<typename
549 std::iterator_traits<InputIterator>::value_type::value_type>>>>
550 whenN(InputIterator first, InputIterator last, size_t n) {
552 std::iterator_traits<InputIterator>::value_type::value_type T;
553 typedef std::vector<std::pair<size_t, Try<T>>> V;
560 auto ctx = std::make_shared<ctx_t>();
563 // for each completed Future, increase count and add to vector, until we
564 // have n completed futures at which point we fulfil our Promise with the
569 it->then([ctx, n, i](Try<T>&& t) {
571 auto c = ++ctx->completed;
573 assert(ctx->v.size() < n);
574 v.push_back(std::make_pair(i, std::move(t)));
576 ctx->p.fulfilTry(Try<V>(std::move(v)));
586 ctx->p.setException(std::runtime_error("Not enough futures"));
589 return ctx->p.getFuture();
592 template <class It, class T, class F, class ItT, class Arg>
593 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
594 reduce(It first, It last, T initial, F func) {
596 return makeFuture(std::move(initial));
599 typedef isTry<Arg> IsTry;
601 return whenAll(first, last)
602 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
603 for (auto& val : vals) {
604 initial = func(std::move(initial),
605 // Either return a ItT&& or a Try<ItT>&& depending
606 // on the type of the argument of func.
607 val.template get<IsTry::value, Arg&&>());
613 template <class It, class T, class F, class ItT, class Arg>
614 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
615 reduce(It first, It last, T initial, F func) {
617 return makeFuture(std::move(initial));
620 typedef isTry<Arg> IsTry;
622 auto f = first->then([initial, func](Try<ItT>& head) mutable {
623 return func(std::move(initial),
624 head.template get<IsTry::value, Arg&&>());
627 for (++first; first != last; ++first) {
628 f = whenAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
629 return func(std::move(std::get<0>(t).value()),
630 // Either return a ItT&& or a Try<ItT>&& depending
631 // on the type of the argument of func.
632 std::get<1>(t).template get<IsTry::value, Arg&&>());
640 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
641 return within(dur, TimedOut(), tk);
646 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
649 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
652 std::atomic<bool> token;
654 auto ctx = std::make_shared<Context>(std::move(e));
657 tk = folly::detail::getTimekeeperSingleton();
661 .then([ctx](Try<void> const& t) {
662 if (ctx->token.exchange(true) == false) {
663 if (t.hasException()) {
664 ctx->promise.setException(std::move(t.exception()));
666 ctx->promise.setException(std::move(ctx->exception));
671 this->then([ctx](Try<T>&& t) {
672 if (ctx->token.exchange(true) == false) {
673 ctx->promise.fulfilTry(std::move(t));
677 return ctx->promise.getFuture();
681 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
682 return whenAll(*this, futures::sleep(dur, tk))
683 .then([](std::tuple<Try<T>, Try<void>> tup) {
684 Try<T>& t = std::get<0>(tup);
685 return makeFuture<T>(std::move(t));
692 void waitImpl(Future<T>& f) {
693 // short-circuit if there's nothing to do
694 if (f.isReady()) return;
697 f = f.then([&](Try<T> t) {
699 return makeFuture(std::move(t));
703 // There's a race here between the return here and the actual finishing of
704 // the future. f is completed, but the setup may not have finished on done
705 // after the baton has posted.
706 while (!f.isReady()) {
707 std::this_thread::yield();
712 void waitImpl(Future<T>& f, Duration dur) {
713 // short-circuit if there's nothing to do
714 if (f.isReady()) return;
716 auto baton = std::make_shared<Baton<>>();
717 f = f.then([baton](Try<T> t) {
719 return makeFuture(std::move(t));
722 // Let's preserve the invariant that if we did not timeout (timed_wait returns
723 // true), then the returned Future is complete when it is returned to the
724 // caller. We need to wait out the race for that Future to complete.
725 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
726 while (!f.isReady()) {
727 std::this_thread::yield();
733 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
734 while (!f.isReady()) {
742 Future<T>& Future<T>::wait() & {
743 detail::waitImpl(*this);
748 Future<T>&& Future<T>::wait() && {
749 detail::waitImpl(*this);
750 return std::move(*this);
754 Future<T>& Future<T>::wait(Duration dur) & {
755 detail::waitImpl(*this, dur);
760 Future<T>&& Future<T>::wait(Duration dur) && {
761 detail::waitImpl(*this, dur);
762 return std::move(*this);
766 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
767 detail::waitViaImpl(*this, e);
772 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
773 detail::waitViaImpl(*this, e);
774 return std::move(*this);
779 return std::move(wait().value());
783 inline void Future<void>::get() {
788 T Future<T>::get(Duration dur) {
791 return std::move(value());
798 inline void Future<void>::get(Duration dur) {
808 T Future<T>::getVia(DrivableExecutor* e) {
809 return std::move(waitVia(e).value());
813 inline void Future<void>::getVia(DrivableExecutor* e) {
818 Future<bool> Future<T>::willEqual(Future<T>& f) {
819 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
820 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
821 return std::get<0>(t).value() == std::get<1>(t).value();
830 Future<T> Future<T>::filter(F predicate) {
831 auto p = folly::makeMoveWrapper(std::move(predicate));
832 return this->then([p](T val) {
833 T const& valConstRef = val;
834 if (!(*p)(valConstRef)) {
835 throw PredicateDoesNotObtain();
844 Future<Z> chainHelper(Future<Z> f) {
848 template <class Z, class F, class Fn, class... Callbacks>
849 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
850 return chainHelper<Z>(f.then(fn), fns...);
854 template <class A, class Z, class... Callbacks>
855 std::function<Future<Z>(Try<A>)>
856 chain(Callbacks... fns) {
857 MoveWrapper<Promise<A>> pw;
858 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
859 return [=](Try<A> t) mutable {
860 pw->fulfilTry(std::move(t));
861 return std::move(*fw);
869 // I haven't included a Future<T&> specialization because I don't forsee us
870 // using it, however it is not difficult to add when needed. Refer to
871 // Future<void> for guidance. std::future and boost::future code would also be